From d2d5df88f221d5dcf13edc613f2737d1e079ad14 Mon Sep 17 00:00:00 2001 From: Jakub Matys Date: Fri, 21 Sep 2018 16:01:23 +0200 Subject: [PATCH] Parallel sync improved in order to write all blocks and handle OS signal --- db/sync.go | 72 ++++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/db/sync.go b/db/sync.go index 271ad089..975bf2e6 100644 --- a/db/sync.go +++ b/db/sync.go @@ -203,14 +203,15 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { } var err error var wg sync.WaitGroup - bch := make(chan *bchain.Block, w.syncWorkers) + bch := make([]chan *bchain.Block, w.syncWorkers) + for i := 0; i < w.syncWorkers; i++ { + bch[i] = make(chan *bchain.Block) + } hch := make(chan hashHeight, w.syncWorkers) hchClosed := atomic.Value{} hchClosed.Store(false) - var getBlockMux sync.Mutex - getBlockCond := sync.NewCond(&getBlockMux) - lastConnectedBlock := lower - 1 writeBlockDone := make(chan struct{}) + terminating := make(chan struct{}) writeBlockWorker := func() { defer close(writeBlockDone) bc, err := w.db.InitBulkConnect() @@ -219,15 +220,25 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { } lastBlock := lower - 1 keep := uint32(w.chain.GetChainParser().KeepBlockAddresses()) - for b := range bch { - if lastBlock+1 != b.Height { - glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height) + WriteBlockLoop: + for { + select { + case b := <-bch[(lastBlock+1)%uint32(w.syncWorkers)]: + if b == nil { + // channel is closed and empty - work is done + break WriteBlockLoop + } + if b.Height != lastBlock+1 { + glog.Fatal("writeBlockWorker skipped block, expected block ", lastBlock+1, ", new block ", b.Height) + } + err := bc.ConnectBlock(b, b.Height+keep > higher) + if err != nil { + glog.Fatal("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err) + } + lastBlock = b.Height + case <-terminating: + break WriteBlockLoop } - err := bc.ConnectBlock(b, b.Height+keep > higher) - if err != nil { - glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err) - } - lastBlock = b.Height } err = bc.Close() if err != nil { @@ -239,6 +250,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { defer wg.Done() var err error var block *bchain.Block + GetBlockLoop: for hh := range hch { for { block, err = w.chain.GetBlock(hh.hash, hh.height) @@ -258,24 +270,11 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { if w.dryRun { continue } - getBlockMux.Lock() - for { - // we must make sure that the blocks are written to db in the correct order - if lastConnectedBlock+1 == hh.height { - // we have the right block, pass it to the writeBlockWorker - lastConnectedBlock = hh.height - bch <- block - getBlockCond.Broadcast() - break - } - // break the endless loop on OS signal - if hchClosed.Load() == true { - break - } - // wait for the time this block is top be passed to the writeBlockWorker - getBlockCond.Wait() + select { + case bch[hh.height%uint32(w.syncWorkers)] <- block: + case <-terminating: + break GetBlockLoop } - getBlockMux.Unlock() } glog.Info("getBlockWorker ", i, " exiting...") } @@ -292,6 +291,8 @@ ConnectLoop: select { case <-w.chanOsSignal: err = errors.Errorf("connectBlocksParallel interrupted at height %d", h) + // signal all workers to terminate their loops (error loops are interrupted below) + close(terminating) break ConnectLoop default: hash, err = w.chain.GetBlockHash(h) @@ -315,16 +316,13 @@ ConnectLoop: } } close(hch) - // signal stop to workers that are in a loop + // signal stop to workers that are in a error loop hchClosed.Store(true) - // broadcast syncWorkers times to unstuck all waiting getBlockWorkers - for i := 0; i < w.syncWorkers; i++ { - getBlockCond.Broadcast() - } - // first wait for the getBlockWorkers to finish and then close bch channel - // so that the getBlockWorkers do not write to the closed channel + // wait for workers and close bch that will stop writer loop wg.Wait() - close(bch) + for i := 0; i < w.syncWorkers; i++ { + close(bch[i]) + } <-writeBlockDone return err }