From 9bdb83deff1a16beb80fc7b59bb225c259aa13c1 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Thu, 1 Mar 2018 18:37:01 +0100 Subject: [PATCH] Refactor sync code to own file --- blockbook.go | 364 ++------------------------------------------------- db/sync.go | 349 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 362 insertions(+), 351 deletions(-) create mode 100644 db/sync.go diff --git a/blockbook.go b/blockbook.go index b03d6a34..cffe57fa 100644 --- a/blockbook.go +++ b/blockbook.go @@ -6,7 +6,6 @@ import ( "flag" "os" "os/signal" - "sync" "syscall" "time" @@ -40,7 +39,7 @@ var ( dbPath = flag.String("path", "./data", "path to address index directory") - blockHeight = flag.Int("blockheight", -1, "height of the starting block") + blockFrom = flag.Int("blockheight", -1, "height of the starting block") blockUntil = flag.Int("blockuntil", -1, "height of the final block") rollbackHeight = flag.Int("rollback", -1, "rollback to the given height and quit") @@ -74,6 +73,7 @@ var ( chain *bchain.BitcoinRPC mempool *bchain.Mempool index *db.RocksDB + syncWorker *db.SyncWorker callbacksOnNewBlockHash []func(hash string) callbacksOnNewTxAddr []func(txid string, addr string) chanOsSignal chan os.Signal @@ -140,8 +140,13 @@ func main() { return } + syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, uint32(*blockFrom), *dryRun, chanOsSignal) + if err != nil { + glog.Fatalf("NewSyncWorker %v", err) + } + if *synchronize { - if err := resyncIndex(nil); err != nil { + if err := syncWorker.ResyncIndex(nil); err != nil { glog.Error("resyncIndex ", err) return } @@ -210,11 +215,11 @@ func main() { } } - if *blockHeight >= 0 { + if *blockFrom >= 0 { if *blockUntil < 0 { - *blockUntil = *blockHeight + *blockUntil = *blockFrom } - height := uint32(*blockHeight) + height := uint32(*blockFrom) until := uint32(*blockUntil) address := *queryAddress @@ -229,12 +234,7 @@ func main() { return } } else if !*synchronize { - if err = connectBlocksParallelInChunks( - height, - until, - *syncChunk, - *syncWorkers, - ); err != nil { + if err = syncWorker.ConnectBlocksParallelInChunks(height, until); err != nil { glog.Error("connectBlocksParallelInChunks ", err) return } @@ -281,7 +281,7 @@ func syncIndexLoop() { glog.Info("syncIndexLoop starting") // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second tickAndDebounce(resyncIndexPeriodMs*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() { - if err := resyncIndex(onNewBlockHash); err != nil { + if err := syncWorker.ResyncIndex(onNewBlockHash); err != nil { glog.Error("syncIndexLoop ", errors.ErrorStack(err)) } }) @@ -355,341 +355,3 @@ func printResult(txid string, vout uint32, isOutput bool) error { glog.Info(txid, vout, isOutput) return nil } - -func resyncIndex(onNewBlock func(hash string)) error { - remote, err := chain.GetBestBlockHash() - if err != nil { - return err - } - localBestHeight, local, err := index.GetBestBlock() - if err != nil { - local = "" - } - - // If the locally indexed block is the same as the best block on the - // network, we're done. - if local == remote { - glog.Infof("resync: synced on %d %s", localBestHeight, local) - return nil - } - - var header *bchain.BlockHeader - if local != "" { - // Is local tip on the best chain? - header, err = chain.GetBlockHeader(local) - forked := false - if err != nil { - if e, ok := err.(*bchain.RPCError); ok && e.Message == "Block not found" { - forked = true - } else { - return err - } - } else { - if header.Confirmations < 0 { - forked = true - } - } - - if forked { - // find and disconnect forked blocks and then synchronize again - glog.Info("resync: local is forked") - var height uint32 - for height = localBestHeight - 1; height >= 0; height-- { - local, err = index.GetBlockHash(height) - if err != nil { - return err - } - remote, err = chain.GetBlockHash(height) - if err != nil { - return err - } - if local == remote { - break - } - } - err = index.DisconnectBlocks(height+1, localBestHeight) - if err != nil { - return err - } - return resyncIndex(onNewBlock) - } - } - - startHeight := uint32(0) - var hash string - if header != nil { - glog.Info("resync: local is behind") - hash = header.Next - startHeight = localBestHeight - } else { - // If the local block is missing, we're indexing from the genesis block - // or from the start block specified by flags - if *blockHeight > 0 { - startHeight = uint32(*blockHeight) - } - glog.Info("resync: genesis from block ", startHeight) - hash, err = chain.GetBlockHash(startHeight) - if err != nil { - return err - } - } - - // if parallel operation is enabled and the number of blocks to be connected is large, - // use parallel routine to load majority of blocks - if *syncWorkers > 1 { - chainBestHeight, err := chain.GetBestBlockHeight() - if err != nil { - return err - } - if chainBestHeight-startHeight > uint32(*syncChunk) { - glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", startHeight, chainBestHeight, *syncWorkers) - err = connectBlocksParallel( - startHeight, - chainBestHeight, - *syncWorkers, - ) - if err != nil { - return err - } - // after parallel load finish the sync using standard way, - // new blocks may have been created in the meantime - return resyncIndex(onNewBlock) - } - } - - return connectBlocks(hash, onNewBlock) -} - -func connectBlocks(hash string, onNewBlock func(hash string)) error { - bch := make(chan blockResult, 8) - done := make(chan struct{}) - defer close(done) - - go getBlockChain(hash, bch, done) - - var lastRes blockResult - for res := range bch { - lastRes = res - if res.err != nil { - return res.err - } - err := index.ConnectBlock(res.block) - if err != nil { - return err - } - if onNewBlock != nil { - onNewBlock(res.block.Hash) - } - } - - if lastRes.block != nil { - glog.Infof("resync: synced on %d %s", lastRes.block.Height, lastRes.block.Hash) - } - - return nil -} - -func connectBlocksParallel( - lower uint32, - higher uint32, - numWorkers int, -) error { - type hashHeight struct { - hash string - height uint32 - } - var err error - var wg sync.WaitGroup - hch := make(chan hashHeight, numWorkers) - running := make([]bool, numWorkers) - work := func(i int) { - defer wg.Done() - var err error - var block *bchain.Block - for hh := range hch { - running[i] = true - for { - block, err = chain.GetBlockWithoutHeader(hh.hash, hh.height) - if err != nil { - glog.Error("Connect block error ", err, ". Retrying...") - time.Sleep(time.Millisecond * 500) - } else { - break - } - } - if *dryRun { - running[i] = false - continue - } - err = index.ConnectBlock(block) - if err != nil { - glog.Error("Connect block ", hh.height, " ", hh.hash, " error ", err) - } - running[i] = false - } - } - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go work(i) - } - - var hash string -ConnectLoop: - for h := lower; h <= higher; { - select { - case <-chanOsSignal: - // wait for the workers to finish block - i := 0 - WaitAgain: - for ; i < 60; i++ { - for _, r := range running { - if r { - glog.Info("Waiting ", i, "s for workers to finish ", running) - time.Sleep(time.Millisecond * 1000) - continue WaitAgain - } - } - break - } - err = errors.Errorf("connectBlocksParallel interrupted at height %d", h) - break ConnectLoop - default: - hash, err = chain.GetBlockHash(h) - if err != nil { - glog.Error("GetBlockHash error ", err) - time.Sleep(time.Millisecond * 500) - continue - } - hch <- hashHeight{hash, h} - if h > 0 && h%1000 == 0 { - glog.Info("connecting block ", h, " ", hash) - } - h++ - } - } - close(hch) - wg.Wait() - return err -} - -func connectBlocksParallelInChunks( - lower uint32, - higher uint32, - chunkSize int, - numWorkers int, -) error { - var wg sync.WaitGroup - - work := func(i int) { - defer wg.Done() - - offset := uint32(chunkSize * i) - stride := uint32(chunkSize * numWorkers) - - for low := lower + offset; low <= higher; low += stride { - high := low + uint32(chunkSize-1) - if high > higher { - high = higher - } - err := connectBlockChunk(low, high) - if err != nil { - if e, ok := err.(*bchain.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") { - break - } - glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err) - } - } - } - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go work(i) - } - wg.Wait() - - return nil -} - -func connectBlockChunk( - lower uint32, - higher uint32, -) error { - connected, err := isBlockConnected(higher) - if err != nil || connected { - // if higher is over the best block, continue with lower block, otherwise return error - if e, ok := err.(*bchain.RPCError); !ok || e.Message != "Block height out of range" { - return err - } - } - - height := lower - hash, err := chain.GetBlockHash(lower) - if err != nil { - return err - } - - for height <= higher { - block, err := chain.GetBlock(hash) - if err != nil { - return err - } - hash = block.Next - height = block.Height + 1 - if *dryRun { - continue - } - err = index.ConnectBlock(block) - if err != nil { - return err - } - if block.Height%1000 == 0 { - glog.Info("connected block ", block.Height, " ", block.Hash) - } - } - - return nil -} - -func isBlockConnected( - height uint32, -) (bool, error) { - local, err := index.GetBlockHash(height) - if err != nil { - return false, err - } - remote, err := chain.GetBlockHash(height) - if err != nil { - return false, err - } - if local != remote { - return false, nil - } - return true, nil -} - -type blockResult struct { - block *bchain.Block - err error -} - -func getBlockChain( - hash string, - out chan blockResult, - done chan struct{}, -) { - defer close(out) - - for hash != "" { - select { - case <-done: - return - default: - } - block, err := chain.GetBlock(hash) - if err != nil { - out <- blockResult{err: err} - return - } - hash = block.Next - out <- blockResult{block: block} - } -} diff --git a/db/sync.go b/db/sync.go new file mode 100644 index 00000000..b96cb572 --- /dev/null +++ b/db/sync.go @@ -0,0 +1,349 @@ +package db + +import ( + "blockbook/bchain" + "os" + "sync" + "time" + + "github.com/golang/glog" + "github.com/juju/errors" +) + +// SyncWorker is handle to SyncWorker +type SyncWorker struct { + db *RocksDB + chain *bchain.BitcoinRPC + syncWorkers, syncChunk int + dryRun bool + startHeight uint32 + chanOsSignal chan os.Signal +} + +// NewSyncWorker creates new SyncWorker and returns its handle +func NewSyncWorker(db *RocksDB, chain *bchain.BitcoinRPC, syncWorkers, syncChunk int, minStartHeight uint32, dryRun bool, chanOsSignal chan os.Signal) (*SyncWorker, error) { + return &SyncWorker{ + db: db, + chain: chain, + syncWorkers: syncWorkers, + syncChunk: syncChunk, + dryRun: dryRun, + startHeight: minStartHeight, + chanOsSignal: chanOsSignal, + }, nil +} + +// ResyncIndex synchronizes index to the top of the blockchain +// onNewBlock is called when new block is connected, but not in initial parallel sync +func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { + remote, err := w.chain.GetBestBlockHash() + if err != nil { + return err + } + localBestHeight, local, err := w.db.GetBestBlock() + if err != nil { + local = "" + } + + // If the locally indexed block is the same as the best block on the + // network, we're done. + if local == remote { + glog.Infof("resync: synced on %d %s", localBestHeight, local) + return nil + } + + var header *bchain.BlockHeader + if local != "" { + // Is local tip on the best chain? + header, err = w.chain.GetBlockHeader(local) + forked := false + if err != nil { + if e, ok := err.(*bchain.RPCError); ok && e.Message == "Block not found" { + forked = true + } else { + return err + } + } else { + if header.Confirmations < 0 { + forked = true + } + } + + if forked { + // find and disconnect forked blocks and then synchronize again + glog.Info("resync: local is forked") + var height uint32 + for height = localBestHeight - 1; height >= 0; height-- { + local, err = w.db.GetBlockHash(height) + if err != nil { + return err + } + remote, err = w.chain.GetBlockHash(height) + if err != nil { + return err + } + if local == remote { + break + } + } + err = w.db.DisconnectBlocks(height+1, localBestHeight) + if err != nil { + return err + } + return w.ResyncIndex(onNewBlock) + } + } + + var hash string + if header != nil { + glog.Info("resync: local is behind") + hash = header.Next + w.startHeight = localBestHeight + } else { + // If the local block is missing, we're indexing from the genesis block + // or from the start block specified by flags + glog.Info("resync: genesis from block ", w.startHeight) + hash, err = w.chain.GetBlockHash(w.startHeight) + if err != nil { + return err + } + } + + // if parallel operation is enabled and the number of blocks to be connected is large, + // use parallel routine to load majority of blocks + if w.syncWorkers > 1 { + chainBestHeight, err := w.chain.GetBestBlockHeight() + if err != nil { + return err + } + if chainBestHeight-w.startHeight > uint32(w.syncChunk) { + glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, chainBestHeight, w.syncWorkers) + err = w.connectBlocksParallel(w.startHeight, chainBestHeight) + if err != nil { + return err + } + // after parallel load finish the sync using standard way, + // new blocks may have been created in the meantime + return w.ResyncIndex(onNewBlock) + } + } + + return w.connectBlocks(hash, onNewBlock) +} + +func (w *SyncWorker) connectBlocks(hash string, onNewBlock func(hash string)) error { + bch := make(chan blockResult, 8) + done := make(chan struct{}) + defer close(done) + + go w.getBlockChain(hash, bch, done) + + var lastRes blockResult + for res := range bch { + lastRes = res + if res.err != nil { + return res.err + } + err := w.db.ConnectBlock(res.block) + if err != nil { + return err + } + if onNewBlock != nil { + onNewBlock(res.block.Hash) + } + } + + if lastRes.block != nil { + glog.Infof("resync: synced on %d %s", lastRes.block.Height, lastRes.block.Hash) + } + + return nil +} + +func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error { + type hashHeight struct { + hash string + height uint32 + } + var err error + var wg sync.WaitGroup + hch := make(chan hashHeight, w.syncWorkers) + running := make([]bool, w.syncWorkers) + work := func(i int) { + defer wg.Done() + var err error + var block *bchain.Block + for hh := range hch { + running[i] = true + for { + block, err = w.chain.GetBlockWithoutHeader(hh.hash, hh.height) + if err != nil { + glog.Error("Connect block error ", err, ". Retrying...") + time.Sleep(time.Millisecond * 500) + } else { + break + } + } + if w.dryRun { + running[i] = false + continue + } + err = w.db.ConnectBlock(block) + if err != nil { + glog.Error("Connect block ", hh.height, " ", hh.hash, " error ", err) + } + running[i] = false + } + } + for i := 0; i < w.syncWorkers; i++ { + wg.Add(1) + go work(i) + } + + var hash string +ConnectLoop: + for h := lower; h <= higher; { + select { + case <-w.chanOsSignal: + // wait for the workers to finish block + i := 0 + WaitAgain: + for ; i < 60; i++ { + for _, r := range running { + if r { + glog.Info("Waiting ", i, "s for workers to finish ", running) + time.Sleep(time.Millisecond * 1000) + continue WaitAgain + } + } + break + } + err = errors.Errorf("connectBlocksParallel interrupted at height %d", h) + break ConnectLoop + default: + hash, err = w.chain.GetBlockHash(h) + if err != nil { + glog.Error("GetBlockHash error ", err) + time.Sleep(time.Millisecond * 500) + continue + } + hch <- hashHeight{hash, h} + if h > 0 && h%1000 == 0 { + glog.Info("connecting block ", h, " ", hash) + } + h++ + } + } + close(hch) + wg.Wait() + return err +} + +func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error { + connected, err := w.isBlockConnected(higher) + if err != nil || connected { + // if higher is over the best block, continue with lower block, otherwise return error + if e, ok := err.(*bchain.RPCError); !ok || e.Message != "Block height out of range" { + return err + } + } + + height := lower + hash, err := w.chain.GetBlockHash(lower) + if err != nil { + return err + } + + for height <= higher { + block, err := w.chain.GetBlock(hash) + if err != nil { + return err + } + hash = block.Next + height = block.Height + 1 + if w.dryRun { + continue + } + err = w.db.ConnectBlock(block) + if err != nil { + return err + } + if block.Height%1000 == 0 { + glog.Info("connected block ", block.Height, " ", block.Hash) + } + } + + return nil +} + +// ConnectBlocksParallelInChunks connect blocks in chunks +func (w *SyncWorker) ConnectBlocksParallelInChunks(lower, higher uint32) error { + var wg sync.WaitGroup + + work := func(i int) { + defer wg.Done() + + offset := uint32(w.syncChunk * i) + stride := uint32(w.syncChunk * w.syncWorkers) + + for low := lower + offset; low <= higher; low += stride { + high := low + uint32(w.syncChunk-1) + if high > higher { + high = higher + } + err := w.connectBlockChunk(low, high) + if err != nil { + if e, ok := err.(*bchain.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") { + break + } + glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err) + } + } + } + for i := 0; i < w.syncWorkers; i++ { + wg.Add(1) + go work(i) + } + wg.Wait() + + return nil +} + +func (w *SyncWorker) isBlockConnected(height uint32) (bool, error) { + local, err := w.db.GetBlockHash(height) + if err != nil { + return false, err + } + remote, err := w.db.GetBlockHash(height) + if err != nil { + return false, err + } + if local != remote { + return false, nil + } + return true, nil +} + +type blockResult struct { + block *bchain.Block + err error +} + +func (w *SyncWorker) getBlockChain(hash string, out chan blockResult, done chan struct{}) { + defer close(out) + + for hash != "" { + select { + case <-done: + return + default: + } + block, err := w.chain.GetBlock(hash) + if err != nil { + out <- blockResult{err: err} + return + } + hash = block.Next + out <- blockResult{block: block} + } +}