diff --git a/blockbook.go b/blockbook.go index 7f70cbe4..4eecef01 100644 --- a/blockbook.go +++ b/blockbook.go @@ -51,7 +51,7 @@ var ( prof = flag.String("prof", "", "http server binding [address]:port of the interface to profiling data /debug/pprof/ (default no profiling)") syncChunk = flag.Int("chunk", 100, "block chunk size for processing") - syncWorkers = flag.Int("workers", 8, "number of workers to process blocks") + syncWorkers = flag.Int("workers", 8, "number of workers to process blocks (default 8)") dryRun = flag.Bool("dryrun", false, "do not index blocks, only download") httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)") diff --git a/db/sync.go b/db/sync.go index fb681bd4..7f25a56a 100644 --- a/db/sync.go +++ b/db/sync.go @@ -187,6 +187,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { return nil } +// ConnectBlocksParallel uses parallel goroutines to get data from blockchain daemon func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { type hashHeight struct { hash string @@ -200,21 +201,24 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { hchClosed.Store(false) var getBlockMux sync.Mutex getBlockCond := sync.NewCond(&getBlockMux) - totalWaitDuration := time.Duration(0) - lastConnectedBlock := int(lower) - 1 + lastConnectedBlock := lower - 1 writeBlockDone := make(chan struct{}) - writeBlock := func() { + writeBlockWorker := func() { defer close(writeBlockDone) + lastBlock := lower - 1 for b := range bch { - // glog.Info("WriteBlock ", b.Height) - err = w.db.ConnectBlock(b) - if err != nil { - glog.Error("WriteBlock worker ", b.Height, " ", b.Hash, " error ", err) + if lastBlock+1 != b.Height { + glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height) } + err := w.db.ConnectBlock(b) + if err != nil { + glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err) + } + lastBlock = b.Height } glog.Info("WriteBlock exiting...") } - getBlock := func(i int) { + getBlockWorker := func(i int) { defer wg.Done() var err error var block *bchain.Block @@ -224,10 +228,10 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { if err != nil { // signal came while looping in the error loop if hchClosed.Load() == true { - glog.Error("Worker ", i, " connect block error ", err, ". Exiting...") + glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...") return } - glog.Error("Worker ", i, " connect block error ", err, ". Retrying...") + glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...") w.metrics.IndexResyncErrors.With(common.Labels{"error": err.Error()}).Inc() time.Sleep(time.Millisecond * 500) } else { @@ -237,34 +241,32 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { if w.dryRun { continue } - start := time.Now() getBlockMux.Lock() for { - if uint32(lastConnectedBlock+1) == hh.height { - lastConnectedBlock = int(hh.height) - // get data to writeBlock routine - // glog.Info("Worker ", i, " have block ", hh.height, ". Sending.") + // we must make sure that the blocks are written to db in the correct order + if lastConnectedBlock+1 == hh.height { + // we have the right block, pass it to the writeBlockWorker + lastConnectedBlock = hh.height bch <- block - totalWaitDuration += time.Since(start) getBlockCond.Broadcast() - getBlockMux.Unlock() break } - // glog.Info("Worker ", i, " have block ", hh.height, ". Waiting.") - getBlockCond.Wait() + // break the endless loop on OS signal if hchClosed.Load() == true { break } + // wait for the time this block is top be passed to the writeBlockWorker + getBlockCond.Wait() } + getBlockMux.Unlock() } - glog.Info("Worker ", i, " exiting...") + glog.Info("getBlockWorker ", i, " exiting...") } for i := 0; i < w.syncWorkers; i++ { wg.Add(1) - go getBlock(i) + go getBlockWorker(i) } - go writeBlock() - + go writeBlockWorker() var hash string ConnectLoop: for h := lower; h <= higher; { @@ -282,16 +284,20 @@ ConnectLoop: } hch <- hashHeight{hash, h} if h > 0 && h%1000 == 0 { - glog.Info("connecting block ", h, " ", hash, " wait for writeBlock ", totalWaitDuration) + glog.Info("connecting block ", h, " ", hash) } h++ } } close(hch) - // signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop + // signal stop to workers that are in a loop hchClosed.Store(true) - // first wait for the getBlock routines to finish and then close bch channel - getBlockCond.Broadcast() + // broadcast syncWorkers times to unstuck all waiting getBlockWorkers + for i := 0; i < w.syncWorkers; i++ { + getBlockCond.Broadcast() + } + // first wait for the getBlockWorkers to finish and then close bch channel + // so that the getBlockWorkers do not write to the closed channel wg.Wait() close(bch) <-writeBlockDone