From 91031715f70f5d41218283d57713625b50a7662e Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 29 Dec 2021 00:20:52 +0100 Subject: [PATCH] Bulk import ETH internal transactions --- bchain/coins/eth/ethrpc.go | 15 ++++--- bchain/types.go | 3 +- bchain/types_ethereum_type.go | 4 ++ db/bulkconnect.go | 22 +++++++++ db/rocksdb.go | 13 +++++- db/rocksdb_ethereumtype.go | 28 +++++++++++- db/rocksdb_ethereumtype_test.go | 80 ++++++++++++++++++++++++++++++--- 7 files changed, 150 insertions(+), 15 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index da3e02d8..df3dc52f 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -554,7 +554,6 @@ func (b *EthereumRPC) processCallTrace(call rpcCallTrace, d *bchain.EthereumInte } // getInternalDataForBlock fetches debug trace using callTracer, extracts internal transfers and creations and destructions of contracts -// by design, it never returns error so that missing internal transactions do not stop the rest of the blockchain import func (b *EthereumRPC) getInternalDataForBlock(blockHash string, transactions []bchain.RpcTransaction) ([]bchain.EthereumInternalData, error) { data := make([]bchain.EthereumInternalData, len(transactions)) if b.ChainConfig.ProcessInternalTransactions { @@ -564,11 +563,11 @@ func (b *EthereumRPC) getInternalDataForBlock(blockHash string, transactions []b err := b.rpc.CallContext(ctx, &trace, "debug_traceBlockByHash", blockHash, map[string]interface{}{"tracer": "callTracer"}) if err != nil { glog.Error("debug_traceBlockByHash block ", blockHash, ", error ", err) - return data, nil + return data, err } if len(trace) != len(data) { glog.Error("debug_traceBlockByHash block ", blockHash, ", error: trace length does not match block length ", len(trace), "!=", len(data)) - return data, nil + return data, err } for i, result := range trace { r := &result.Result @@ -610,10 +609,11 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error if err != nil { return nil, err } - + // error fetching internal data does not stop the block processing + var blockSpecificData *bchain.EthereumBlockSpecificData internalData, err := b.getInternalDataForBlock(head.Hash, body.Transactions) if err != nil { - return nil, err + blockSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: err.Error()} } btxs := make([]bchain.Tx, len(body.Transactions)) @@ -629,8 +629,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error } } bbk := bchain.Block{ - BlockHeader: *bbh, - Txs: btxs, + BlockHeader: *bbh, + Txs: btxs, + CoinSpecificData: blockSpecificData, } return &bbk, nil } diff --git a/bchain/types.go b/bchain/types.go index 555db84a..ae590abc 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -116,7 +116,8 @@ type MempoolTx struct { // Block is block header and list of transactions type Block struct { BlockHeader - Txs []Tx `json:"tx"` + Txs []Tx `json:"tx"` + CoinSpecificData interface{} `json:"-"` } // BlockHeader contains limited data (as needed for indexing) from backend block header diff --git a/bchain/types_ethereum_type.go b/bchain/types_ethereum_type.go index 94e4ecc2..93bf3c97 100644 --- a/bchain/types_ethereum_type.go +++ b/bchain/types_ethereum_type.go @@ -83,3 +83,7 @@ type EthereumSpecificData struct { InternalData *EthereumInternalData `json:"internalData,omitempty"` Receipt *RpcReceipt `json:"receipt,omitempty"` } + +type EthereumBlockSpecificData struct { + InternalDataError string +} diff --git a/db/bulkconnect.go b/db/bulkconnect.go index 27412eed..f6bf4ba0 100644 --- a/db/bulkconnect.go +++ b/db/bulkconnect.go @@ -25,6 +25,7 @@ type BulkConnect struct { chainType bchain.ChainType bulkAddresses []bulkAddresses bulkAddressesCount int + ethBlockTxs []ethBlockTx txAddressesMap map[string]*TxAddresses balances map[string]*AddrBalance addressContracts map[string]*AddrContracts @@ -280,6 +281,7 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx if err != nil { return err } + b.ethBlockTxs = append(b.ethBlockTxs, blockTxs...) var storeAddrContracts chan error var sa bool if len(b.addressContracts) > maxBulkAddrContracts { @@ -309,6 +311,16 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx return err } } + if err := b.d.storeInternalDataEthereumType(wb, b.ethBlockTxs); err != nil { + return err + } + b.ethBlockTxs = b.ethBlockTxs[:0] + blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) + if blockSpecificData != nil && blockSpecificData.InternalDataError != "" { + if err := b.d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil { + return err + } + } if storeBlockTxs { if err := b.d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil { return err @@ -320,6 +332,16 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx if bac > b.bulkAddressesCount { glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) } + } else { + // if there is InternalDataError, store it + blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) + if blockSpecificData != nil && blockSpecificData.InternalDataError != "" { + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + if err := b.d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil { + return err + } + } } if storeAddrContracts != nil { if err := <-storeAddrContracts; err != nil { diff --git a/db/rocksdb.go b/db/rocksdb.go index 62c771b0..4438db17 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -117,6 +117,8 @@ const ( cfAddressContracts = iota - __break__ + cfAddressBalance - 1 cfInternalData cfContracts + cfFunctionSignatures + cfBlockInternalDataErrors ) // common columns @@ -125,7 +127,7 @@ var cfBaseNames = []string{"default", "height", "addresses", "blockTxs", "transa // type specific columns var cfNamesBitcoinType = []string{"addressBalance", "txAddresses"} -var cfNamesEthereumType = []string{"addressContracts", "internalData", "contracts"} +var cfNamesEthereumType = []string{"addressContracts", "internalData", "contracts", "functionSignatures", "blockInternalDataErrors"} func openDB(path string, c *gorocksdb.Cache, openFiles int) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) { // opts with bloom filter @@ -479,6 +481,15 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { if err := d.storeAddressContracts(wb, addressContracts); err != nil { return err } + if err := d.storeInternalDataEthereumType(wb, blockTxs); err != nil { + return err + } + blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) + if blockSpecificData != nil && blockSpecificData.InternalDataError != "" { + if err := d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil { + return err + } + } if err := d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil { return err } diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index a46cac27..cde8eab0 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -419,6 +419,16 @@ func (d *RocksDB) GetEthereumInternalData(txid string) (*bchain.EthereumInternal return d.unpackEthInternalData(buf) } +func (d *RocksDB) storeInternalDataEthereumType(wb *gorocksdb.WriteBatch, blockTxs []ethBlockTx) error { + for i := range blockTxs { + blockTx := &blockTxs[i] + if blockTx.internalData != nil { + wb.PutCF(d.cfh[cfInternalData], blockTx.btxID, packEthInternalData(blockTx.internalData)) + } + } + return nil +} + func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch, block *bchain.Block, blockTxs []ethBlockTx) error { pl := d.chainParser.PackedTxidLen() buf := make([]byte, 0, (pl+2*eth.EthereumTypeAddressDescriptorLen)*len(blockTxs)) @@ -438,7 +448,6 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch, // internal data - store the number of addresses, with odd number the CREATE tx type var internalDataTransfers uint if blockTx.internalData != nil { - wb.PutCF(d.cfh[cfInternalData], blockTx.btxID, packEthInternalData(blockTx.internalData)) internalDataTransfers = uint(len(blockTx.internalData.transfers)) * 2 if blockTx.internalData.internalType == bchain.CREATE { internalDataTransfers++ @@ -470,6 +479,22 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch, return d.cleanupBlockTxs(wb, block) } +func (d *RocksDB) storeBlockInternalDataErrorEthereumType(wb *gorocksdb.WriteBatch, block *bchain.Block, message string) error { + key := packUint(block.Height) + txid, err := d.chainParser.PackTxid(block.Hash) + if err != nil { + return err + } + m := []byte(message) + buf := make([]byte, 0, len(txid)+len(m)+1) + // the stored structure is txid+retry count (1 byte)+error message + buf = append(buf, txid...) + buf = append(buf, 0) + buf = append(buf, m...) + wb.PutCF(d.cfh[cfBlockInternalDataErrors], key, buf) + return nil +} + func (d *RocksDB) getBlockTxsEthereumType(height uint32) ([]ethBlockTx, error) { pl := d.chainParser.PackedTxidLen() val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], packUint(height)) @@ -702,6 +727,7 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32) key := packUint(height) wb.DeleteCF(d.cfh[cfBlockTxs], key) wb.DeleteCF(d.cfh[cfHeight], key) + wb.DeleteCF(d.cfh[cfBlockInternalDataErrors], key) } d.storeAddressContracts(wb, contracts) err := d.db.Write(d.wo, wb) diff --git a/db/rocksdb_ethereumtype_test.go b/db/rocksdb_ethereumtype_test.go index 20df296d..d3ce848b 100644 --- a/db/rocksdb_ethereumtype_test.go +++ b/db/rocksdb_ethereumtype_test.go @@ -10,6 +10,7 @@ import ( "github.com/juju/errors" "github.com/trezor/blockbook/bchain" "github.com/trezor/blockbook/bchain/coins/eth" + "github.com/trezor/blockbook/common" "github.com/trezor/blockbook/tests/dbtestdata" ) @@ -101,7 +102,7 @@ func verifyAfterEthereumTypeBlock1(t *testing.T, d *RocksDB, afterDisconnect boo } } -func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB) { +func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB, wantBlockInternalDataError bool) { if err := checkColumn(d, cfHeight, []keyPair{ { "0041eee8", @@ -202,6 +203,22 @@ func verifyAfterEthereumTypeBlock2(t *testing.T, d *RocksDB) { t.Fatal(err) } } + + var internalDataError []keyPair + if wantBlockInternalDataError { + internalDataError = []keyPair{ + { + "0041eee9", + "2b57e15e93a0ed197417a34c2498b7187df79099572c04a6b6e6ff418f74e6ee" + "00" + hex.EncodeToString([]byte("test error")), + nil, + }, + } + } + if err := checkColumn(d, cfBlockInternalDataErrors, internalDataError); err != nil { + { + t.Fatal(err) + } + } } func formatInternalData(in *bchain.EthereumInternalData) *bchain.EthereumInternalData { @@ -247,12 +264,14 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) } - // connect 2nd block + // connect 2nd block, simulate InternalDataError block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser) + block2.CoinSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: "test error"} if err := d.ConnectBlock(block2); err != nil { t.Fatal(err) } - verifyAfterEthereumTypeBlock2(t, d) + verifyAfterEthereumTypeBlock2(t, d, true) + block2.CoinSpecificData = nil if len(d.is.BlockTimes) != 2 { t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes)) @@ -350,7 +369,7 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { if err == nil || err.Error() != "Cannot disconnect blocks with height 4321000 and lower. It is necessary to rebuild index." { t.Fatal(err) } - verifyAfterEthereumTypeBlock2(t, d) + verifyAfterEthereumTypeBlock2(t, d, true) // disconnect the 2nd block, verify that the db contains only data from the 1st block with restored unspentTxs // and that the cached tx is removed @@ -373,10 +392,61 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { if err := d.ConnectBlock(block2); err != nil { t.Fatal(err) } - verifyAfterEthereumTypeBlock2(t, d) + verifyAfterEthereumTypeBlock2(t, d, false) if len(d.is.BlockTimes) != 2 { t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes)) } } + +func Test_BulkConnect_EthereumType(t *testing.T) { + d := setupRocksDB(t, &testEthereumParser{ + EthereumParser: ethereumTestnetParser(), + }) + defer closeAndDestroyRocksDB(t, d) + + bc, err := d.InitBulkConnect() + if err != nil { + t.Fatal(err) + } + + if d.is.DbState != common.DbStateInconsistent { + t.Fatal("DB not in DbStateInconsistent") + } + + if len(d.is.BlockTimes) != 0 { + t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes)) + } + + if err := bc.ConnectBlock(dbtestdata.GetTestEthereumTypeBlock1(d.chainParser), false); err != nil { + t.Fatal(err) + } + if err := checkColumn(d, cfBlockTxs, []keyPair{}); err != nil { + { + t.Fatal(err) + } + } + + // connect 2nd block, simulate InternalDataError + block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser) + block2.CoinSpecificData = &bchain.EthereumBlockSpecificData{InternalDataError: "test error"} + if err := bc.ConnectBlock(block2, true); err != nil { + t.Fatal(err) + } + block2.CoinSpecificData = nil + + if err := bc.Close(); err != nil { + t.Fatal(err) + } + + if d.is.DbState != common.DbStateOpen { + t.Fatal("DB not in DbStateOpen") + } + + verifyAfterEthereumTypeBlock2(t, d, true) + + if len(d.is.BlockTimes) != 4321002 { + t.Fatal("Expecting is.BlockTimes 4321002, got ", len(d.is.BlockTimes)) + } +}