From c2a581ea72c40451ce12867cf652c1ef571d69f9 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Fri, 11 Jan 2019 12:37:04 +0100 Subject: [PATCH] Add bulk import of EthereumType chain --- db/bulkconnect.go | 178 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 147 insertions(+), 31 deletions(-) diff --git a/db/bulkconnect.go b/db/bulkconnect.go index 8382c2c7..45e25f17 100644 --- a/db/bulkconnect.go +++ b/db/bulkconnect.go @@ -27,33 +27,33 @@ type BulkConnect struct { bulkAddressesCount int txAddressesMap map[string]*TxAddresses balances map[string]*AddrBalance + addressContracts map[string]*AddrContracts height uint32 } const ( - maxBulkAddresses = 200000 - maxBulkTxAddresses = 500000 - partialStoreAddresses = maxBulkTxAddresses / 10 - maxBulkBalances = 800000 - partialStoreBalances = maxBulkBalances / 10 + maxBulkAddresses = 200000 + maxBulkTxAddresses = 500000 + partialStoreAddresses = maxBulkTxAddresses / 10 + maxBulkBalances = 800000 + partialStoreBalances = maxBulkBalances / 10 + maxBulkAddrContracts = 1200000 + partialStoreAddrContracts = maxBulkAddrContracts / 10 ) // InitBulkConnect initializes bulk connect and switches DB to inconsistent state func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) { b := &BulkConnect{ - d: d, - chainType: d.chainParser.GetChainType(), - txAddressesMap: make(map[string]*TxAddresses), - balances: make(map[string]*AddrBalance), + d: d, + chainType: d.chainParser.GetChainType(), + txAddressesMap: make(map[string]*TxAddresses), + balances: make(map[string]*AddrBalance), + addressContracts: make(map[string]*AddrContracts), } - if b.chainType == bchain.ChainBitcoinType { - if err := d.SetInconsistentState(true); err != nil { - return nil, err - } - glog.Info("rocksdb: bulk connect init, db set to inconsistent state") - } else { - glog.Info("rocksdb: bulk connect init") + if err := d.SetInconsistentState(true); err != nil { + return nil, err } + glog.Info("rocksdb: bulk connect init, db set to inconsistent state") return b, nil } @@ -169,13 +169,7 @@ func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error { return nil } -// ConnectBlock connects block in bulk mode -func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error { - b.height = block.Height - // for non bitcoin types connect blocks in non bulk mode - if b.chainType != bchain.ChainBitcoinType { - return b.d.ConnectBlock(block) - } +func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs bool) error { addresses := make(addressesMap) if err := b.d.processAddressesBitcoinType(block, addresses, b.txAddressesMap, b.balances); err != nil { return err @@ -240,15 +234,128 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro return nil } +func (b *BulkConnect) storeAddressContracts(wb *gorocksdb.WriteBatch, all bool) (int, error) { + var ac map[string]*AddrContracts + if all { + ac = b.addressContracts + b.addressContracts = make(map[string]*AddrContracts) + } else { + ac = make(map[string]*AddrContracts) + // store some random address contracts + for k, a := range b.addressContracts { + ac[k] = a + delete(b.addressContracts, k) + if len(ac) >= partialStoreAddrContracts { + break + } + } + } + if err := b.d.storeAddressContracts(wb, ac); err != nil { + return 0, err + } + return len(ac), nil +} + +func (b *BulkConnect) parallelStoreAddressContracts(c chan error, all bool) { + defer close(c) + start := time.Now() + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + count, err := b.storeAddressContracts(wb, all) + if err != nil { + c <- err + return + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { + c <- err + return + } + glog.Info("rocksdb: height ", b.height, ", stored ", count, " addressContracts, ", len(b.addressContracts), " remaining, done in ", time.Since(start)) + c <- nil +} + +func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTxs bool) error { + addresses := make(addressesMap) + blockTxs, err := b.d.processAddressesEthereumType(block, addresses, b.addressContracts) + if err != nil { + return err + } + var storeAddrContracts chan error + var sa bool + if len(b.addressContracts) > maxBulkAddrContracts { + sa = true + storeAddrContracts = make(chan error) + go b.parallelStoreAddressContracts(storeAddrContracts, false) + } + b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{ + bi: BlockInfo{ + Hash: block.Hash, + Time: block.Time, + Txs: uint32(len(block.Txs)), + Size: uint32(block.Size), + Height: block.Height, + }, + addresses: addresses, + }) + b.bulkAddressesCount += len(addresses) + // open WriteBatch only if going to write + if sa || b.bulkAddressesCount > maxBulkAddresses || storeBlockTxs { + start := time.Now() + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + bac := b.bulkAddressesCount + if sa || b.bulkAddressesCount > maxBulkAddresses { + if err := b.storeBulkAddresses(wb); err != nil { + return err + } + } + if storeBlockTxs { + if err := b.d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil { + return err + } + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { + return err + } + if bac > b.bulkAddressesCount { + glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) + } + } + if storeAddrContracts != nil { + if err := <-storeAddrContracts; err != nil { + return err + } + } + return nil +} + +// ConnectBlock connects block in bulk mode +func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error { + b.height = block.Height + if b.chainType == bchain.ChainBitcoinType { + return b.connectBlockBitcoinType(block, storeBlockTxs) + } else if b.chainType == bchain.ChainEthereumType { + return b.connectBlockEthereumType(block, storeBlockTxs) + } + // for default is to connect blocks in non bulk mode + return b.d.ConnectBlock(block) +} + // Close flushes the cached data and switches DB from inconsistent state open // after Close, the BulkConnect cannot be used func (b *BulkConnect) Close() error { glog.Info("rocksdb: bulk connect closing") start := time.Now() - storeAddressesChan := make(chan error) - go b.parallelStoreTxAddresses(storeAddressesChan, true) - storeBalancesChan := make(chan error) - go b.parallelStoreBalances(storeBalancesChan, true) + var storeTxAddressesChan, storeBalancesChan, storeAddressContractsChan chan error + if b.chainType == bchain.ChainBitcoinType { + storeTxAddressesChan = make(chan error) + go b.parallelStoreTxAddresses(storeTxAddressesChan, true) + storeBalancesChan = make(chan error) + go b.parallelStoreBalances(storeBalancesChan, true) + } else if b.chainType == bchain.ChainEthereumType { + storeAddressContractsChan = make(chan error) + go b.parallelStoreAddressContracts(storeAddressContractsChan, true) + } wb := gorocksdb.NewWriteBatch() defer wb.Destroy() bac := b.bulkAddressesCount @@ -259,11 +366,20 @@ func (b *BulkConnect) Close() error { return err } glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) - if err := <-storeAddressesChan; err != nil { - return err + if storeTxAddressesChan != nil { + if err := <-storeTxAddressesChan; err != nil { + return err + } } - if err := <-storeBalancesChan; err != nil { - return err + if storeBalancesChan != nil { + if err := <-storeBalancesChan; err != nil { + return err + } + } + if storeAddressContractsChan != nil { + if err := <-storeAddressContractsChan; err != nil { + return err + } } if err := b.d.SetInconsistentState(false); err != nil { return err