Bulk import ETH internal transactions

This commit is contained in:
Martin Boehm 2021-12-29 00:20:52 +01:00 committed by Martin
parent 664f58bdd5
commit 91031715f7
7 changed files with 150 additions and 15 deletions

View File

@ -554,7 +554,6 @@ func (b *EthereumRPC) processCallTrace(call rpcCallTrace, d *bchain.EthereumInte
}
// getInternalDataForBlock fetches debug trace using callTracer, extracts internal transfers and creations and destructions of contracts
// by design, it never returns error so that missing internal transactions do not stop the rest of the blockchain import
func (b *EthereumRPC) getInternalDataForBlock(blockHash string, transactions []bchain.RpcTransaction) ([]bchain.EthereumInternalData, error) {
data := make([]bchain.EthereumInternalData, len(transactions))
if b.ChainConfig.ProcessInternalTransactions {
@ -564,11 +563,11 @@ func (b *EthereumRPC) getInternalDataForBlock(blockHash string, transactions []b
err := b.rpc.CallContext(ctx, &trace, "debug_traceBlockByHash", blockHash, map[string]interface{}{"tracer": "callTracer"})
if err != nil {
glog.Error("debug_traceBlockByHash block ", blockHash, ", error ", err)
return data, nil
return data, err
}
if len(trace) != len(data) {
glog.Error("debug_traceBlockByHash block ", blockHash, ", error: trace length does not match block length ", len(trace), "!=", len(data))
return data, nil
return data, err
}
for i, result := range trace {
r := &result.Result
@ -610,10 +609,11 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
if err != nil {
return nil, err
}
// error fetching internal data does not stop the block processing
var blockSpecificData *bchain.EthereumBlockSpecificData
internalData, err := b.getInternalDataForBlock(head.Hash, body.Transactions)
if err != nil {
return nil, err
blockSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: err.Error()}
}
btxs := make([]bchain.Tx, len(body.Transactions))
@ -629,8 +629,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
}
}
bbk := bchain.Block{
BlockHeader: *bbh,
Txs: btxs,
BlockHeader: *bbh,
Txs: btxs,
CoinSpecificData: blockSpecificData,
}
return &bbk, nil
}

View File

@ -116,7 +116,8 @@ type MempoolTx struct {
// Block is block header and list of transactions
type Block struct {
BlockHeader
Txs []Tx `json:"tx"`
Txs []Tx `json:"tx"`
CoinSpecificData interface{} `json:"-"`
}
// BlockHeader contains limited data (as needed for indexing) from backend block header

View File

@ -83,3 +83,7 @@ type EthereumSpecificData struct {
InternalData *EthereumInternalData `json:"internalData,omitempty"`
Receipt *RpcReceipt `json:"receipt,omitempty"`
}
type EthereumBlockSpecificData struct {
InternalDataError string
}

View File

@ -25,6 +25,7 @@ type BulkConnect struct {
chainType bchain.ChainType
bulkAddresses []bulkAddresses
bulkAddressesCount int
ethBlockTxs []ethBlockTx
txAddressesMap map[string]*TxAddresses
balances map[string]*AddrBalance
addressContracts map[string]*AddrContracts
@ -280,6 +281,7 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx
if err != nil {
return err
}
b.ethBlockTxs = append(b.ethBlockTxs, blockTxs...)
var storeAddrContracts chan error
var sa bool
if len(b.addressContracts) > maxBulkAddrContracts {
@ -309,6 +311,16 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx
return err
}
}
if err := b.d.storeInternalDataEthereumType(wb, b.ethBlockTxs); err != nil {
return err
}
b.ethBlockTxs = b.ethBlockTxs[:0]
blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData)
if blockSpecificData != nil && blockSpecificData.InternalDataError != "" {
if err := b.d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil {
return err
}
}
if storeBlockTxs {
if err := b.d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil {
return err
@ -320,6 +332,16 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx
if bac > b.bulkAddressesCount {
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
}
} else {
// if there is InternalDataError, store it
blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData)
if blockSpecificData != nil && blockSpecificData.InternalDataError != "" {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := b.d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil {
return err
}
}
}
if storeAddrContracts != nil {
if err := <-storeAddrContracts; err != nil {

View File

@ -117,6 +117,8 @@ const (
cfAddressContracts = iota - __break__ + cfAddressBalance - 1
cfInternalData
cfContracts
cfFunctionSignatures
cfBlockInternalDataErrors
)
// common columns
@ -125,7 +127,7 @@ var cfBaseNames = []string{"default", "height", "addresses", "blockTxs", "transa
// type specific columns
var cfNamesBitcoinType = []string{"addressBalance", "txAddresses"}
var cfNamesEthereumType = []string{"addressContracts", "internalData", "contracts"}
var cfNamesEthereumType = []string{"addressContracts", "internalData", "contracts", "functionSignatures", "blockInternalDataErrors"}
func openDB(path string, c *gorocksdb.Cache, openFiles int) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
// opts with bloom filter
@ -479,6 +481,15 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
if err := d.storeAddressContracts(wb, addressContracts); err != nil {
return err
}
if err := d.storeInternalDataEthereumType(wb, blockTxs); err != nil {
return err
}
blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData)
if blockSpecificData != nil && blockSpecificData.InternalDataError != "" {
if err := d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil {
return err
}
}
if err := d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil {
return err
}

View File

@ -419,6 +419,16 @@ func (d *RocksDB) GetEthereumInternalData(txid string) (*bchain.EthereumInternal
return d.unpackEthInternalData(buf)
}
func (d *RocksDB) storeInternalDataEthereumType(wb *gorocksdb.WriteBatch, blockTxs []ethBlockTx) error {
for i := range blockTxs {
blockTx := &blockTxs[i]
if blockTx.internalData != nil {
wb.PutCF(d.cfh[cfInternalData], blockTx.btxID, packEthInternalData(blockTx.internalData))
}
}
return nil
}
func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch, block *bchain.Block, blockTxs []ethBlockTx) error {
pl := d.chainParser.PackedTxidLen()
buf := make([]byte, 0, (pl+2*eth.EthereumTypeAddressDescriptorLen)*len(blockTxs))
@ -438,7 +448,6 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch,
// internal data - store the number of addresses, with odd number the CREATE tx type
var internalDataTransfers uint
if blockTx.internalData != nil {
wb.PutCF(d.cfh[cfInternalData], blockTx.btxID, packEthInternalData(blockTx.internalData))
internalDataTransfers = uint(len(blockTx.internalData.transfers)) * 2
if blockTx.internalData.internalType == bchain.CREATE {
internalDataTransfers++
@ -470,6 +479,22 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch,
return d.cleanupBlockTxs(wb, block)
}
func (d *RocksDB) storeBlockInternalDataErrorEthereumType(wb *gorocksdb.WriteBatch, block *bchain.Block, message string) error {
key := packUint(block.Height)
txid, err := d.chainParser.PackTxid(block.Hash)
if err != nil {
return err
}
m := []byte(message)
buf := make([]byte, 0, len(txid)+len(m)+1)
// the stored structure is txid+retry count (1 byte)+error message
buf = append(buf, txid...)
buf = append(buf, 0)
buf = append(buf, m...)
wb.PutCF(d.cfh[cfBlockInternalDataErrors], key, buf)
return nil
}
func (d *RocksDB) getBlockTxsEthereumType(height uint32) ([]ethBlockTx, error) {
pl := d.chainParser.PackedTxidLen()
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], packUint(height))
@ -702,6 +727,7 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32)
key := packUint(height)
wb.DeleteCF(d.cfh[cfBlockTxs], key)
wb.DeleteCF(d.cfh[cfHeight], key)
wb.DeleteCF(d.cfh[cfBlockInternalDataErrors], key)
}
d.storeAddressContracts(wb, contracts)
err := d.db.Write(d.wo, wb)

View File

@ -10,6 +10,7 @@ import (
"github.com/juju/errors"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/bchain/coins/eth"
"github.com/trezor/blockbook/common"
"github.com/trezor/blockbook/tests/dbtestdata"
)
@ -101,7 +102,7 @@ func verifyAfterEthereumTypeBlock1(t *testing.T, d *RocksDB, afterDisconnect boo
}
}
func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB) {
func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB, wantBlockInternalDataError bool) {
if err := checkColumn(d, cfHeight, []keyPair{
{
"0041eee8",
@ -202,6 +203,22 @@ func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB) {
t.Fatal(err)
}
}
var internalDataError []keyPair
if wantBlockInternalDataError {
internalDataError = []keyPair{
{
"0041eee9",
"2b57e15e93a0ed197417a34c2498b7187df79099572c04a6b6e6ff418f74e6ee" + "00" + hex.EncodeToString([]byte("test error")),
nil,
},
}
}
if err := checkColumn(d, cfBlockInternalDataErrors, internalDataError); err != nil {
{
t.Fatal(err)
}
}
}
func formatInternalData(in *bchain.EthereumInternalData) *bchain.EthereumInternalData {
@ -247,12 +264,14 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// connect 2nd block
// connect 2nd block, simulate InternalDataError
block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser)
block2.CoinSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: "test error"}
if err := d.ConnectBlock(block2); err != nil {
t.Fatal(err)
}
verifyAfterEthereumTypeBlock2(t, d)
verifyAfterEthereumTypeBlock2(t, d, true)
block2.CoinSpecificData = nil
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes))
@ -350,7 +369,7 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
if err == nil || err.Error() != "Cannot disconnect blocks with height 4321000 and lower. It is necessary to rebuild index." {
t.Fatal(err)
}
verifyAfterEthereumTypeBlock2(t, d)
verifyAfterEthereumTypeBlock2(t, d, true)
// disconnect the 2nd block, verify that the db contains only data from the 1st block with restored unspentTxs
// and that the cached tx is removed
@ -373,10 +392,61 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
if err := d.ConnectBlock(block2); err != nil {
t.Fatal(err)
}
verifyAfterEthereumTypeBlock2(t, d)
verifyAfterEthereumTypeBlock2(t, d, false)
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes))
}
}
func Test_BulkConnect_EthereumType(t *testing.T) {
d := setupRocksDB(t, &testEthereumParser{
EthereumParser: ethereumTestnetParser(),
})
defer closeAndDestroyRocksDB(t, d)
bc, err := d.InitBulkConnect()
if err != nil {
t.Fatal(err)
}
if d.is.DbState != common.DbStateInconsistent {
t.Fatal("DB not in DbStateInconsistent")
}
if len(d.is.BlockTimes) != 0 {
t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes))
}
if err := bc.ConnectBlock(dbtestdata.GetTestEthereumTypeBlock1(d.chainParser), false); err != nil {
t.Fatal(err)
}
if err := checkColumn(d, cfBlockTxs, []keyPair{}); err != nil {
{
t.Fatal(err)
}
}
// connect 2nd block, simulate InternalDataError
block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser)
block2.CoinSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: "test error"}
if err := bc.ConnectBlock(block2, true); err != nil {
t.Fatal(err)
}
block2.CoinSpecificData = nil
if err := bc.Close(); err != nil {
t.Fatal(err)
}
if d.is.DbState != common.DbStateOpen {
t.Fatal("DB not in DbStateOpen")
}
verifyAfterEthereumTypeBlock2(t, d, true)
if len(d.is.BlockTimes) != 4321002 {
t.Fatal("Expecting is.BlockTimes 4321002, got ", len(d.is.BlockTimes))
}
}