diff --git a/db/rocksdb.go b/db/rocksdb.go index 267a0ff6..1141df55 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -258,9 +258,13 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { // unspentTxs; therefore it is not possible to DisconnectBlocks this way return errors.New("DisconnectBlock is not supported for UTXO chains") } + addresses := make(map[string][]outpoint) txAddressesMap := make(map[string]*txAddresses) balances := make(map[string]*addrBalance) - if err := d.writeAddressesUTXO(wb, block, txAddressesMap, balances); err != nil { + if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil { + return err + } + if err := d.storeAddresses(wb, block.Height, addresses); err != nil { return err } if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { @@ -282,15 +286,23 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { } // BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way +type bulkAddresses struct { + height uint32 + addresses map[string][]outpoint +} + type BulkConnect struct { - d *RocksDB - isUTXO bool - txAddressesMap map[string]*txAddresses - balances map[string]*addrBalance - height uint32 + d *RocksDB + isUTXO bool + bulkAddresses []bulkAddresses + bulkAddressesCount int + txAddressesMap map[string]*txAddresses + balances map[string]*addrBalance + height uint32 } const ( + maxBulkAddresses = 400000 maxBulkTxAddresses = 2000000 partialStoreAddresses = maxBulkTxAddresses / 10 maxBulkBalances = 2500000 @@ -315,6 +327,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { defer close(c) start := time.Now() var txm map[string]*txAddresses + var sp int if all { txm = b.txAddressesMap b.txAddressesMap = make(map[string]*txAddresses) @@ -334,6 +347,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { delete(b.txAddressesMap, k) } } + sp = len(txm) // store some other random transactions if necessary if len(txm) < partialStoreAddresses { for k, a := range b.txAddressesMap { @@ -354,7 +368,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) { c <- err } } - glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) + glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start)) } func (b *BulkConnect) storeBalances(c chan error, all bool) { @@ -387,6 +401,17 @@ func (b *BulkConnect) storeBalances(c chan error, all bool) { glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start)) } +func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error { + for _, ba := range b.bulkAddresses { + if err := b.d.storeAddresses(wb, ba.height, ba.addresses); err != nil { + return err + } + } + b.bulkAddressesCount = 0 + b.bulkAddresses = b.bulkAddresses[:0] + return nil +} + func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error { b.height = block.Height if !b.isUTXO { @@ -394,11 +419,15 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro } wb := gorocksdb.NewWriteBatch() defer wb.Destroy() - if err := b.d.writeAddressesUTXO(wb, block, b.txAddressesMap, b.balances); err != nil { + addresses := make(map[string][]outpoint) + if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil { return err } + start := time.Now() + var sa bool var storeAddressesChan, storeBalancesChan chan error if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances { + sa = true if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses { storeAddressesChan = make(chan error) go b.storeTxAddresses(storeAddressesChan, false) @@ -408,6 +437,17 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro go b.storeBalances(storeBalancesChan, false) } } + b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{ + height: block.Height, + addresses: addresses, + }) + b.bulkAddressesCount += len(addresses) + bac := b.bulkAddressesCount + if sa || b.bulkAddressesCount > maxBulkAddresses { + if err := b.storeBulkAddresses(wb); err != nil { + return err + } + } if storeBlockTxs { if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil { return err @@ -419,6 +459,9 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro if err := b.d.db.Write(b.d.wo, wb); err != nil { return err } + if bac > b.bulkAddressesCount { + glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) + } if storeAddressesChan != nil { if err := <-storeAddressesChan; err != nil { return err @@ -434,10 +477,21 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro func (b *BulkConnect) Close() error { glog.Info("rocksdb: bulk connect closing") + start := time.Now() storeAddressesChan := make(chan error) go b.storeTxAddresses(storeAddressesChan, true) storeBalancesChan := make(chan error) go b.storeBalances(storeBalancesChan, true) + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + bac := b.bulkAddressesCount + if err := b.storeBulkAddresses(wb); err != nil { + return err + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { + return err + } + glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) if err := <-storeAddressesChan; err != nil { return err } @@ -497,8 +551,7 @@ func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText valueSat.SetInt64(0) } -func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error { - addresses := make(map[string][]outpoint) +func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error { blockTxIDs := make([][]byte, len(block.Txs)) // first process all outputs so that inputs can point to txs in this block for txi := range block.Txs { @@ -634,7 +687,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo ab.sentSat.Add(&ab.sentSat, &ot.valueSat) } } - return d.storeAddresses(wb, block, addresses) + return nil } func processedInTx(o []outpoint, btxID []byte) bool { @@ -646,10 +699,10 @@ func processedInTx(o []outpoint, btxID []byte) bool { return false } -func (d *RocksDB) storeAddresses(wb *gorocksdb.WriteBatch, block *bchain.Block, addresses map[string][]outpoint) error { +func (d *RocksDB) storeAddresses(wb *gorocksdb.WriteBatch, height uint32, addresses map[string][]outpoint) error { for addrID, outpoints := range addresses { ba := []byte(addrID) - key := packAddressKey(ba, block.Height) + key := packAddressKey(ba, height) val := d.packOutpoints(outpoints) wb.PutCF(d.cfh[cfAddresses], key, val) }