Parallel sync improved in order to write all blocks and handle OS signal

This commit is contained in:
Jakub Matys 2018-09-21 16:01:23 +02:00 committed by Martin Boehm
parent b97b562ff7
commit d2d5df88f2

View File

@ -203,14 +203,15 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
} }
var err error var err error
var wg sync.WaitGroup var wg sync.WaitGroup
bch := make(chan *bchain.Block, w.syncWorkers) bch := make([]chan *bchain.Block, w.syncWorkers)
for i := 0; i < w.syncWorkers; i++ {
bch[i] = make(chan *bchain.Block)
}
hch := make(chan hashHeight, w.syncWorkers) hch := make(chan hashHeight, w.syncWorkers)
hchClosed := atomic.Value{} hchClosed := atomic.Value{}
hchClosed.Store(false) hchClosed.Store(false)
var getBlockMux sync.Mutex
getBlockCond := sync.NewCond(&getBlockMux)
lastConnectedBlock := lower - 1
writeBlockDone := make(chan struct{}) writeBlockDone := make(chan struct{})
terminating := make(chan struct{})
writeBlockWorker := func() { writeBlockWorker := func() {
defer close(writeBlockDone) defer close(writeBlockDone)
bc, err := w.db.InitBulkConnect() bc, err := w.db.InitBulkConnect()
@ -219,15 +220,25 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
} }
lastBlock := lower - 1 lastBlock := lower - 1
keep := uint32(w.chain.GetChainParser().KeepBlockAddresses()) keep := uint32(w.chain.GetChainParser().KeepBlockAddresses())
for b := range bch { WriteBlockLoop:
if lastBlock+1 != b.Height { for {
glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height) select {
case b := <-bch[(lastBlock+1)%uint32(w.syncWorkers)]:
if b == nil {
// channel is closed and empty - work is done
break WriteBlockLoop
}
if b.Height != lastBlock+1 {
glog.Fatal("writeBlockWorker skipped block, expected block ", lastBlock+1, ", new block ", b.Height)
}
err := bc.ConnectBlock(b, b.Height+keep > higher)
if err != nil {
glog.Fatal("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
}
lastBlock = b.Height
case <-terminating:
break WriteBlockLoop
} }
err := bc.ConnectBlock(b, b.Height+keep > higher)
if err != nil {
glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
}
lastBlock = b.Height
} }
err = bc.Close() err = bc.Close()
if err != nil { if err != nil {
@ -239,6 +250,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
defer wg.Done() defer wg.Done()
var err error var err error
var block *bchain.Block var block *bchain.Block
GetBlockLoop:
for hh := range hch { for hh := range hch {
for { for {
block, err = w.chain.GetBlock(hh.hash, hh.height) block, err = w.chain.GetBlock(hh.hash, hh.height)
@ -258,24 +270,11 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
if w.dryRun { if w.dryRun {
continue continue
} }
getBlockMux.Lock() select {
for { case bch[hh.height%uint32(w.syncWorkers)] <- block:
// we must make sure that the blocks are written to db in the correct order case <-terminating:
if lastConnectedBlock+1 == hh.height { break GetBlockLoop
// we have the right block, pass it to the writeBlockWorker
lastConnectedBlock = hh.height
bch <- block
getBlockCond.Broadcast()
break
}
// break the endless loop on OS signal
if hchClosed.Load() == true {
break
}
// wait for the time this block is to be passed to the writeBlockWorker
getBlockCond.Wait()
} }
getBlockMux.Unlock()
} }
glog.Info("getBlockWorker ", i, " exiting...") glog.Info("getBlockWorker ", i, " exiting...")
} }
@ -292,6 +291,8 @@ ConnectLoop:
select { select {
case <-w.chanOsSignal: case <-w.chanOsSignal:
err = errors.Errorf("connectBlocksParallel interrupted at height %d", h) err = errors.Errorf("connectBlocksParallel interrupted at height %d", h)
// signal all workers to terminate their loops (error loops are interrupted below)
close(terminating)
break ConnectLoop break ConnectLoop
default: default:
hash, err = w.chain.GetBlockHash(h) hash, err = w.chain.GetBlockHash(h)
@ -315,16 +316,13 @@ ConnectLoop:
} }
} }
close(hch) close(hch)
// signal stop to workers that are in a loop // signal stop to workers that are in an error loop
hchClosed.Store(true) hchClosed.Store(true)
// broadcast syncWorkers times to unstick all waiting getBlockWorkers // wait for workers and close bch that will stop writer loop
for i := 0; i < w.syncWorkers; i++ {
getBlockCond.Broadcast()
}
// first wait for the getBlockWorkers to finish and then close bch channel
// so that the getBlockWorkers do not write to the closed channel
wg.Wait() wg.Wait()
close(bch) for i := 0; i < w.syncWorkers; i++ {
close(bch[i])
}
<-writeBlockDone <-writeBlockDone
return err return err
} }