diff --git a/api/ethereumtype.go b/api/ethereumtype.go new file mode 100644 index 00000000..b4aa9446 --- /dev/null +++ b/api/ethereumtype.go @@ -0,0 +1,92 @@ +package api + +import ( + "sync" + + "github.com/golang/glog" + "github.com/linxGnu/grocksdb" + "github.com/trezor/blockbook/bchain" + "github.com/trezor/blockbook/db" +) + +// refetch internal data +var refetchingInternalData = false +var refetchInternalDataMux sync.Mutex + +func (w *Worker) IsRefetchingInternalData() bool { + refetchInternalDataMux.Lock() + defer refetchInternalDataMux.Unlock() + return refetchingInternalData +} + +func (w *Worker) RefetchInternalData() error { + refetchInternalDataMux.Lock() + defer refetchInternalDataMux.Unlock() + if !refetchingInternalData { + refetchingInternalData = true + go w.RefetchInternalDataRoutine() + } + return nil +} + +const maxNumberOfRetires = 25 + +func (w *Worker) incrementRefetchInternalDataRetryCount(ie *db.BlockInternalDataError) { + wb := grocksdb.NewWriteBatch() + defer wb.Destroy() + err := w.db.StoreBlockInternalDataErrorEthereumType(wb, &bchain.Block{ + BlockHeader: bchain.BlockHeader{ + Hash: ie.Hash, + Height: ie.Height, + }, + }, ie.ErrorMessage, ie.Retries+1) + if err != nil { + glog.Errorf("StoreBlockInternalDataErrorEthereumType %d %s, error %v", ie.Height, ie.Hash, err) + } else { + w.db.WriteBatch(wb) + } +} + +func (w *Worker) RefetchInternalDataRoutine() { + internalErrors, err := w.db.GetBlockInternalDataErrorsEthereumType() + if err == nil { + for i := range internalErrors { + ie := &internalErrors[i] + if ie.Retries >= maxNumberOfRetires { + glog.Infof("Refetching internal data for %d %s, retries exceeded", ie.Height, ie.Hash) + continue + } + glog.Infof("Refetching internal data for %d %s, retries %d", ie.Height, ie.Hash, ie.Retries) + block, err := w.chain.GetBlock(ie.Hash, ie.Height) + var blockSpecificData *bchain.EthereumBlockSpecificData + if block != nil { + blockSpecificData, _ = block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) + } + if err != nil || block == nil || blockSpecificData == nil || blockSpecificData.InternalDataError != "" { + glog.Errorf("Refetching internal data for %d %s, error %v, retrying", ie.Height, ie.Hash, err) + // try for second time to fetch the data - the 2nd attempt after the first unsuccessful has many times higher probability of success + // probably something to do with data preloaded to cache on the backend + block, err = w.chain.GetBlock(ie.Hash, ie.Height) + if err != nil || block == nil { + glog.Errorf("Refetching internal data for %d %s, error %v", ie.Height, ie.Hash, err) + continue + } + } + blockSpecificData, _ = block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) + if blockSpecificData != nil && blockSpecificData.InternalDataError != "" { + glog.Errorf("Refetching internal data for %d %s, internal data error %v", ie.Height, ie.Hash, blockSpecificData.InternalDataError) + w.incrementRefetchInternalDataRetryCount(ie) + } else { + err = w.db.ReconnectInternalDataToBlockEthereumType(block) + if err != nil { + glog.Errorf("ReconnectInternalDataToBlockEthereumType %d %s, error %v", ie.Height, ie.Hash, err) + } else { + glog.Infof("Refetching internal data for %d %s, success", ie.Height, ie.Hash) + } + } + } + } + refetchInternalDataMux.Lock() + refetchingInternalData = false + refetchInternalDataMux.Unlock() +} diff --git a/build/docker/bin/Makefile b/build/docker/bin/Makefile index cc3b888e..0e52b18c 100644 --- a/build/docker/bin/Makefile +++ b/build/docker/bin/Makefile @@ -38,5 +38,5 @@ prepare-sources: mkdir -p $(BLOCKBOOK_BASE) cp -r /src $(BLOCKBOOK_SRC) cd $(BLOCKBOOK_SRC) && go mod download - sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 50 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ethereum/go-ethereum*/rpc/websocket.go - sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 50 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ava-labs/coreth*/rpc/websocket.go + sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 80 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ethereum/go-ethereum*/rpc/websocket.go + sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 80 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ava-labs/coreth*/rpc/websocket.go diff --git a/db/rocksdb.go b/db/rocksdb.go index 2c47c971..ce6e37bc 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -775,6 +775,24 @@ func addToAddressesMap(addresses addressesMap, strAddrDesc string, btxID []byte, return false } +func (d *RocksDB) getTxIndexesForAddressAndBlock(addrDesc bchain.AddressDescriptor, height uint32) ([]txIndexes, error) { + key := packAddressKey(addrDesc, height) + val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], key) + if err != nil { + return nil, err + } + defer val.Free() + // nil data means the key was not found in DB + if val.Data() == nil { + return nil, nil + } + rv, err := d.unpackTxIndexes(val.Data()) + if err != nil { + return nil, err + } + return rv, nil +} + func (d *RocksDB) storeAddresses(wb *grocksdb.WriteBatch, height uint32, addresses addressesMap) error { for addrDesc, txi := range addresses { ba := bchain.AddressDescriptor(addrDesc) @@ -1208,6 +1226,34 @@ func (d *RocksDB) packTxIndexes(txi []txIndexes) []byte { return buf } +func (d *RocksDB) unpackTxIndexes(buf []byte) ([]txIndexes, error) { + var retval []txIndexes + txidUnpackedLen := d.chainParser.PackedTxidLen() + for len(buf) > txidUnpackedLen { + btxID := make([]byte, txidUnpackedLen) + copy(btxID, buf[:txidUnpackedLen]) + indexes := make([]int32, 0, 16) + buf = buf[txidUnpackedLen:] + for { + index, l := unpackVarint32(buf) + indexes = append(indexes, index>>1) + buf = buf[l:] + if index&1 == 1 { + break + } + } + retval = append(retval, txIndexes{ + btxID: btxID, + indexes: indexes, + }) + } + // reverse the return values, packTxIndexes is storing it in reverse + for i, j := 0, len(retval)-1; i < j; i, j = i+1, j-1 { + retval[i], retval[j] = retval[j], retval[i] + } + return retval, nil +} + func (d *RocksDB) packOutpoints(outpoints []outpoint) []byte { buf := make([]byte, 0, 32) bvout := make([]byte, vlq.MaxLen32) diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index 3bb77eb7..e3169e22 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -388,7 +388,23 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse return nil } -func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*AddrContracts) error { +func (d *RocksDB) setAddressTxIndexesToAddressMap(addrDesc bchain.AddressDescriptor, height uint32, addresses addressesMap) error { + strAddrDesc := string(addrDesc) + _, found := addresses[strAddrDesc] + if !found { + txIndexes, err := d.getTxIndexesForAddressAndBlock(addrDesc, height) + if err != nil { + return err + } + if len(txIndexes) > 0 { + addresses[strAddrDesc] = txIndexes + } + } + return nil +} + +// existingBlock signals that internal data are reconnected to already indexed block after they failed during standard sync +func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*AddrContracts, existingBlock bool) error { blockTx.internalData = ðInternalData{ internalType: id.Type, errorMsg: id.Error, @@ -404,6 +420,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc blockTx.internalData.internalType = bchain.CALL } else { blockTx.internalData.contract = to + if existingBlock { + if err = d.setAddressTxIndexesToAddressMap(to, tx.BlockHeight, addresses); err != nil { + return err + } + } if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { return err } @@ -422,6 +443,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d to", err, tx.Txid, i) } } else { + if existingBlock { + if err = d.setAddressTxIndexesToAddressMap(to, tx.BlockHeight, addresses); err != nil { + return err + } + } if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { return err } @@ -433,6 +459,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d from", err, tx.Txid, i) } } else { + if existingBlock { + if err = d.setAddressTxIndexesToAddressMap(from, tx.BlockHeight, addresses); err != nil { + return err + } + } if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil { return err } @@ -498,7 +529,7 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad // process internal data eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData) if eid.InternalData != nil { - if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts); err != nil { + if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false); err != nil { return nil, err } } @@ -510,6 +541,53 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad return blockTxs, nil } +// ReconnectInternalDataToBlockEthereumType adds missing internal data to the block and stores them in db +func (d *RocksDB) ReconnectInternalDataToBlockEthereumType(block *bchain.Block) error { + wb := grocksdb.NewWriteBatch() + defer wb.Destroy() + if d.chainParser.GetChainType() != bchain.ChainEthereumType { + return errors.New("Unsupported chain type") + } + + addresses := make(addressesMap) + addressContracts := make(map[string]*AddrContracts) + + // process internal data + blockTxs := make([]ethBlockTx, len(block.Txs)) + for txi := range block.Txs { + tx := &block.Txs[txi] + eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData) + if eid.InternalData != nil { + btxID, err := d.chainParser.PackTxid(tx.Txid) + if err != nil { + return err + } + blockTx := &blockTxs[txi] + blockTx.btxID = btxID + tx.BlockHeight = block.Height + if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true); err != nil { + return err + } + } + } + + if err := d.storeAddressContracts(wb, addressContracts); err != nil { + return err + } + if err := d.storeInternalDataEthereumType(wb, blockTxs); err != nil { + return err + } + if err := d.storeAddresses(wb, block.Height, addresses); err != nil { + return err + } + // remove the block from the internal errors table + wb.DeleteCF(d.cfh[cfBlockInternalDataErrors], packUint(block.Height)) + if err := d.WriteBatch(wb); err != nil { + return err + } + return nil +} + var ethZeroAddress []byte = make([]byte, eth.EthereumTypeAddressDescriptorLen) func appendAddress(buf []byte, a bchain.AddressDescriptor) []byte { @@ -867,7 +945,7 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *grocksdb.WriteBatch, b return d.cleanupBlockTxs(wb, block) } -func (d *RocksDB) storeBlockInternalDataErrorEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, message string, retryCount uint8) error { +func (d *RocksDB) StoreBlockInternalDataErrorEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, message string, retryCount uint8) error { key := packUint(block.Height) // TODO: this supposes that Txid and block hash are the same size txid, err := d.chainParser.PackTxid(block.Hash) @@ -937,7 +1015,7 @@ func (d *RocksDB) storeBlockSpecificDataEthereumType(wb *grocksdb.WriteBatch, bl if blockSpecificData != nil { if blockSpecificData.InternalDataError != "" { glog.Info("storeBlockSpecificDataEthereumType ", block.Height, ": ", blockSpecificData.InternalDataError) - if err := d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError, 0); err != nil { + if err := d.StoreBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError, 0); err != nil { return err } } diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index bb051062..70dc5741 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -1596,3 +1596,79 @@ func Test_packUnpackString(t *testing.T) { }) } } + +func TestRocksDB_packTxIndexes_unpackTxIndexes(t *testing.T) { + type args struct { + txi []txIndexes + } + tests := []struct { + name string + data []txIndexes + hex string + }{ + { + name: "1", + data: []txIndexes{ + { + btxID: hexToBytes(dbtestdata.TxidB1T1), + indexes: []int32{1}, + }, + }, + hex: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384006", + }, + { + name: "2", + data: []txIndexes{ + { + btxID: hexToBytes(dbtestdata.TxidB1T1), + indexes: []int32{-2, 1, 3, 1234, -53241}, + }, + { + btxID: hexToBytes(dbtestdata.TxidB1T2), + indexes: []int32{-2, -1, 0, 1, 2, 3}, + }, + }, + hex: "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7507030004080e00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384007040ca6488cff61", + }, + { + name: "3", + data: []txIndexes{ + { + btxID: hexToBytes(dbtestdata.TxidB2T1), + indexes: []int32{-2, 1, 3}, + }, + { + btxID: hexToBytes(dbtestdata.TxidB1T1), + indexes: []int32{-2, -1, 0, 1, 2, 3}, + }, + { + btxID: hexToBytes(dbtestdata.TxidB1T2), + indexes: []int32{-2}, + }, + }, + hex: "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac750500b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384007030004080e7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d2507040e", + }, + } + d := &RocksDB{ + chainParser: &testBitcoinParser{ + BitcoinParser: bitcoinTestnetParser(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := d.packTxIndexes(tt.data) + hex := hex.EncodeToString(b) + if !reflect.DeepEqual(hex, tt.hex) { + t.Errorf("packTxIndexes() = %v, want %v", hex, tt.hex) + } + got, err := d.unpackTxIndexes(b) + if err != nil { + t.Errorf("unpackTxIndexes() error = %v", err) + return + } + if !reflect.DeepEqual(got, tt.data) { + t.Errorf("unpackTxIndexes() = %+v, want %+v", got, tt.data) + } + }) + } +} diff --git a/server/internal.go b/server/internal.go index 37bb2a7b..df04bc64 100644 --- a/server/internal.go +++ b/server/internal.go @@ -120,13 +120,13 @@ const ( // InternalTemplateData is used to transfer data to the templates type InternalTemplateData struct { - CoinName string - CoinShortcut string - CoinLabel string - ChainType bchain.ChainType - Error *api.APIError - InternalDataErrors []db.BlockInternalDataError - InInternalDataRetry bool + CoinName string + CoinShortcut string + CoinLabel string + ChainType bchain.ChainType + Error *api.APIError + InternalDataErrors []db.BlockInternalDataError + RefetchingInternalData bool } func (s *InternalServer) newTemplateData(r *http.Request) *InternalTemplateData { @@ -170,7 +170,10 @@ func (s *InternalServer) adminIndex(w http.ResponseWriter, r *http.Request) (tpl func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Request) (tpl, *InternalTemplateData, error) { if r.Method == http.MethodPost { - + err := s.api.RefetchInternalData() + if err != nil { + return errorTpl, nil, err + } } data := s.newTemplateData(r) internalErrors, err := s.db.GetBlockInternalDataErrorsEthereumType() @@ -178,5 +181,6 @@ func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Reque return errorTpl, nil, err } data.InternalDataErrors = internalErrors + data.RefetchingInternalData = s.api.IsRefetchingInternalData() return adminInternalErrorsTpl, data, nil } diff --git a/static/internal_templates/block_internal_data_errors.html b/static/internal_templates/block_internal_data_errors.html index 53217f5b..2301f943 100644 --- a/static/internal_templates/block_internal_data_errors.html +++ b/static/internal_templates/block_internal_data_errors.html @@ -1,8 +1,9 @@ {{define "specific"}} +