From 099321126fd26e571fede09bb48369c2e64787ca Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 26 Sep 2018 11:50:43 +0200 Subject: [PATCH] Update balances in extra goroutine during import block --- db/rocksdb.go | 167 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 115 insertions(+), 52 deletions(-) diff --git a/db/rocksdb.go b/db/rocksdb.go index dd34c6fa..db46b83c 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -42,17 +42,20 @@ type connectBlockStats struct { // RocksDB handle type RocksDB struct { - path string - db *gorocksdb.DB - wo *gorocksdb.WriteOptions - ro *gorocksdb.ReadOptions - cfh []*gorocksdb.ColumnFamilyHandle - chainParser bchain.BlockChainParser - is *common.InternalState - metrics *common.Metrics - cache *gorocksdb.Cache - maxOpenFiles int - cbs connectBlockStats + path string + db *gorocksdb.DB + wo *gorocksdb.WriteOptions + ro *gorocksdb.ReadOptions + cfh []*gorocksdb.ColumnFamilyHandle + chainParser bchain.BlockChainParser + is *common.InternalState + metrics *common.Metrics + cache *gorocksdb.Cache + maxOpenFiles int + cbs connectBlockStats + chanUpdateBalance chan updateBalanceData + chanUpdateBalanceResult chan error + updateBalancesMap map[string]*AddrBalance } const ( @@ -90,7 +93,20 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha db, cfh, err := openDB(path, c, maxOpenFiles) wo := gorocksdb.NewDefaultWriteOptions() ro := gorocksdb.NewDefaultReadOptions() - return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}}, nil + rdb := &RocksDB{ + path: path, + db: db, + wo: wo, + ro: ro, + cfh: cfh, + chainParser: parser, + metrics: metrics, + cache: c, + maxOpenFiles: maxOpenFiles, + cbs: connectBlockStats{}, + } + rdb.initUpdateBalancesWorker() + return rdb, nil } func (d *RocksDB) closeDB() error { @@ -274,6 +290,8 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { txAddressesMap := make(map[string]*TxAddresses) balances := make(map[string]*AddrBalance) if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil { + // reinitialize balanceWorker so that there are no left balances in the queue + d.initUpdateBalancesWorker() return err } if err := d.storeAddresses(wb, block.Height, addresses); err != nil { @@ -362,7 +380,80 @@ func (d *RocksDB) GetAndResetConnectBlockStats() string { return s } +type updateBalanceData struct { + valueSat big.Int + strAddrDesc string + addrDesc bchain.AddressDescriptor + processed, output bool +} + +func (d *RocksDB) initUpdateBalancesWorker() { + if d.chanUpdateBalance != nil { + close(d.chanUpdateBalance) + } + d.chanUpdateBalance = make(chan updateBalanceData, 16) + d.chanUpdateBalanceResult = make(chan error, 16) + go d.updateBalancesWorker() +} + +// updateBalancesWorker is a single worker used to update balances in parallel to processAddressesUTXO +func (d *RocksDB) updateBalancesWorker() { + var err error + for bd := range d.chanUpdateBalance { + ab, e := d.updateBalancesMap[bd.strAddrDesc] + if !e { + ab, err = d.GetAddrDescBalance(bd.addrDesc) + if err != nil { + d.chanUpdateBalanceResult <- err + continue + } + if ab == nil { + ab = &AddrBalance{} + } + d.updateBalancesMap[bd.strAddrDesc] = ab + d.cbs.balancesMiss++ + } else { + d.cbs.balancesHit++ + } + // add number of trx in balance only once, address can be multiple times in tx + if !bd.processed { + ab.Txs++ + } + if bd.output { + ab.BalanceSat.Add(&ab.BalanceSat, &bd.valueSat) + } else { + ab.BalanceSat.Sub(&ab.BalanceSat, &bd.valueSat) + if ab.BalanceSat.Sign() < 0 { + d.resetValueSatToZero(&ab.BalanceSat, bd.addrDesc, "balance") + } + ab.SentSat.Add(&ab.SentSat, &bd.valueSat) + } + d.chanUpdateBalanceResult <- nil + } +} + +func (d *RocksDB) dispatchUpdateBalance(dispatchedBalances int, valueSat *big.Int, strAddrDesc string, addrDesc bchain.AddressDescriptor, processed, output bool) (int, error) { +loop: + for { + select { + // process as many results as possible + case err := <-d.chanUpdateBalanceResult: + if err != nil { + return 0, err + } + dispatchedBalances-- + // send input to be processed + case d.chanUpdateBalance <- updateBalanceData{*valueSat, strAddrDesc, addrDesc, processed, output}: + dispatchedBalances++ + break loop + } + } + return dispatchedBalances, nil +} + func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error { + d.updateBalancesMap = balances + dispatchedBalances := 0 blockTxIDs := make([][]byte, len(block.Txs)) blockTxAddresses := make([]*TxAddresses, len(block.Txs)) // first process all outputs so that inputs can point to txs in this block @@ -404,25 +495,10 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string btxID: btxID, index: int32(i), }) - ab, e := balances[strAddrDesc] - if !e { - ab, err = d.GetAddrDescBalance(addrDesc) - if err != nil { - return err - } - if ab == nil { - ab = &AddrBalance{} - } - balances[strAddrDesc] = ab - d.cbs.balancesMiss++ - } else { - d.cbs.balancesHit++ + dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &output.ValueSat, strAddrDesc, addrDesc, processed, true) + if err != nil { + return err } - // add number of trx in balance only once, address can be multiple times in tx - if !processed { - ab.Txs++ - } - ab.BalanceSat.Add(&ab.BalanceSat, &output.ValueSat) } } // process inputs @@ -488,29 +564,16 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string btxID: spendingTxid, index: ^int32(i), }) - ab, e := balances[strAddrDesc] - if !e { - ab, err = d.GetAddrDescBalance(ot.AddrDesc) - if err != nil { - return err - } - if ab == nil { - ab = &AddrBalance{} - } - balances[strAddrDesc] = ab - d.cbs.balancesMiss++ - } else { - d.cbs.balancesHit++ + dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &ot.ValueSat, strAddrDesc, ot.AddrDesc, processed, false) + if err != nil { + return err } - // add number of trx in balance only once, address can be multiple times in tx - if !processed { - ab.Txs++ - } - ab.BalanceSat.Sub(&ab.BalanceSat, &ot.ValueSat) - if ab.BalanceSat.Sign() < 0 { - d.resetValueSatToZero(&ab.BalanceSat, ot.AddrDesc, "balance") - } - ab.SentSat.Add(&ab.SentSat, &ot.ValueSat) + } + } + for i := 0; i < dispatchedBalances; i++ { + err := <-d.chanUpdateBalanceResult + if err != nil { + return err } } return nil