Optimize bulk connect of blocks
This commit is contained in:
parent
d45d028ef2
commit
5621ed49f3
@ -258,9 +258,13 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
||||
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
||||
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
||||
}
|
||||
addresses := make(map[string][]outpoint)
|
||||
txAddressesMap := make(map[string]*txAddresses)
|
||||
balances := make(map[string]*addrBalance)
|
||||
if err := d.writeAddressesUTXO(wb, block, txAddressesMap, balances); err != nil {
|
||||
if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
|
||||
@ -282,15 +286,23 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
||||
}
|
||||
|
||||
// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way
|
||||
type bulkAddresses struct {
|
||||
height uint32
|
||||
addresses map[string][]outpoint
|
||||
}
|
||||
|
||||
type BulkConnect struct {
|
||||
d *RocksDB
|
||||
isUTXO bool
|
||||
txAddressesMap map[string]*txAddresses
|
||||
balances map[string]*addrBalance
|
||||
height uint32
|
||||
d *RocksDB
|
||||
isUTXO bool
|
||||
bulkAddresses []bulkAddresses
|
||||
bulkAddressesCount int
|
||||
txAddressesMap map[string]*txAddresses
|
||||
balances map[string]*addrBalance
|
||||
height uint32
|
||||
}
|
||||
|
||||
const (
|
||||
maxBulkAddresses = 400000
|
||||
maxBulkTxAddresses = 2000000
|
||||
partialStoreAddresses = maxBulkTxAddresses / 10
|
||||
maxBulkBalances = 2500000
|
||||
@ -315,6 +327,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
|
||||
defer close(c)
|
||||
start := time.Now()
|
||||
var txm map[string]*txAddresses
|
||||
var sp int
|
||||
if all {
|
||||
txm = b.txAddressesMap
|
||||
b.txAddressesMap = make(map[string]*txAddresses)
|
||||
@ -334,6 +347,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
|
||||
delete(b.txAddressesMap, k)
|
||||
}
|
||||
}
|
||||
sp = len(txm)
|
||||
// store some other random transactions if necessary
|
||||
if len(txm) < partialStoreAddresses {
|
||||
for k, a := range b.txAddressesMap {
|
||||
@ -354,7 +368,7 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
|
||||
c <- err
|
||||
}
|
||||
}
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
|
||||
}
|
||||
|
||||
func (b *BulkConnect) storeBalances(c chan error, all bool) {
|
||||
@ -387,6 +401,17 @@ func (b *BulkConnect) storeBalances(c chan error, all bool) {
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
|
||||
}
|
||||
|
||||
func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error {
|
||||
for _, ba := range b.bulkAddresses {
|
||||
if err := b.d.storeAddresses(wb, ba.height, ba.addresses); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.bulkAddressesCount = 0
|
||||
b.bulkAddresses = b.bulkAddresses[:0]
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error {
|
||||
b.height = block.Height
|
||||
if !b.isUTXO {
|
||||
@ -394,11 +419,15 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
|
||||
}
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
if err := b.d.writeAddressesUTXO(wb, block, b.txAddressesMap, b.balances); err != nil {
|
||||
addresses := make(map[string][]outpoint)
|
||||
if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil {
|
||||
return err
|
||||
}
|
||||
start := time.Now()
|
||||
var sa bool
|
||||
var storeAddressesChan, storeBalancesChan chan error
|
||||
if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
|
||||
sa = true
|
||||
if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
|
||||
storeAddressesChan = make(chan error)
|
||||
go b.storeTxAddresses(storeAddressesChan, false)
|
||||
@ -408,6 +437,17 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
|
||||
go b.storeBalances(storeBalancesChan, false)
|
||||
}
|
||||
}
|
||||
b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
|
||||
height: block.Height,
|
||||
addresses: addresses,
|
||||
})
|
||||
b.bulkAddressesCount += len(addresses)
|
||||
bac := b.bulkAddressesCount
|
||||
if sa || b.bulkAddressesCount > maxBulkAddresses {
|
||||
if err := b.storeBulkAddresses(wb); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if storeBlockTxs {
|
||||
if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
|
||||
return err
|
||||
@ -419,6 +459,9 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
|
||||
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||
return err
|
||||
}
|
||||
if bac > b.bulkAddressesCount {
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
|
||||
}
|
||||
if storeAddressesChan != nil {
|
||||
if err := <-storeAddressesChan; err != nil {
|
||||
return err
|
||||
@ -434,10 +477,21 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
|
||||
|
||||
func (b *BulkConnect) Close() error {
|
||||
glog.Info("rocksdb: bulk connect closing")
|
||||
start := time.Now()
|
||||
storeAddressesChan := make(chan error)
|
||||
go b.storeTxAddresses(storeAddressesChan, true)
|
||||
storeBalancesChan := make(chan error)
|
||||
go b.storeBalances(storeBalancesChan, true)
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
bac := b.bulkAddressesCount
|
||||
if err := b.storeBulkAddresses(wb); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||
return err
|
||||
}
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
|
||||
if err := <-storeAddressesChan; err != nil {
|
||||
return err
|
||||
}
|
||||
@ -497,8 +551,7 @@ func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText
|
||||
valueSat.SetInt64(0)
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error {
|
||||
addresses := make(map[string][]outpoint)
|
||||
func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error {
|
||||
blockTxIDs := make([][]byte, len(block.Txs))
|
||||
// first process all outputs so that inputs can point to txs in this block
|
||||
for txi := range block.Txs {
|
||||
@ -634,7 +687,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
||||
ab.sentSat.Add(&ab.sentSat, &ot.valueSat)
|
||||
}
|
||||
}
|
||||
return d.storeAddresses(wb, block, addresses)
|
||||
return nil
|
||||
}
|
||||
|
||||
func processedInTx(o []outpoint, btxID []byte) bool {
|
||||
@ -646,10 +699,10 @@ func processedInTx(o []outpoint, btxID []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (d *RocksDB) storeAddresses(wb *gorocksdb.WriteBatch, block *bchain.Block, addresses map[string][]outpoint) error {
|
||||
func (d *RocksDB) storeAddresses(wb *gorocksdb.WriteBatch, height uint32, addresses map[string][]outpoint) error {
|
||||
for addrID, outpoints := range addresses {
|
||||
ba := []byte(addrID)
|
||||
key := packAddressKey(ba, block.Height)
|
||||
key := packAddressKey(ba, height)
|
||||
val := d.packOutpoints(outpoints)
|
||||
wb.PutCF(d.cfh[cfAddresses], key, val)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user