From df2a6b55511cb2ec7d05f3a9e40d6b2b43de80ad Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 29 Apr 2018 21:35:45 +0200 Subject: [PATCH] Sync using indexv2 - WIP --- db/sync.go | 151 +++++++++++++++-------------------------------------- 1 file changed, 42 insertions(+), 109 deletions(-) diff --git a/db/sync.go b/db/sync.go index 4557a738..fb681bd4 100644 --- a/db/sync.go +++ b/db/sync.go @@ -175,6 +175,9 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { if onNewBlock != nil { onNewBlock(res.block.Hash) } + if res.block.Height > 0 && res.block.Height%1000 == 0 { + glog.Info("connected block ", res.block.Height, " ", res.block.Hash) + } } if lastRes.block != nil { @@ -191,15 +194,27 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { } var err error var wg sync.WaitGroup + bch := make(chan *bchain.Block, w.syncWorkers) hch := make(chan hashHeight, w.syncWorkers) hchClosed := atomic.Value{} hchClosed.Store(false) - lastConnectedBlock := int(lower) - 1 - connectedCh := make([]chan struct{}, w.syncWorkers) - var connectedMux sync.Mutex + var getBlockMux sync.Mutex + getBlockCond := sync.NewCond(&getBlockMux) totalWaitDuration := time.Duration(0) - totalWaitCount := 0 - work := func(i int) { + lastConnectedBlock := int(lower) - 1 + writeBlockDone := make(chan struct{}) + writeBlock := func() { + defer close(writeBlockDone) + for b := range bch { + // glog.Info("WriteBlock ", b.Height) + err = w.db.ConnectBlock(b) + if err != nil { + glog.Error("WriteBlock worker ", b.Height, " ", b.Hash, " error ", err) + } + } + glog.Info("WriteBlock exiting...") + } + getBlock := func(i int) { defer wg.Done() var err error var block *bchain.Block @@ -222,59 +237,33 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { if w.dryRun { continue } - // check if the block is the next in line to be connected - // if not, wait for the previous block connect to complete - chi := int(hh.height) % w.syncWorkers - waitForBlock := false - waitDuration := time.Duration(0) - glog.Info(i, " Going to connect block ", hh.height) - connectedMux.Lock() - if uint32(lastConnectedBlock+1) != hh.height { - if connectedCh[chi] != nil { - glog.Fatal("Channel ", chi, " is not nil!") + start := time.Now() + getBlockMux.Lock() + for { + if uint32(lastConnectedBlock+1) == hh.height { + lastConnectedBlock = int(hh.height) + // get data to writeBlock routine + // glog.Info("Worker ", i, " have block ", hh.height, ". Sending.") + bch <- block + totalWaitDuration += time.Since(start) + getBlockCond.Broadcast() + getBlockMux.Unlock() + break } - connectedCh[chi] = make(chan struct{}) - waitForBlock = true - } - connectedMux.Unlock() - if waitForBlock { - start := time.Now() - glog.Info(i, " Waiting for block ", hh.height, " ", chi) - <-connectedCh[chi] + // glog.Info("Worker ", i, " have block ", hh.height, ". Waiting.") + getBlockCond.Wait() if hchClosed.Load() == true { - glog.Error("Worker ", i, " connect block error ", err, ". Exiting...") - return + break } - waitDuration = time.Since(start) - connectedCh[chi] = nil } - err = w.db.ConnectBlock(block) - if err != nil { - glog.Error("Worker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err) - } - connectedMux.Lock() - if lastConnectedBlock < int(hh.height) { - lastConnectedBlock = int(hh.height) - } - chi = (chi + 1) % w.syncWorkers - if connectedCh[chi] != nil { - glog.Info(i, " closing channel ", chi) - close(connectedCh[chi]) - connectedCh[chi] = nil - } - totalWaitDuration += waitDuration - if waitDuration > 0 { - totalWaitCount++ - } - glog.Info("Connected block ", hh.height) - connectedMux.Unlock() } glog.Info("Worker ", i, " exiting...") } for i := 0; i < w.syncWorkers; i++ { wg.Add(1) - go work(i) + go getBlock(i) } + go writeBlock() var hash string ConnectLoop: @@ -293,7 +282,7 @@ ConnectLoop: } hch <- hashHeight{hash, h} if h > 0 && h%1000 == 0 { - glog.Info("connecting block ", h, " ", hash, " block wait time ", totalWaitDuration, " wait count ", totalWaitCount) + glog.Info("connecting block ", h, " ", hash, " wait for writeBlock ", totalWaitDuration) } h++ } @@ -301,70 +290,14 @@ ConnectLoop: close(hch) // signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop hchClosed.Store(true) - connectedMux.Lock() - for _, ch := range connectedCh { - if ch != nil { - close(ch) - } - } - connectedMux.Unlock() + // first wait for the getBlock routines to finish and then close bch channel + getBlockCond.Broadcast() wg.Wait() + close(bch) + <-writeBlockDone return err } -func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error { - connected, err := w.isBlockConnected(higher) - if err != nil || connected { - // if higher is over the best block, continue with lower block, otherwise return error - if err != bchain.ErrBlockNotFound { - return err - } - } - - height := lower - hash, err := w.chain.GetBlockHash(lower) - if err != nil { - return err - } - - for height <= higher { - block, err := w.chain.GetBlock(hash, height) - if err != nil { - return err - } - hash = block.Next - height = block.Height + 1 - if w.dryRun { - continue - } - err = w.db.ConnectBlock(block) - if err != nil { - return err - } - if block.Height%1000 == 0 { - glog.Info("connected block ", block.Height, " ", block.Hash) - go w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) - } - } - - return nil -} - -func (w *SyncWorker) isBlockConnected(height uint32) (bool, error) { - local, err := w.db.GetBlockHash(height) - if err != nil { - return false, err - } - remote, err := w.chain.GetBlockHash(height) - if err != nil { - return false, err - } - if local != remote { - return false, nil - } - return true, nil -} - type blockResult struct { block *bchain.Block err error