diff --git a/db/rocksdb.go b/db/rocksdb.go index 1ecc4f61..6c4ebf9f 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -285,12 +285,18 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { return d.db.Write(d.wo, wb) } -// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way +// bulk connect +// in bulk mode the data are cached and stored to db in batches +// it speeds up the import in two ways: +// 1) balances and txAddresses are modified several times during the import, so there is a chance that repeated modifications are applied in the cache before the data is written to DB +// 2) rocksdb seems to handle fewer larger batches better than a continuous stream of smaller batches + type bulkAddresses struct { height uint32 addresses map[string][]outpoint } +// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way type BulkConnect struct { d *RocksDB isUTXO bool @@ -302,13 +308,14 @@ type BulkConnect struct { } const ( - maxBulkAddresses = 400000 + maxBulkAddresses = 300000 maxBulkTxAddresses = 2000000 partialStoreAddresses = maxBulkTxAddresses / 10 maxBulkBalances = 2500000 partialStoreBalances = maxBulkBalances / 10 ) +// InitBulkConnect initializes bulk connect and switches DB to inconsistent state func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) { bc := &BulkConnect{ d: d, @@ -323,9 +330,7 @@ func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) { return bc, nil } -func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { - defer close(c) - start := time.Now() +func (b *BulkConnect) storeTxAddresses(wb *gorocksdb.WriteBatch, all bool) (int, int, error) { var txm map[string]*TxAddresses var sp int if all { @@ -359,21 +364,31 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { } } } - wb := gorocksdb.NewWriteBatch() - defer wb.Destroy() if err := b.d.storeTxAddresses(wb, txm); err != nil { - c <- err - } else { - if err := b.d.db.Write(b.d.wo, wb); err != nil { - c <- err - } + return 
0, 0, err } - glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) + return len(txm), sp, nil } -func (b *BulkConnect) storeBalances(c chan error, all bool) { +func (b *BulkConnect) parallelStoreTxAddresses(c chan error, all bool) { defer close(c) start := time.Now() + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + count, sp, err := b.storeTxAddresses(wb, all) + if err != nil { + c <- err + return + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { + c <- err + return + } + glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) + c <- nil +} + +func (b *BulkConnect) storeBalances(wb *gorocksdb.WriteBatch, all bool) (int, error) { var bal map[string]*AddrBalance if all { bal = b.balances @@ -389,16 +404,28 @@ func (b *BulkConnect) storeBalances(c chan error, all bool) { } } } + if err := b.d.storeBalances(wb, bal); err != nil { + return 0, err + } + return len(bal), nil +} + +func (b *BulkConnect) parallelStoreBalances(c chan error, all bool) { + defer close(c) + start := time.Now() wb := gorocksdb.NewWriteBatch() defer wb.Destroy() - if err := b.d.storeBalances(wb, bal); err != nil { + count, err := b.storeBalances(wb, all) + if err != nil { c <- err - } else { - if err := b.d.db.Write(b.d.wo, wb); err != nil { - c <- err - } + return } - glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start)) + if err := b.d.db.Write(b.d.wo, wb); err != nil { + c <- err + return + } + glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start)) + c <- nil } func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error { @@ -412,41 +439,46 @@ func (b *BulkConnect) storeBulkAddresses(wb 
*gorocksdb.WriteBatch) error { return nil } +// ConnectBlock connects block in bulk mode func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error { b.height = block.Height if !b.isUTXO { return b.d.ConnectBlock(block) } - wb := gorocksdb.NewWriteBatch() - defer wb.Destroy() addresses := make(map[string][]outpoint) if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil { return err } - start := time.Now() - var sa bool - var storeAddressesChan, storeBalancesChan chan error - if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances { - sa = true - if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses { - storeAddressesChan = make(chan error) - go b.storeTxAddresses(storeAddressesChan, false) - } - if len(b.balances)+partialStoreBalances > maxBulkBalances { - storeBalancesChan = make(chan error) - go b.storeBalances(storeBalancesChan, false) - } - } b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{ height: block.Height, addresses: addresses, }) b.bulkAddressesCount += len(addresses) - bac := b.bulkAddressesCount - if sa || b.bulkAddressesCount > maxBulkAddresses { - if err := b.storeBulkAddresses(wb); err != nil { - return err - } + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + start := time.Now() + var count, count1, storeType int + var err error + const ( + storeNone = iota + storeTxAddresses + storeBalances + storeBulkAddresses + ) + // in one block store maximally one type of data + if len(b.txAddressesMap) > maxBulkTxAddresses { + storeType = storeTxAddresses + count, count1, err = b.storeTxAddresses(wb, false) + } else if len(b.balances) > maxBulkBalances { + storeType = storeBalances + count, err = b.storeBalances(wb, false) + } else if b.bulkAddressesCount > maxBulkAddresses { + storeType = storeBulkAddresses + count = b.bulkAddressesCount + err = b.storeBulkAddresses(wb) + } + if err != nil { + return err } if storeBlockTxs { if err := 
b.d.storeAndCleanupBlockTxs(wb, block); err != nil { @@ -459,29 +491,26 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro if err := b.d.db.Write(b.d.wo, wb); err != nil { return err } - if bac > b.bulkAddressesCount { - glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) - } - if storeAddressesChan != nil { - if err := <-storeAddressesChan; err != nil { - return err - } - } - if storeBalancesChan != nil { - if err := <-storeBalancesChan; err != nil { - return err - } + switch storeType { + case storeTxAddresses: + glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", count1, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) + case storeBalances: + glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start)) + case storeBulkAddresses: + glog.Info("rocksdb: height ", b.height, ", stored ", count, " addresses, done in ", time.Since(start)) } return nil } +// Close flushes the cached data and switches DB from inconsistent state to open state +// after Close, the BulkConnect cannot be used func (b *BulkConnect) Close() error { glog.Info("rocksdb: bulk connect closing") start := time.Now() storeAddressesChan := make(chan error) - go b.storeTxAddresses(storeAddressesChan, true) + go b.parallelStoreTxAddresses(storeAddressesChan, true) storeBalancesChan := make(chan error) - go b.storeBalances(storeBalancesChan, true) + go b.parallelStoreBalances(storeBalancesChan, true) wb := gorocksdb.NewWriteBatch() defer wb.Destroy() bac := b.bulkAddressesCount @@ -634,6 +663,7 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string spendingTxid := blockTxIDs[txi] ta := blockTxAddresses[txi] ta.Inputs = make([]TxInput, len(tx.Vin)) + logged := false for i, input := range tx.Vin { tai := &ta.Inputs[i] btxID, err := d.chainParser.PackTxid(input.Txid) @@ -670,7 
+700,10 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string // mark the output as spent in tx ot.Spent = true if len(ot.addrID) == 0 { - glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v skipping empty address", block.Height, tx.Txid, input.Txid, input.Vout) + if !logged { + glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v skipping empty address", block.Height, tx.Txid, input.Txid, input.Vout) + logged = true + } continue } strAddrID := string(ot.addrID)