Add bulk import of EthereumType chain
This commit is contained in:
parent
f109513464
commit
c2a581ea72
@ -27,33 +27,33 @@ type BulkConnect struct {
|
||||
bulkAddressesCount int
|
||||
txAddressesMap map[string]*TxAddresses
|
||||
balances map[string]*AddrBalance
|
||||
addressContracts map[string]*AddrContracts
|
||||
height uint32
|
||||
}
|
||||
|
||||
const (
|
||||
maxBulkAddresses = 200000
|
||||
maxBulkTxAddresses = 500000
|
||||
partialStoreAddresses = maxBulkTxAddresses / 10
|
||||
maxBulkBalances = 800000
|
||||
partialStoreBalances = maxBulkBalances / 10
|
||||
maxBulkAddresses = 200000
|
||||
maxBulkTxAddresses = 500000
|
||||
partialStoreAddresses = maxBulkTxAddresses / 10
|
||||
maxBulkBalances = 800000
|
||||
partialStoreBalances = maxBulkBalances / 10
|
||||
maxBulkAddrContracts = 1200000
|
||||
partialStoreAddrContracts = maxBulkAddrContracts / 10
|
||||
)
|
||||
|
||||
// InitBulkConnect initializes bulk connect and switches DB to inconsistent state
|
||||
func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) {
|
||||
b := &BulkConnect{
|
||||
d: d,
|
||||
chainType: d.chainParser.GetChainType(),
|
||||
txAddressesMap: make(map[string]*TxAddresses),
|
||||
balances: make(map[string]*AddrBalance),
|
||||
d: d,
|
||||
chainType: d.chainParser.GetChainType(),
|
||||
txAddressesMap: make(map[string]*TxAddresses),
|
||||
balances: make(map[string]*AddrBalance),
|
||||
addressContracts: make(map[string]*AddrContracts),
|
||||
}
|
||||
if b.chainType == bchain.ChainBitcoinType {
|
||||
if err := d.SetInconsistentState(true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Info("rocksdb: bulk connect init, db set to inconsistent state")
|
||||
} else {
|
||||
glog.Info("rocksdb: bulk connect init")
|
||||
if err := d.SetInconsistentState(true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Info("rocksdb: bulk connect init, db set to inconsistent state")
|
||||
return b, nil
|
||||
}
|
||||
|
||||
@ -169,13 +169,7 @@ func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectBlock connects block in bulk mode
|
||||
func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error {
|
||||
b.height = block.Height
|
||||
// for non bitcoin types connect blocks in non bulk mode
|
||||
if b.chainType != bchain.ChainBitcoinType {
|
||||
return b.d.ConnectBlock(block)
|
||||
}
|
||||
func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs bool) error {
|
||||
addresses := make(addressesMap)
|
||||
if err := b.d.processAddressesBitcoinType(block, addresses, b.txAddressesMap, b.balances); err != nil {
|
||||
return err
|
||||
@ -240,15 +234,128 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BulkConnect) storeAddressContracts(wb *gorocksdb.WriteBatch, all bool) (int, error) {
|
||||
var ac map[string]*AddrContracts
|
||||
if all {
|
||||
ac = b.addressContracts
|
||||
b.addressContracts = make(map[string]*AddrContracts)
|
||||
} else {
|
||||
ac = make(map[string]*AddrContracts)
|
||||
// store some random address contracts
|
||||
for k, a := range b.addressContracts {
|
||||
ac[k] = a
|
||||
delete(b.addressContracts, k)
|
||||
if len(ac) >= partialStoreAddrContracts {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := b.d.storeAddressContracts(wb, ac); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(ac), nil
|
||||
}
|
||||
|
||||
func (b *BulkConnect) parallelStoreAddressContracts(c chan error, all bool) {
|
||||
defer close(c)
|
||||
start := time.Now()
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
count, err := b.storeAddressContracts(wb, all)
|
||||
if err != nil {
|
||||
c <- err
|
||||
return
|
||||
}
|
||||
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||
c <- err
|
||||
return
|
||||
}
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", count, " addressContracts, ", len(b.addressContracts), " remaining, done in ", time.Since(start))
|
||||
c <- nil
|
||||
}
|
||||
|
||||
func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTxs bool) error {
|
||||
addresses := make(addressesMap)
|
||||
blockTxs, err := b.d.processAddressesEthereumType(block, addresses, b.addressContracts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var storeAddrContracts chan error
|
||||
var sa bool
|
||||
if len(b.addressContracts) > maxBulkAddrContracts {
|
||||
sa = true
|
||||
storeAddrContracts = make(chan error)
|
||||
go b.parallelStoreAddressContracts(storeAddrContracts, false)
|
||||
}
|
||||
b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
|
||||
bi: BlockInfo{
|
||||
Hash: block.Hash,
|
||||
Time: block.Time,
|
||||
Txs: uint32(len(block.Txs)),
|
||||
Size: uint32(block.Size),
|
||||
Height: block.Height,
|
||||
},
|
||||
addresses: addresses,
|
||||
})
|
||||
b.bulkAddressesCount += len(addresses)
|
||||
// open WriteBatch only if going to write
|
||||
if sa || b.bulkAddressesCount > maxBulkAddresses || storeBlockTxs {
|
||||
start := time.Now()
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
bac := b.bulkAddressesCount
|
||||
if sa || b.bulkAddressesCount > maxBulkAddresses {
|
||||
if err := b.storeBulkAddresses(wb); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if storeBlockTxs {
|
||||
if err := b.d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||
return err
|
||||
}
|
||||
if bac > b.bulkAddressesCount {
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
|
||||
}
|
||||
}
|
||||
if storeAddrContracts != nil {
|
||||
if err := <-storeAddrContracts; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectBlock connects block in bulk mode
|
||||
func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error {
|
||||
b.height = block.Height
|
||||
if b.chainType == bchain.ChainBitcoinType {
|
||||
return b.connectBlockBitcoinType(block, storeBlockTxs)
|
||||
} else if b.chainType == bchain.ChainEthereumType {
|
||||
return b.connectBlockEthereumType(block, storeBlockTxs)
|
||||
}
|
||||
// for default is to connect blocks in non bulk mode
|
||||
return b.d.ConnectBlock(block)
|
||||
}
|
||||
|
||||
// Close flushes the cached data and switches DB from inconsistent state open
|
||||
// after Close, the BulkConnect cannot be used
|
||||
func (b *BulkConnect) Close() error {
|
||||
glog.Info("rocksdb: bulk connect closing")
|
||||
start := time.Now()
|
||||
storeAddressesChan := make(chan error)
|
||||
go b.parallelStoreTxAddresses(storeAddressesChan, true)
|
||||
storeBalancesChan := make(chan error)
|
||||
go b.parallelStoreBalances(storeBalancesChan, true)
|
||||
var storeTxAddressesChan, storeBalancesChan, storeAddressContractsChan chan error
|
||||
if b.chainType == bchain.ChainBitcoinType {
|
||||
storeTxAddressesChan = make(chan error)
|
||||
go b.parallelStoreTxAddresses(storeTxAddressesChan, true)
|
||||
storeBalancesChan = make(chan error)
|
||||
go b.parallelStoreBalances(storeBalancesChan, true)
|
||||
} else if b.chainType == bchain.ChainEthereumType {
|
||||
storeAddressContractsChan = make(chan error)
|
||||
go b.parallelStoreAddressContracts(storeAddressContractsChan, true)
|
||||
}
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
bac := b.bulkAddressesCount
|
||||
@ -259,11 +366,20 @@ func (b *BulkConnect) Close() error {
|
||||
return err
|
||||
}
|
||||
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
|
||||
if err := <-storeAddressesChan; err != nil {
|
||||
return err
|
||||
if storeTxAddressesChan != nil {
|
||||
if err := <-storeTxAddressesChan; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := <-storeBalancesChan; err != nil {
|
||||
return err
|
||||
if storeBalancesChan != nil {
|
||||
if err := <-storeBalancesChan; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if storeAddressContractsChan != nil {
|
||||
if err := <-storeAddressContractsChan; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := b.d.SetInconsistentState(false); err != nil {
|
||||
return err
|
||||
|
||||
Loading…
Reference in New Issue
Block a user