diff --git a/db/rocksdb.go b/db/rocksdb.go index db46b83c..dd34c6fa 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -42,20 +42,17 @@ type connectBlockStats struct { // RocksDB handle type RocksDB struct { - path string - db *gorocksdb.DB - wo *gorocksdb.WriteOptions - ro *gorocksdb.ReadOptions - cfh []*gorocksdb.ColumnFamilyHandle - chainParser bchain.BlockChainParser - is *common.InternalState - metrics *common.Metrics - cache *gorocksdb.Cache - maxOpenFiles int - cbs connectBlockStats - chanUpdateBalance chan updateBalanceData - chanUpdateBalanceResult chan error - updateBalancesMap map[string]*AddrBalance + path string + db *gorocksdb.DB + wo *gorocksdb.WriteOptions + ro *gorocksdb.ReadOptions + cfh []*gorocksdb.ColumnFamilyHandle + chainParser bchain.BlockChainParser + is *common.InternalState + metrics *common.Metrics + cache *gorocksdb.Cache + maxOpenFiles int + cbs connectBlockStats } const ( @@ -93,20 +90,7 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha db, cfh, err := openDB(path, c, maxOpenFiles) wo := gorocksdb.NewDefaultWriteOptions() ro := gorocksdb.NewDefaultReadOptions() - rdb := &RocksDB{ - path: path, - db: db, - wo: wo, - ro: ro, - cfh: cfh, - chainParser: parser, - metrics: metrics, - cache: c, - maxOpenFiles: maxOpenFiles, - cbs: connectBlockStats{}, - } - rdb.initUpdateBalancesWorker() - return rdb, nil + return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}}, nil } func (d *RocksDB) closeDB() error { @@ -290,8 +274,6 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { txAddressesMap := make(map[string]*TxAddresses) balances := make(map[string]*AddrBalance) if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil { - // reinitialize balanceWorker so that there are no left balances in the queue - d.initUpdateBalancesWorker() return err } if err := d.storeAddresses(wb, block.Height, addresses); err != nil { @@ -380,80 +362,7 @@ func (d *RocksDB) GetAndResetConnectBlockStats() string { return s } -type updateBalanceData struct { - valueSat big.Int - strAddrDesc string - addrDesc bchain.AddressDescriptor - processed, output bool -} - -func (d *RocksDB) initUpdateBalancesWorker() { - if d.chanUpdateBalance != nil { - close(d.chanUpdateBalance) - } - d.chanUpdateBalance = make(chan updateBalanceData, 16) - d.chanUpdateBalanceResult = make(chan error, 16) - go d.updateBalancesWorker() -} - -// updateBalancesWorker is a single worker used to update balances in parallel to processAddressesUTXO -func (d *RocksDB) updateBalancesWorker() { - var err error - for bd := range d.chanUpdateBalance { - ab, e := d.updateBalancesMap[bd.strAddrDesc] - if !e { - ab, err = d.GetAddrDescBalance(bd.addrDesc) - if err != nil { - d.chanUpdateBalanceResult <- err - continue - } - if ab == nil { - ab = &AddrBalance{} - } - d.updateBalancesMap[bd.strAddrDesc] = ab - d.cbs.balancesMiss++ - } else { - d.cbs.balancesHit++ - } - // add number of trx in balance only once, address can be multiple times in tx - if !bd.processed { - ab.Txs++ - } - if bd.output { - ab.BalanceSat.Add(&ab.BalanceSat, &bd.valueSat) - } else { - ab.BalanceSat.Sub(&ab.BalanceSat, &bd.valueSat) - if ab.BalanceSat.Sign() < 0 { - d.resetValueSatToZero(&ab.BalanceSat, bd.addrDesc, "balance") - } - ab.SentSat.Add(&ab.SentSat, &bd.valueSat) - } - d.chanUpdateBalanceResult <- nil - } -} - -func (d *RocksDB) dispatchUpdateBalance(dispatchedBalances int, valueSat *big.Int, strAddrDesc string, addrDesc bchain.AddressDescriptor, processed, output bool) (int, error) { -loop: - for { - select { - // process as many results as possible - case err := <-d.chanUpdateBalanceResult: - if err != nil { - return 0, err - } - dispatchedBalances-- - // send input to be processed - case d.chanUpdateBalance <- updateBalanceData{*valueSat, strAddrDesc, addrDesc, processed, output}: - dispatchedBalances++ - break loop - } - } - return dispatchedBalances, nil -} - func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error { - d.updateBalancesMap = balances - dispatchedBalances := 0 blockTxIDs := make([][]byte, len(block.Txs)) blockTxAddresses := make([]*TxAddresses, len(block.Txs)) // first process all outputs so that inputs can point to txs in this block @@ -495,10 +404,25 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string btxID: btxID, index: int32(i), }) - dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &output.ValueSat, strAddrDesc, addrDesc, processed, true) - if err != nil { - return err + ab, e := balances[strAddrDesc] + if !e { + ab, err = d.GetAddrDescBalance(addrDesc) + if err != nil { + return err + } + if ab == nil { + ab = &AddrBalance{} + } + balances[strAddrDesc] = ab + d.cbs.balancesMiss++ + } else { + d.cbs.balancesHit++ } + // add number of trx in balance only once, address can be multiple times in tx + if !processed { + ab.Txs++ + } + ab.BalanceSat.Add(&ab.BalanceSat, &output.ValueSat) } } // process inputs @@ -564,16 +488,29 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string btxID: spendingTxid, index: ^int32(i), }) - dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &ot.ValueSat, strAddrDesc, ot.AddrDesc, processed, false) - if err != nil { - return err + ab, e := balances[strAddrDesc] + if !e { + ab, err = d.GetAddrDescBalance(ot.AddrDesc) + if err != nil { + return err + } + if ab == nil { + ab = &AddrBalance{} + } + balances[strAddrDesc] = ab + d.cbs.balancesMiss++ + } else { + d.cbs.balancesHit++ } - } - } - for i := 0; i < dispatchedBalances; i++ { - err := <-d.chanUpdateBalanceResult - if err != nil { - return err + // add number of trx in balance only once, address can be multiple times in tx + if !processed { + ab.Txs++ + } + ab.BalanceSat.Sub(&ab.BalanceSat, &ot.ValueSat) + if ab.BalanceSat.Sign() < 0 { + d.resetValueSatToZero(&ab.BalanceSat, ot.AddrDesc, "balance") + } + ab.SentSat.Add(&ab.SentSat, &ot.ValueSat) } } return nil