Update balances in extra goroutine during import block
This commit is contained in:
parent
e270cf0f95
commit
099321126f
167
db/rocksdb.go
167
db/rocksdb.go
@ -42,17 +42,20 @@ type connectBlockStats struct {
|
|||||||
|
|
||||||
// RocksDB handle
|
// RocksDB handle
|
||||||
type RocksDB struct {
|
type RocksDB struct {
|
||||||
path string
|
path string
|
||||||
db *gorocksdb.DB
|
db *gorocksdb.DB
|
||||||
wo *gorocksdb.WriteOptions
|
wo *gorocksdb.WriteOptions
|
||||||
ro *gorocksdb.ReadOptions
|
ro *gorocksdb.ReadOptions
|
||||||
cfh []*gorocksdb.ColumnFamilyHandle
|
cfh []*gorocksdb.ColumnFamilyHandle
|
||||||
chainParser bchain.BlockChainParser
|
chainParser bchain.BlockChainParser
|
||||||
is *common.InternalState
|
is *common.InternalState
|
||||||
metrics *common.Metrics
|
metrics *common.Metrics
|
||||||
cache *gorocksdb.Cache
|
cache *gorocksdb.Cache
|
||||||
maxOpenFiles int
|
maxOpenFiles int
|
||||||
cbs connectBlockStats
|
cbs connectBlockStats
|
||||||
|
chanUpdateBalance chan updateBalanceData
|
||||||
|
chanUpdateBalanceResult chan error
|
||||||
|
updateBalancesMap map[string]*AddrBalance
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -90,7 +93,20 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha
|
|||||||
db, cfh, err := openDB(path, c, maxOpenFiles)
|
db, cfh, err := openDB(path, c, maxOpenFiles)
|
||||||
wo := gorocksdb.NewDefaultWriteOptions()
|
wo := gorocksdb.NewDefaultWriteOptions()
|
||||||
ro := gorocksdb.NewDefaultReadOptions()
|
ro := gorocksdb.NewDefaultReadOptions()
|
||||||
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}}, nil
|
rdb := &RocksDB{
|
||||||
|
path: path,
|
||||||
|
db: db,
|
||||||
|
wo: wo,
|
||||||
|
ro: ro,
|
||||||
|
cfh: cfh,
|
||||||
|
chainParser: parser,
|
||||||
|
metrics: metrics,
|
||||||
|
cache: c,
|
||||||
|
maxOpenFiles: maxOpenFiles,
|
||||||
|
cbs: connectBlockStats{},
|
||||||
|
}
|
||||||
|
rdb.initUpdateBalancesWorker()
|
||||||
|
return rdb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RocksDB) closeDB() error {
|
func (d *RocksDB) closeDB() error {
|
||||||
@ -274,6 +290,8 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
|||||||
txAddressesMap := make(map[string]*TxAddresses)
|
txAddressesMap := make(map[string]*TxAddresses)
|
||||||
balances := make(map[string]*AddrBalance)
|
balances := make(map[string]*AddrBalance)
|
||||||
if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil {
|
if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil {
|
||||||
|
// reinitialize balanceWorker so that there are no left balances in the queue
|
||||||
|
d.initUpdateBalancesWorker()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
|
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
|
||||||
@ -362,7 +380,80 @@ func (d *RocksDB) GetAndResetConnectBlockStats() string {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type updateBalanceData struct {
|
||||||
|
valueSat big.Int
|
||||||
|
strAddrDesc string
|
||||||
|
addrDesc bchain.AddressDescriptor
|
||||||
|
processed, output bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *RocksDB) initUpdateBalancesWorker() {
|
||||||
|
if d.chanUpdateBalance != nil {
|
||||||
|
close(d.chanUpdateBalance)
|
||||||
|
}
|
||||||
|
d.chanUpdateBalance = make(chan updateBalanceData, 16)
|
||||||
|
d.chanUpdateBalanceResult = make(chan error, 16)
|
||||||
|
go d.updateBalancesWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateBalancesWorker is a single worker used to update balances in parallel to processAddressesUTXO
|
||||||
|
func (d *RocksDB) updateBalancesWorker() {
|
||||||
|
var err error
|
||||||
|
for bd := range d.chanUpdateBalance {
|
||||||
|
ab, e := d.updateBalancesMap[bd.strAddrDesc]
|
||||||
|
if !e {
|
||||||
|
ab, err = d.GetAddrDescBalance(bd.addrDesc)
|
||||||
|
if err != nil {
|
||||||
|
d.chanUpdateBalanceResult <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ab == nil {
|
||||||
|
ab = &AddrBalance{}
|
||||||
|
}
|
||||||
|
d.updateBalancesMap[bd.strAddrDesc] = ab
|
||||||
|
d.cbs.balancesMiss++
|
||||||
|
} else {
|
||||||
|
d.cbs.balancesHit++
|
||||||
|
}
|
||||||
|
// add number of trx in balance only once, address can be multiple times in tx
|
||||||
|
if !bd.processed {
|
||||||
|
ab.Txs++
|
||||||
|
}
|
||||||
|
if bd.output {
|
||||||
|
ab.BalanceSat.Add(&ab.BalanceSat, &bd.valueSat)
|
||||||
|
} else {
|
||||||
|
ab.BalanceSat.Sub(&ab.BalanceSat, &bd.valueSat)
|
||||||
|
if ab.BalanceSat.Sign() < 0 {
|
||||||
|
d.resetValueSatToZero(&ab.BalanceSat, bd.addrDesc, "balance")
|
||||||
|
}
|
||||||
|
ab.SentSat.Add(&ab.SentSat, &bd.valueSat)
|
||||||
|
}
|
||||||
|
d.chanUpdateBalanceResult <- nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *RocksDB) dispatchUpdateBalance(dispatchedBalances int, valueSat *big.Int, strAddrDesc string, addrDesc bchain.AddressDescriptor, processed, output bool) (int, error) {
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// process as many results as possible
|
||||||
|
case err := <-d.chanUpdateBalanceResult:
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
dispatchedBalances--
|
||||||
|
// send input to be processed
|
||||||
|
case d.chanUpdateBalance <- updateBalanceData{*valueSat, strAddrDesc, addrDesc, processed, output}:
|
||||||
|
dispatchedBalances++
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dispatchedBalances, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error {
|
func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error {
|
||||||
|
d.updateBalancesMap = balances
|
||||||
|
dispatchedBalances := 0
|
||||||
blockTxIDs := make([][]byte, len(block.Txs))
|
blockTxIDs := make([][]byte, len(block.Txs))
|
||||||
blockTxAddresses := make([]*TxAddresses, len(block.Txs))
|
blockTxAddresses := make([]*TxAddresses, len(block.Txs))
|
||||||
// first process all outputs so that inputs can point to txs in this block
|
// first process all outputs so that inputs can point to txs in this block
|
||||||
@ -404,25 +495,10 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
|
|||||||
btxID: btxID,
|
btxID: btxID,
|
||||||
index: int32(i),
|
index: int32(i),
|
||||||
})
|
})
|
||||||
ab, e := balances[strAddrDesc]
|
dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &output.ValueSat, strAddrDesc, addrDesc, processed, true)
|
||||||
if !e {
|
if err != nil {
|
||||||
ab, err = d.GetAddrDescBalance(addrDesc)
|
return err
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if ab == nil {
|
|
||||||
ab = &AddrBalance{}
|
|
||||||
}
|
|
||||||
balances[strAddrDesc] = ab
|
|
||||||
d.cbs.balancesMiss++
|
|
||||||
} else {
|
|
||||||
d.cbs.balancesHit++
|
|
||||||
}
|
}
|
||||||
// add number of trx in balance only once, address can be multiple times in tx
|
|
||||||
if !processed {
|
|
||||||
ab.Txs++
|
|
||||||
}
|
|
||||||
ab.BalanceSat.Add(&ab.BalanceSat, &output.ValueSat)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// process inputs
|
// process inputs
|
||||||
@ -488,29 +564,16 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
|
|||||||
btxID: spendingTxid,
|
btxID: spendingTxid,
|
||||||
index: ^int32(i),
|
index: ^int32(i),
|
||||||
})
|
})
|
||||||
ab, e := balances[strAddrDesc]
|
dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &ot.ValueSat, strAddrDesc, ot.AddrDesc, processed, false)
|
||||||
if !e {
|
if err != nil {
|
||||||
ab, err = d.GetAddrDescBalance(ot.AddrDesc)
|
return err
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if ab == nil {
|
|
||||||
ab = &AddrBalance{}
|
|
||||||
}
|
|
||||||
balances[strAddrDesc] = ab
|
|
||||||
d.cbs.balancesMiss++
|
|
||||||
} else {
|
|
||||||
d.cbs.balancesHit++
|
|
||||||
}
|
}
|
||||||
// add number of trx in balance only once, address can be multiple times in tx
|
}
|
||||||
if !processed {
|
}
|
||||||
ab.Txs++
|
for i := 0; i < dispatchedBalances; i++ {
|
||||||
}
|
err := <-d.chanUpdateBalanceResult
|
||||||
ab.BalanceSat.Sub(&ab.BalanceSat, &ot.ValueSat)
|
if err != nil {
|
||||||
if ab.BalanceSat.Sign() < 0 {
|
return err
|
||||||
d.resetValueSatToZero(&ab.BalanceSat, ot.AddrDesc, "balance")
|
|
||||||
}
|
|
||||||
ab.SentSat.Add(&ab.SentSat, &ot.ValueSat)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user