From d45d028ef229cfe11875afd18a8d823b71aedc25 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 19 Aug 2018 00:23:26 +0200 Subject: [PATCH] Implement bulk connect blocks --- blockbook.go | 6 +- common/internalstate.go | 2 + db/rocksdb.go | 225 ++++++++++++++++++++++++++++++++++++---- db/rocksdb_test.go | 39 +++++++ db/sync.go | 15 ++- 5 files changed, 266 insertions(+), 21 deletions(-) diff --git a/blockbook.go b/blockbook.go index cea2c5b6..7e3da52b 100644 --- a/blockbook.go +++ b/blockbook.go @@ -175,7 +175,11 @@ func main() { } index.SetInternalState(internalState) if internalState.DbState != common.DbStateClosed { - glog.Warning("internalState: database in not closed state ", internalState.DbState, ", possibly previous ungraceful shutdown") + if internalState.DbState == common.DbStateInconsistent { + glog.Error("internalState: database is in inconsistent state and cannot be used") + return + } + glog.Warning("internalState: database was left in open state, possibly previous ungraceful shutdown") } if *computeColumnStats { diff --git a/common/internalstate.go b/common/internalstate.go index 8e7b2b61..1053649c 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -11,6 +11,8 @@ const ( DbStateClosed = uint32(iota) // DbStateOpen means db is open or application died without closing the db DbStateOpen + // DbStateInconsistent means db is in inconsistent state and cannot be used + DbStateInconsistent ) // InternalStateColumn contains the data of a db column diff --git a/db/rocksdb.go b/db/rocksdb.go index 74ebb3e1..267a0ff6 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -258,7 +258,18 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { // unspentTxs; therefore it is not possible to DisconnectBlocks this way return errors.New("DisconnectBlock is not supported for UTXO chains") } - if err := d.writeAddressesUTXO(wb, block); err != nil { + txAddressesMap := make(map[string]*txAddresses) + balances := make(map[string]*addrBalance) + if err := d.writeAddressesUTXO(wb, block, txAddressesMap, balances); err != nil { + return err + } + if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { + return err + } + if err := d.storeBalances(wb, balances); err != nil { + return err + } + if err := d.storeAndCleanupBlockTxs(wb, block); err != nil { return err } } else { @@ -270,6 +281,177 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { return d.db.Write(d.wo, wb) } +// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way +type BulkConnect struct { + d *RocksDB + isUTXO bool + txAddressesMap map[string]*txAddresses + balances map[string]*addrBalance + height uint32 +} + +const ( + maxBulkTxAddresses = 2000000 + partialStoreAddresses = maxBulkTxAddresses / 10 + maxBulkBalances = 2500000 + partialStoreBalances = maxBulkBalances / 10 +) + +func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) { + bc := &BulkConnect{ + d: d, + isUTXO: d.chainParser.IsUTXOChain(), + txAddressesMap: make(map[string]*txAddresses), + balances: make(map[string]*addrBalance), + } + if err := d.SetInconsistentState(true); err != nil { + return nil, err + } + glog.Info("rocksdb: bulk connect init, db set to inconsistent state") + return bc, nil +} + +func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { + defer close(c) + start := time.Now() + var txm map[string]*txAddresses + if all { + txm = b.txAddressesMap + b.txAddressesMap = make(map[string]*txAddresses) + } else { + txm = make(map[string]*txAddresses) + for k, a := range b.txAddressesMap { + // store all completely spent transactions, they will not be modified again + r := true + for _, o := range a.outputs { + if o.spent == false { + r = false + break + } + } + if r { + txm[k] = a + delete(b.txAddressesMap, k) + } + } + // store some other random transactions if necessary + if len(txm) < partialStoreAddresses { + for k, a := range b.txAddressesMap { + txm[k] = a + delete(b.txAddressesMap, k) + if len(txm) >= partialStoreAddresses { + break + } + } + } + } + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + if err := b.d.storeTxAddresses(wb, txm); err != nil { + c <- err + } else { + if err := b.d.db.Write(b.d.wo, wb); err != nil { + c <- err + } + } + glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) +} + +func (b *BulkConnect) storeBalances(c chan error, all bool) { + defer close(c) + start := time.Now() + var bal map[string]*addrBalance + if all { + bal = b.balances + b.balances = make(map[string]*addrBalance) + } else { + bal = make(map[string]*addrBalance) + // store some random balances + for k, a := range b.balances { + bal[k] = a + delete(b.balances, k) + if len(bal) >= partialStoreBalances { + break + } + } + } + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + if err := b.d.storeBalances(wb, bal); err != nil { + c <- err + } else { + if err := b.d.db.Write(b.d.wo, wb); err != nil { + c <- err + } + } + glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start)) +} + +func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error { + b.height = block.Height + if !b.isUTXO { + return b.d.ConnectBlock(block) + } + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + if err := b.d.writeAddressesUTXO(wb, block, b.txAddressesMap, b.balances); err != nil { + return err + } + var storeAddressesChan, storeBalancesChan chan error + if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances { + if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses { + storeAddressesChan = make(chan error) + go b.storeTxAddresses(storeAddressesChan, false) + } + if len(b.balances)+partialStoreBalances > maxBulkBalances { + storeBalancesChan = make(chan error) + go b.storeBalances(storeBalancesChan, false) + } + } + if storeBlockTxs { + if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil { + return err + } + } + if err := b.d.writeHeight(wb, block, opInsert); err != nil { + return err + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { + return err + } + if storeAddressesChan != nil { + if err := <-storeAddressesChan; err != nil { + return err + } + } + if storeBalancesChan != nil { + if err := <-storeBalancesChan; err != nil { + return err + } + } + return nil +} + +func (b *BulkConnect) Close() error { + glog.Info("rocksdb: bulk connect closing") + storeAddressesChan := make(chan error) + go b.storeTxAddresses(storeAddressesChan, true) + storeBalancesChan := make(chan error) + go b.storeBalances(storeBalancesChan, true) + if err := <-storeAddressesChan; err != nil { + return err + } + if err := <-storeBalancesChan; err != nil { + return err + } + if err := b.d.SetInconsistentState(false); err != nil { + return err + } + glog.Info("rocksdb: bulk connect closed, db set to open state") + b.d = nil + return nil +} + // Addresses index type outpoint struct { @@ -315,11 +497,9 @@ func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText valueSat.SetInt64(0) } -func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error { +func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error { addresses := make(map[string][]outpoint) blockTxIDs := make([][]byte, len(block.Txs)) - txAddressesMap := make(map[string]*txAddresses) - balances := make(map[string]*addrBalance) // first process all outputs so that inputs can point to txs in this block for txi := range block.Txs { tx := &block.Txs[txi] @@ -454,16 +634,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo ab.sentSat.Add(&ab.sentSat, &ot.valueSat) } } - if err := d.storeAddresses(wb, block, addresses); err != nil { - return err - } - if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { - return err - } - if err := d.storeBalances(wb, balances); err != nil { - return err - } - return d.storeAndCleanupBlockTxs(wb, block) + return d.storeAddresses(wb, block, addresses) } func processedInTx(o []outpoint, btxID []byte) bool { @@ -1228,6 +1399,18 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro return is, nil } +func (d *RocksDB) SetInconsistentState(inconsistent bool) error { + if d.is == nil { + return errors.New("Internal state not created") + } + if inconsistent { + d.is.DbState = common.DbStateInconsistent + } else { + d.is.DbState = common.DbStateOpen + } + return d.storeState(d.is) +} + // SetInternalState sets the InternalState to be used by db to collect internal state func (d *RocksDB) SetInternalState(is *common.InternalState) { d.is = is @@ -1235,11 +1418,17 @@ func (d *RocksDB) SetInternalState(is *common.InternalState) { // StoreInternalState stores the internal state to db func (d *RocksDB) StoreInternalState(is *common.InternalState) error { - for c := 0; c < len(cfNames); c++ { - rows, keyBytes, valueBytes := d.is.GetDBColumnStatValues(c) - d.metrics.DbColumnRows.With(common.Labels{"column": cfNames[c]}).Set(float64(rows)) - d.metrics.DbColumnSize.With(common.Labels{"column": cfNames[c]}).Set(float64(keyBytes + valueBytes)) + if d.metrics != nil { + for c := 0; c < len(cfNames); c++ { + rows, keyBytes, valueBytes := d.is.GetDBColumnStatValues(c) + d.metrics.DbColumnRows.With(common.Labels{"column": cfNames[c]}).Set(float64(rows)) + d.metrics.DbColumnSize.With(common.Labels{"column": cfNames[c]}).Set(float64(keyBytes + valueBytes)) + } } + return d.storeState(is) +} + +func (d *RocksDB) storeState(is *common.InternalState) error { buf, err := is.Pack() if err != nil { return err diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 3ffe7e11..adf9e6c9 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -5,6 +5,7 @@ package db import ( "blockbook/bchain" "blockbook/bchain/coins/btc" + "blockbook/common" "encoding/hex" "fmt" "io/ioutil" @@ -720,7 +721,45 @@ func TestRocksDB_Index_UTXO(t *testing.T) { t.Fatal(err) } verifyAfterUTXOBlock2(t, d) +} +func Test_BulkConnect_UTXO(t *testing.T) { + d := setupRocksDB(t, &testBitcoinParser{ + BitcoinParser: bitcoinTestnetParser(), + }) + defer closeAndDestroyRocksDB(t, d) + + bc, err := d.InitBulkConnect() + if err != nil { + t.Fatal(err) + } + + if d.is.DbState != common.DbStateInconsistent { + t.Fatal("DB not in DbStateInconsistent") + } + + if err := bc.ConnectBlock(getTestUTXOBlock1(t, d), false); err != nil { + t.Fatal(err) + } + if err := checkColumn(d, cfBlockTxs, []keyPair{}); err != nil { + { + t.Fatal(err) + } + } + + if err := bc.ConnectBlock(getTestUTXOBlock2(t, d), true); err != nil { + t.Fatal(err) + } + + if err := bc.Close(); err != nil { + t.Fatal(err) + } + + if d.is.DbState != common.DbStateOpen { + t.Fatal("DB not in DbStateOpen") + } + + verifyAfterUTXOBlock2(t, d) } func Test_packBigint_unpackBigint(t *testing.T) { diff --git a/db/sync.go b/db/sync.go index fecfa09c..32bc143d 100644 --- a/db/sync.go +++ b/db/sync.go @@ -213,17 +213,26 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { writeBlockDone := make(chan struct{}) writeBlockWorker := func() { defer close(writeBlockDone) + bc, err := w.db.InitBulkConnect() + if err != nil { + glog.Error("sync: InitBulkConnect error ", err) + } lastBlock := lower - 1 + keep := uint32(w.chain.GetChainParser().KeepBlockAddresses()) for b := range bch { if lastBlock+1 != b.Height { glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height) } - err := w.db.ConnectBlock(b) + err := bc.ConnectBlock(b, b.Height+keep > higher) if err != nil { glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err) } lastBlock = b.Height } + err = bc.Close() + if err != nil { + glog.Error("sync: bulkconnect.Close error ", err) + } glog.Info("WriteBlock exiting...") } getBlockWorker := func(i int) { @@ -276,6 +285,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { } go writeBlockWorker() var hash string + start := time.Now() ConnectLoop: for h := lower; h <= higher; { select { @@ -292,7 +302,8 @@ ConnectLoop: } hch <- hashHeight{hash, h} if h > 0 && h%1000 == 0 { - glog.Info("connecting block ", h, " ", hash) + glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start)) + start = time.Now() } h++ }