Sync using indexv2
This commit is contained in:
parent
df2a6b5551
commit
b3b8512958
@ -51,7 +51,7 @@ var (
|
||||
prof = flag.String("prof", "", "http server binding [address]:port of the interface to profiling data /debug/pprof/ (default no profiling)")
|
||||
|
||||
syncChunk = flag.Int("chunk", 100, "block chunk size for processing")
|
||||
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
|
||||
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks (default 8)")
|
||||
dryRun = flag.Bool("dryrun", false, "do not index blocks, only download")
|
||||
|
||||
httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)")
|
||||
|
||||
60
db/sync.go
60
db/sync.go
@ -187,6 +187,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectBlocksParallel uses parallel goroutines to get data from blockchain daemon
|
||||
func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
||||
type hashHeight struct {
|
||||
hash string
|
||||
@ -200,21 +201,24 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
||||
hchClosed.Store(false)
|
||||
var getBlockMux sync.Mutex
|
||||
getBlockCond := sync.NewCond(&getBlockMux)
|
||||
totalWaitDuration := time.Duration(0)
|
||||
lastConnectedBlock := int(lower) - 1
|
||||
lastConnectedBlock := lower - 1
|
||||
writeBlockDone := make(chan struct{})
|
||||
writeBlock := func() {
|
||||
writeBlockWorker := func() {
|
||||
defer close(writeBlockDone)
|
||||
lastBlock := lower - 1
|
||||
for b := range bch {
|
||||
// glog.Info("WriteBlock ", b.Height)
|
||||
err = w.db.ConnectBlock(b)
|
||||
if err != nil {
|
||||
glog.Error("WriteBlock worker ", b.Height, " ", b.Hash, " error ", err)
|
||||
if lastBlock+1 != b.Height {
|
||||
glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height)
|
||||
}
|
||||
err := w.db.ConnectBlock(b)
|
||||
if err != nil {
|
||||
glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
|
||||
}
|
||||
lastBlock = b.Height
|
||||
}
|
||||
glog.Info("WriteBlock exiting...")
|
||||
}
|
||||
getBlock := func(i int) {
|
||||
getBlockWorker := func(i int) {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
var block *bchain.Block
|
||||
@ -224,10 +228,10 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
||||
if err != nil {
|
||||
// signal came while looping in the error loop
|
||||
if hchClosed.Load() == true {
|
||||
glog.Error("Worker ", i, " connect block error ", err, ". Exiting...")
|
||||
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
|
||||
return
|
||||
}
|
||||
glog.Error("Worker ", i, " connect block error ", err, ". Retrying...")
|
||||
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...")
|
||||
w.metrics.IndexResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
} else {
|
||||
@ -237,34 +241,32 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
||||
if w.dryRun {
|
||||
continue
|
||||
}
|
||||
start := time.Now()
|
||||
getBlockMux.Lock()
|
||||
for {
|
||||
if uint32(lastConnectedBlock+1) == hh.height {
|
||||
lastConnectedBlock = int(hh.height)
|
||||
// get data to writeBlock routine
|
||||
// glog.Info("Worker ", i, " have block ", hh.height, ". Sending.")
|
||||
// we must make sure that the blocks are written to db in the correct order
|
||||
if lastConnectedBlock+1 == hh.height {
|
||||
// we have the right block, pass it to the writeBlockWorker
|
||||
lastConnectedBlock = hh.height
|
||||
bch <- block
|
||||
totalWaitDuration += time.Since(start)
|
||||
getBlockCond.Broadcast()
|
||||
getBlockMux.Unlock()
|
||||
break
|
||||
}
|
||||
// glog.Info("Worker ", i, " have block ", hh.height, ". Waiting.")
|
||||
getBlockCond.Wait()
|
||||
// break the endless loop on OS signal
|
||||
if hchClosed.Load() == true {
|
||||
break
|
||||
}
|
||||
// wait for the time this block is top be passed to the writeBlockWorker
|
||||
getBlockCond.Wait()
|
||||
}
|
||||
getBlockMux.Unlock()
|
||||
}
|
||||
glog.Info("Worker ", i, " exiting...")
|
||||
glog.Info("getBlockWorker ", i, " exiting...")
|
||||
}
|
||||
for i := 0; i < w.syncWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go getBlock(i)
|
||||
go getBlockWorker(i)
|
||||
}
|
||||
go writeBlock()
|
||||
|
||||
go writeBlockWorker()
|
||||
var hash string
|
||||
ConnectLoop:
|
||||
for h := lower; h <= higher; {
|
||||
@ -282,16 +284,20 @@ ConnectLoop:
|
||||
}
|
||||
hch <- hashHeight{hash, h}
|
||||
if h > 0 && h%1000 == 0 {
|
||||
glog.Info("connecting block ", h, " ", hash, " wait for writeBlock ", totalWaitDuration)
|
||||
glog.Info("connecting block ", h, " ", hash)
|
||||
}
|
||||
h++
|
||||
}
|
||||
}
|
||||
close(hch)
|
||||
// signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop
|
||||
// signal stop to workers that are in a loop
|
||||
hchClosed.Store(true)
|
||||
// first wait for the getBlock routines to finish and then close bch channel
|
||||
getBlockCond.Broadcast()
|
||||
// broadcast syncWorkers times to unstuck all waiting getBlockWorkers
|
||||
for i := 0; i < w.syncWorkers; i++ {
|
||||
getBlockCond.Broadcast()
|
||||
}
|
||||
// first wait for the getBlockWorkers to finish and then close bch channel
|
||||
// so that the getBlockWorkers do not write to the closed channel
|
||||
wg.Wait()
|
||||
close(bch)
|
||||
<-writeBlockDone
|
||||
|
||||
Loading…
Reference in New Issue
Block a user