Refetch failed ethereum internal data

This commit is contained in:
Martin Boehm 2023-03-29 23:53:35 +02:00
parent 7b068e085f
commit fabad15c10
7 changed files with 313 additions and 16 deletions

92
api/ethereumtype.go Normal file
View File

@ -0,0 +1,92 @@
package api
import (
"sync"
"github.com/golang/glog"
"github.com/linxGnu/grocksdb"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/db"
)
// refetch internal data
var refetchingInternalData = false
var refetchInternalDataMux sync.Mutex
func (w *Worker) IsRefetchingInternalData() bool {
refetchInternalDataMux.Lock()
defer refetchInternalDataMux.Unlock()
return refetchingInternalData
}
func (w *Worker) RefetchInternalData() error {
refetchInternalDataMux.Lock()
defer refetchInternalDataMux.Unlock()
if !refetchingInternalData {
refetchingInternalData = true
go w.RefetchInternalDataRoutine()
}
return nil
}
const maxNumberOfRetires = 25
func (w *Worker) incrementRefetchInternalDataRetryCount(ie *db.BlockInternalDataError) {
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
err := w.db.StoreBlockInternalDataErrorEthereumType(wb, &bchain.Block{
BlockHeader: bchain.BlockHeader{
Hash: ie.Hash,
Height: ie.Height,
},
}, ie.ErrorMessage, ie.Retries+1)
if err != nil {
glog.Errorf("StoreBlockInternalDataErrorEthereumType %d %s, error %v", ie.Height, ie.Hash, err)
} else {
w.db.WriteBatch(wb)
}
}
func (w *Worker) RefetchInternalDataRoutine() {
internalErrors, err := w.db.GetBlockInternalDataErrorsEthereumType()
if err == nil {
for i := range internalErrors {
ie := &internalErrors[i]
if ie.Retries >= maxNumberOfRetires {
glog.Infof("Refetching internal data for %d %s, retries exceeded", ie.Height, ie.Hash)
continue
}
glog.Infof("Refetching internal data for %d %s, retries %d", ie.Height, ie.Hash, ie.Retries)
block, err := w.chain.GetBlock(ie.Hash, ie.Height)
var blockSpecificData *bchain.EthereumBlockSpecificData
if block != nil {
blockSpecificData, _ = block.CoinSpecificData.(*bchain.EthereumBlockSpecificData)
}
if err != nil || block == nil || blockSpecificData == nil || blockSpecificData.InternalDataError != "" {
glog.Errorf("Refetching internal data for %d %s, error %v, retrying", ie.Height, ie.Hash, err)
// try for second time to fetch the data - the 2nd attempt after the first unsuccessful has many times higher probability of success
// probably something to do with data preloaded to cache on the backend
block, err = w.chain.GetBlock(ie.Hash, ie.Height)
if err != nil || block == nil {
glog.Errorf("Refetching internal data for %d %s, error %v", ie.Height, ie.Hash, err)
continue
}
}
blockSpecificData, _ = block.CoinSpecificData.(*bchain.EthereumBlockSpecificData)
if blockSpecificData != nil && blockSpecificData.InternalDataError != "" {
glog.Errorf("Refetching internal data for %d %s, internal data error %v", ie.Height, ie.Hash, blockSpecificData.InternalDataError)
w.incrementRefetchInternalDataRetryCount(ie)
} else {
err = w.db.ReconnectInternalDataToBlockEthereumType(block)
if err != nil {
glog.Errorf("ReconnectInternalDataToBlockEthereumType %d %s, error %v", ie.Height, ie.Hash, err)
} else {
glog.Infof("Refetching internal data for %d %s, success", ie.Height, ie.Hash)
}
}
}
}
refetchInternalDataMux.Lock()
refetchingInternalData = false
refetchInternalDataMux.Unlock()
}

View File

@ -38,5 +38,5 @@ prepare-sources:
mkdir -p $(BLOCKBOOK_BASE)
cp -r /src $(BLOCKBOOK_SRC)
cd $(BLOCKBOOK_SRC) && go mod download
sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 50 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ethereum/go-ethereum*/rpc/websocket.go
sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 50 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ava-labs/coreth*/rpc/websocket.go
sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 80 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ethereum/go-ethereum*/rpc/websocket.go
sed -i 's/wsMessageSizeLimit\ =\ 15\ \*\ 1024\ \*\ 1024/wsMessageSizeLimit = 80 * 1024 * 1024/g' $(GOPATH)/pkg/mod/github.com/ava-labs/coreth*/rpc/websocket.go

View File

@ -775,6 +775,24 @@ func addToAddressesMap(addresses addressesMap, strAddrDesc string, btxID []byte,
return false
}
func (d *RocksDB) getTxIndexesForAddressAndBlock(addrDesc bchain.AddressDescriptor, height uint32) ([]txIndexes, error) {
key := packAddressKey(addrDesc, height)
val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], key)
if err != nil {
return nil, err
}
defer val.Free()
// nil data means the key was not found in DB
if val.Data() == nil {
return nil, nil
}
rv, err := d.unpackTxIndexes(val.Data())
if err != nil {
return nil, err
}
return rv, nil
}
func (d *RocksDB) storeAddresses(wb *grocksdb.WriteBatch, height uint32, addresses addressesMap) error {
for addrDesc, txi := range addresses {
ba := bchain.AddressDescriptor(addrDesc)
@ -1208,6 +1226,34 @@ func (d *RocksDB) packTxIndexes(txi []txIndexes) []byte {
return buf
}
func (d *RocksDB) unpackTxIndexes(buf []byte) ([]txIndexes, error) {
var retval []txIndexes
txidUnpackedLen := d.chainParser.PackedTxidLen()
for len(buf) > txidUnpackedLen {
btxID := make([]byte, txidUnpackedLen)
copy(btxID, buf[:txidUnpackedLen])
indexes := make([]int32, 0, 16)
buf = buf[txidUnpackedLen:]
for {
index, l := unpackVarint32(buf)
indexes = append(indexes, index>>1)
buf = buf[l:]
if index&1 == 1 {
break
}
}
retval = append(retval, txIndexes{
btxID: btxID,
indexes: indexes,
})
}
// reverse the return values, packTxIndexes is storing it in reverse
for i, j := 0, len(retval)-1; i < j; i, j = i+1, j-1 {
retval[i], retval[j] = retval[j], retval[i]
}
return retval, nil
}
func (d *RocksDB) packOutpoints(outpoints []outpoint) []byte {
buf := make([]byte, 0, 32)
bvout := make([]byte, vlq.MaxLen32)

View File

@ -388,7 +388,23 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse
return nil
}
func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*AddrContracts) error {
func (d *RocksDB) setAddressTxIndexesToAddressMap(addrDesc bchain.AddressDescriptor, height uint32, addresses addressesMap) error {
strAddrDesc := string(addrDesc)
_, found := addresses[strAddrDesc]
if !found {
txIndexes, err := d.getTxIndexesForAddressAndBlock(addrDesc, height)
if err != nil {
return err
}
if len(txIndexes) > 0 {
addresses[strAddrDesc] = txIndexes
}
}
return nil
}
// existingBlock signals that internal data are reconnected to already indexed block after they failed during standard sync
func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*AddrContracts, existingBlock bool) error {
blockTx.internalData = &ethInternalData{
internalType: id.Type,
errorMsg: id.Error,
@ -404,6 +420,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
blockTx.internalData.internalType = bchain.CALL
} else {
blockTx.internalData.contract = to
if existingBlock {
if err = d.setAddressTxIndexesToAddressMap(to, tx.BlockHeight, addresses); err != nil {
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil {
return err
}
@ -422,6 +443,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d to", err, tx.Txid, i)
}
} else {
if existingBlock {
if err = d.setAddressTxIndexesToAddressMap(to, tx.BlockHeight, addresses); err != nil {
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil {
return err
}
@ -433,6 +459,11 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d from", err, tx.Txid, i)
}
} else {
if existingBlock {
if err = d.setAddressTxIndexesToAddressMap(from, tx.BlockHeight, addresses); err != nil {
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil {
return err
}
@ -498,7 +529,7 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad
// process internal data
eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if eid.InternalData != nil {
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts); err != nil {
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false); err != nil {
return nil, err
}
}
@ -510,6 +541,53 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad
return blockTxs, nil
}
// ReconnectInternalDataToBlockEthereumType adds missing internal data to the block and stores them in db
func (d *RocksDB) ReconnectInternalDataToBlockEthereumType(block *bchain.Block) error {
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
if d.chainParser.GetChainType() != bchain.ChainEthereumType {
return errors.New("Unsupported chain type")
}
addresses := make(addressesMap)
addressContracts := make(map[string]*AddrContracts)
// process internal data
blockTxs := make([]ethBlockTx, len(block.Txs))
for txi := range block.Txs {
tx := &block.Txs[txi]
eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if eid.InternalData != nil {
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
blockTx := &blockTxs[txi]
blockTx.btxID = btxID
tx.BlockHeight = block.Height
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true); err != nil {
return err
}
}
}
if err := d.storeAddressContracts(wb, addressContracts); err != nil {
return err
}
if err := d.storeInternalDataEthereumType(wb, blockTxs); err != nil {
return err
}
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
return err
}
// remove the block from the internal errors table
wb.DeleteCF(d.cfh[cfBlockInternalDataErrors], packUint(block.Height))
if err := d.WriteBatch(wb); err != nil {
return err
}
return nil
}
var ethZeroAddress []byte = make([]byte, eth.EthereumTypeAddressDescriptorLen)
func appendAddress(buf []byte, a bchain.AddressDescriptor) []byte {
@ -867,7 +945,7 @@ func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *grocksdb.WriteBatch, b
return d.cleanupBlockTxs(wb, block)
}
func (d *RocksDB) storeBlockInternalDataErrorEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, message string, retryCount uint8) error {
func (d *RocksDB) StoreBlockInternalDataErrorEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, message string, retryCount uint8) error {
key := packUint(block.Height)
// TODO: this supposes that Txid and block hash are the same size
txid, err := d.chainParser.PackTxid(block.Hash)
@ -937,7 +1015,7 @@ func (d *RocksDB) storeBlockSpecificDataEthereumType(wb *grocksdb.WriteBatch, bl
if blockSpecificData != nil {
if blockSpecificData.InternalDataError != "" {
glog.Info("storeBlockSpecificDataEthereumType ", block.Height, ": ", blockSpecificData.InternalDataError)
if err := d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError, 0); err != nil {
if err := d.StoreBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError, 0); err != nil {
return err
}
}

View File

@ -1596,3 +1596,79 @@ func Test_packUnpackString(t *testing.T) {
})
}
}
func TestRocksDB_packTxIndexes_unpackTxIndexes(t *testing.T) {
type args struct {
txi []txIndexes
}
tests := []struct {
name string
data []txIndexes
hex string
}{
{
name: "1",
data: []txIndexes{
{
btxID: hexToBytes(dbtestdata.TxidB1T1),
indexes: []int32{1},
},
},
hex: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384006",
},
{
name: "2",
data: []txIndexes{
{
btxID: hexToBytes(dbtestdata.TxidB1T1),
indexes: []int32{-2, 1, 3, 1234, -53241},
},
{
btxID: hexToBytes(dbtestdata.TxidB1T2),
indexes: []int32{-2, -1, 0, 1, 2, 3},
},
},
hex: "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7507030004080e00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384007040ca6488cff61",
},
{
name: "3",
data: []txIndexes{
{
btxID: hexToBytes(dbtestdata.TxidB2T1),
indexes: []int32{-2, 1, 3},
},
{
btxID: hexToBytes(dbtestdata.TxidB1T1),
indexes: []int32{-2, -1, 0, 1, 2, 3},
},
{
btxID: hexToBytes(dbtestdata.TxidB1T2),
indexes: []int32{-2},
},
},
hex: "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac750500b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa384007030004080e7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d2507040e",
},
}
d := &RocksDB{
chainParser: &testBitcoinParser{
BitcoinParser: bitcoinTestnetParser(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := d.packTxIndexes(tt.data)
hex := hex.EncodeToString(b)
if !reflect.DeepEqual(hex, tt.hex) {
t.Errorf("packTxIndexes() = %v, want %v", hex, tt.hex)
}
got, err := d.unpackTxIndexes(b)
if err != nil {
t.Errorf("unpackTxIndexes() error = %v", err)
return
}
if !reflect.DeepEqual(got, tt.data) {
t.Errorf("unpackTxIndexes() = %+v, want %+v", got, tt.data)
}
})
}
}

View File

@ -120,13 +120,13 @@ const (
// InternalTemplateData is used to transfer data to the templates
type InternalTemplateData struct {
CoinName string
CoinShortcut string
CoinLabel string
ChainType bchain.ChainType
Error *api.APIError
InternalDataErrors []db.BlockInternalDataError
InInternalDataRetry bool
CoinName string
CoinShortcut string
CoinLabel string
ChainType bchain.ChainType
Error *api.APIError
InternalDataErrors []db.BlockInternalDataError
RefetchingInternalData bool
}
func (s *InternalServer) newTemplateData(r *http.Request) *InternalTemplateData {
@ -170,7 +170,10 @@ func (s *InternalServer) adminIndex(w http.ResponseWriter, r *http.Request) (tpl
func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Request) (tpl, *InternalTemplateData, error) {
if r.Method == http.MethodPost {
err := s.api.RefetchInternalData()
if err != nil {
return errorTpl, nil, err
}
}
data := s.newTemplateData(r)
internalErrors, err := s.db.GetBlockInternalDataErrorsEthereumType()
@ -178,5 +181,6 @@ func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Reque
return errorTpl, nil, err
}
data.InternalDataErrors = internalErrors
data.RefetchingInternalData = s.api.IsRefetchingInternalData()
return adminInternalErrorsTpl, data, nil
}

View File

@ -1,8 +1,9 @@
{{define "specific"}}
<h3>Blocks with errors from fetching internal data</h3>
<div class="row g-0">
<h3 class="col-md-11">Blocks with errors from fetching internal data</h3>
<div class="col-md-11">Count: {{len .InternalDataErrors}}</div>
<div class="col-md-1 justify-content-right">
{{if .InInternalDataRetry}}Fetching...{{else}}
{{if .RefetchingInternalData}}Fetching...{{else}}
<form method="POST" action="/admin/internal-data-errors">
<button type="submit" class="btn btn-outline-secondary">Retry fetch</button>
</form>