package db import ( "bytes" "encoding/hex" "math/big" "sync" vlq "github.com/bsm/go-vlq" "github.com/golang/glog" "github.com/juju/errors" "github.com/linxGnu/grocksdb" "github.com/trezor/blockbook/bchain" "github.com/trezor/blockbook/bchain/coins/eth" ) const InternalTxIndexOffset = 1 const ContractIndexOffset = 2 // AddrContract is Contract address with number of transactions done by given address type AddrContract struct { Type bchain.TokenType Contract bchain.AddressDescriptor Txs uint Value big.Int // single value of ERC20 Ids []big.Int // multiple ERC721 tokens MultiTokenValues []bchain.MultiTokenValue // multiple ERC1155 tokens } // AddrContracts contains number of transactions and contracts for an address type AddrContracts struct { TotalTxs uint NonContractTxs uint InternalTxs uint Contracts []AddrContract } // packAddrContract packs AddrContracts into a byte buffer func packAddrContracts(acs *AddrContracts) []byte { buf := make([]byte, 0, 128) varBuf := make([]byte, maxPackedBigintBytes) l := packVaruint(acs.TotalTxs, varBuf) buf = append(buf, varBuf[:l]...) l = packVaruint(acs.NonContractTxs, varBuf) buf = append(buf, varBuf[:l]...) l = packVaruint(acs.InternalTxs, varBuf) buf = append(buf, varBuf[:l]...) for _, ac := range acs.Contracts { buf = append(buf, ac.Contract...) l = packVaruint(uint(ac.Type)+ac.Txs<<2, varBuf) buf = append(buf, varBuf[:l]...) if ac.Type == bchain.FungibleToken { l = packBigint(&ac.Value, varBuf) buf = append(buf, varBuf[:l]...) } else if ac.Type == bchain.NonFungibleToken { l = packVaruint(uint(len(ac.Ids)), varBuf) buf = append(buf, varBuf[:l]...) for i := range ac.Ids { l = packBigint(&ac.Ids[i], varBuf) buf = append(buf, varBuf[:l]...) } } else { // bchain.ERC1155 l = packVaruint(uint(len(ac.MultiTokenValues)), varBuf) buf = append(buf, varBuf[:l]...) for i := range ac.MultiTokenValues { l = packBigint(&ac.MultiTokenValues[i].Id, varBuf) buf = append(buf, varBuf[:l]...) l = packBigint(&ac.MultiTokenValues[i].Value, varBuf) buf = append(buf, varBuf[:l]...) } } } return buf } func unpackAddrContracts(buf []byte, addrDesc bchain.AddressDescriptor) (*AddrContracts, error) { tt, l := unpackVaruint(buf) buf = buf[l:] nct, l := unpackVaruint(buf) buf = buf[l:] ict, l := unpackVaruint(buf) buf = buf[l:] c := make([]AddrContract, 0, 4) for len(buf) > 0 { if len(buf) < eth.EthereumTypeAddressDescriptorLen { return nil, errors.New("Invalid data stored in cfAddressContracts for AddrDesc " + addrDesc.String()) } contract := append(bchain.AddressDescriptor(nil), buf[:eth.EthereumTypeAddressDescriptorLen]...) txs, l := unpackVaruint(buf[eth.EthereumTypeAddressDescriptorLen:]) buf = buf[eth.EthereumTypeAddressDescriptorLen+l:] ttt := bchain.TokenType(txs & 3) txs >>= 2 ac := AddrContract{ Type: ttt, Contract: contract, Txs: txs, } if ttt == bchain.FungibleToken { b, ll := unpackBigint(buf) buf = buf[ll:] ac.Value = b } else { len, ll := unpackVaruint(buf) buf = buf[ll:] if ttt == bchain.NonFungibleToken { ac.Ids = make([]big.Int, len) for i := uint(0); i < len; i++ { b, ll := unpackBigint(buf) buf = buf[ll:] ac.Ids[i] = b } } else { ac.MultiTokenValues = make([]bchain.MultiTokenValue, len) for i := uint(0); i < len; i++ { b, ll := unpackBigint(buf) buf = buf[ll:] ac.MultiTokenValues[i].Id = b b, ll = unpackBigint(buf) buf = buf[ll:] ac.MultiTokenValues[i].Value = b } } } c = append(c, ac) } return &AddrContracts{ TotalTxs: tt, NonContractTxs: nct, InternalTxs: ict, Contracts: c, }, nil } func (d *RocksDB) storeAddressContracts(wb *grocksdb.WriteBatch, acm map[string]*AddrContracts) error { for addrDesc, acs := range acm { // address with 0 contracts is removed from db - happens on disconnect if acs == nil || (acs.NonContractTxs == 0 && acs.InternalTxs == 0 && len(acs.Contracts) == 0) { wb.DeleteCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc)) } else { buf := packAddrContracts(acs) wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf) } } return nil } // GetAddrDescContracts returns AddrContracts for given addrDesc func (d *RocksDB) GetAddrDescContracts(addrDesc bchain.AddressDescriptor) (*AddrContracts, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc) if err != nil { return nil, err } defer val.Free() buf := val.Data() if len(buf) == 0 { return nil, nil } return unpackAddrContracts(buf, addrDesc) } func findContractInAddressContracts(contract bchain.AddressDescriptor, contracts []AddrContract) (int, bool) { for i := range contracts { if bytes.Equal(contract, contracts[i].Contract) { return i, true } } return 0, false } func isZeroAddress(addrDesc bchain.AddressDescriptor) bool { for _, b := range addrDesc { if b != 0 { return false } } return true } const transferTo = int32(0) const transferFrom = ^int32(0) const internalTransferTo = int32(1) const internalTransferFrom = ^int32(1) // addToAddressesMapEthereumType maintains mapping between addresses and transactions in one block // it ensures that each index is there only once, there can be for example multiple internal transactions of the same address // the return value is true if the tx was processed before, to not to count the tx multiple times func addToAddressesMapEthereumType(addresses addressesMap, strAddrDesc string, btxID []byte, index int32) bool { // check that the address was already processed in this block // if not found, it has certainly not been counted at, found := addresses[strAddrDesc] if found { // if the tx is already in the slice, append the index to the array of indexes for i, t := range at { if bytes.Equal(btxID, t.btxID) { for _, existing := range t.indexes { if existing == index { return true } } at[i].indexes = append(t.indexes, index) return true } } } addresses[strAddrDesc] = append(at, txIndexes{ btxID: btxID, indexes: []int32{index}, }) return false } func addToContract(c *AddrContract, contractIndex int, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool) int32 { var aggregate func(*big.Int, *big.Int) // index 0 is for ETH transfers, index 1 (InternalTxIndexOffset) is for internal transfers, contract indexes start with 2 (ContractIndexOffset) if index < 0 { index = ^int32(contractIndex + ContractIndexOffset) aggregate = func(s, v *big.Int) { s.Sub(s, v) if s.Sign() < 0 { // glog.Warningf("rocksdb: addToContracts: contract %s, from %s, negative aggregate", transfer.Contract, transfer.From) s.SetUint64(0) } } } else { index = int32(contractIndex + ContractIndexOffset) aggregate = func(s, v *big.Int) { s.Add(s, v) } } if transfer.Type == bchain.FungibleToken { aggregate(&c.Value, &transfer.Value) } else if transfer.Type == bchain.NonFungibleToken { if index < 0 { // remove token from the list for i := range c.Ids { if c.Ids[i].Cmp(&transfer.Value) == 0 { c.Ids = append(c.Ids[:i], c.Ids[i+1:]...) break } } } else { // add token to the list c.Ids = append(c.Ids, transfer.Value) } } else { // bchain.ERC1155 for _, t := range transfer.MultiTokenValues { for i := range c.MultiTokenValues { // find the token in the list if c.MultiTokenValues[i].Id.Cmp(&t.Id) == 0 { aggregate(&c.MultiTokenValues[i].Value, &t.Value) // if transfer from, remove if the value is zero if index < 0 && len(c.MultiTokenValues[i].Value.Bits()) == 0 { c.MultiTokenValues = append(c.MultiTokenValues[:i], c.MultiTokenValues[i+1:]...) } goto nextTransfer } } // if not found and transfer to, add to the list // it is necessary to add a copy of the value so that subsequent calls to addToContract do not change the transfer value if index >= 0 { c.MultiTokenValues = append(c.MultiTokenValues, bchain.MultiTokenValue{ Id: t.Id, Value: *new(big.Int).Set(&t.Value), }) } nextTransfer: } } if addTxCount { c.Txs++ } return index } func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool, addresses addressesMap, addressContracts map[string]*AddrContracts) error { var err error strAddrDesc := string(addrDesc) ac, e := addressContracts[strAddrDesc] if !e { ac, err = d.GetAddrDescContracts(addrDesc) if err != nil { return err } if ac == nil { ac = &AddrContracts{} } addressContracts[strAddrDesc] = ac d.cbs.balancesMiss++ } else { d.cbs.balancesHit++ } if contract == nil { if addTxCount { if index == internalTransferFrom || index == internalTransferTo { ac.InternalTxs++ } else { ac.NonContractTxs++ } } } else { // do not store contracts for 0x0000000000000000000000000000000000000000 address if !isZeroAddress(addrDesc) { // locate the contract and set i to the index in the array of contracts contractIndex, found := findContractInAddressContracts(contract, ac.Contracts) if !found { contractIndex = len(ac.Contracts) ac.Contracts = append(ac.Contracts, AddrContract{ Contract: contract, Type: transfer.Type, }) } c := &ac.Contracts[contractIndex] index = addToContract(c, contractIndex, index, contract, transfer, addTxCount) } else { if index < 0 { index = transferFrom } else { index = transferTo } } } counted := addToAddressesMapEthereumType(addresses, strAddrDesc, btxID, index) if !counted { ac.TotalTxs++ } return nil } type ethBlockTxContract struct { from, to, contract bchain.AddressDescriptor transferType bchain.TokenType value big.Int idValues []bchain.MultiTokenValue } type ethInternalTransfer struct { internalType bchain.EthereumInternalTransactionType from, to bchain.AddressDescriptor value big.Int } type ethInternalData struct { internalType bchain.EthereumInternalTransactionType contract bchain.AddressDescriptor transfers []ethInternalTransfer errorMsg string } type ethBlockTx struct { btxID []byte from, to bchain.AddressDescriptor contracts []ethBlockTxContract internalData *ethInternalData } func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*AddrContracts) error { var from, to bchain.AddressDescriptor var err error // there is only one output address in EthereumType transaction, store it in format txid 0 if len(tx.Vout) == 1 && len(tx.Vout[0].ScriptPubKey.Addresses) == 1 { to, err = d.chainParser.GetAddrDescFromAddress(tx.Vout[0].ScriptPubKey.Addresses[0]) if err != nil { // do not log ErrAddressMissing, transactions can be without to address (for example eth contracts) if err != bchain.ErrAddressMissing { glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, output", err, tx.Txid) } } else { if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, transferTo, nil, nil, true, addresses, addressContracts); err != nil { return err } blockTx.to = to } } // there is only one input address in EthereumType transaction, store it in format txid ^0 if len(tx.Vin) == 1 && len(tx.Vin[0].Addresses) == 1 { from, err = d.chainParser.GetAddrDescFromAddress(tx.Vin[0].Addresses[0]) if err != nil { if err != bchain.ErrAddressMissing { glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, input", err, tx.Txid) } } else { if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, transferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil { return err } blockTx.from = from } } return nil } func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*AddrContracts) error { blockTx.internalData = ðInternalData{ internalType: id.Type, errorMsg: id.Error, } // index contract creation if id.Type == bchain.CREATE { to, err := d.chainParser.GetAddrDescFromAddress(id.Contract) if err != nil { if err != bchain.ErrAddressMissing { glog.Warningf("rocksdb: processInternalData: %v, tx %v, create contract", err, tx.Txid) } // set the internalType to CALL if incorrect contract so that it is not breaking the packing of data to DB blockTx.internalData.internalType = bchain.CALL } else { blockTx.internalData.contract = to if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { return err } } } // index internal transfers if len(id.Transfers) > 0 { blockTx.internalData.transfers = make([]ethInternalTransfer, len(id.Transfers)) for i := range id.Transfers { iti := &id.Transfers[i] ito := &blockTx.internalData.transfers[i] to, err := d.chainParser.GetAddrDescFromAddress(iti.To) if err != nil { // do not log ErrAddressMissing, transactions can be without to address (for example eth contracts) if err != bchain.ErrAddressMissing { glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d to", err, tx.Txid, i) } } else { if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { return err } ito.to = to } from, err := d.chainParser.GetAddrDescFromAddress(iti.From) if err != nil { if err != bchain.ErrAddressMissing { glog.Warningf("rocksdb: processInternalData: %v, tx %v, internal transfer %d from", err, tx.Txid, i) } } else { if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil { return err } ito.from = from } ito.internalType = iti.Type ito.value = iti.Value } } return nil } func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*AddrContracts) error { tokenTransfers, err := d.chainParser.EthereumTypeGetTokenTransfersFromTx(tx) if err != nil { glog.Warningf("rocksdb: processContractTransfers %v, tx %v", err, tx.Txid) } blockTx.contracts = make([]ethBlockTxContract, len(tokenTransfers)) for i, t := range tokenTransfers { var contract, from, to bchain.AddressDescriptor contract, err = d.chainParser.GetAddrDescFromAddress(t.Contract) if err == nil { from, err = d.chainParser.GetAddrDescFromAddress(t.From) if err == nil { to, err = d.chainParser.GetAddrDescFromAddress(t.To) } } if err != nil { glog.Warningf("rocksdb: processContractTransfers %v, tx %v, transfer %v", err, tx.Txid, t) continue } if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, int32(i), contract, t, true, addresses, addressContracts); err != nil { return err } eq := bytes.Equal(from, to) if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, ^int32(i), contract, t, !eq, addresses, addressContracts); err != nil { return err } bc := &blockTx.contracts[i] bc.transferType = t.Type bc.from = from bc.to = to bc.contract = contract bc.value = t.Value bc.idValues = t.MultiTokenValues } return nil } func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses addressesMap, addressContracts map[string]*AddrContracts) ([]ethBlockTx, error) { blockTxs := make([]ethBlockTx, len(block.Txs)) for txi := range block.Txs { tx := &block.Txs[txi] btxID, err := d.chainParser.PackTxid(tx.Txid) if err != nil { return nil, err } blockTx := &blockTxs[txi] blockTx.btxID = btxID if err = d.processBaseTxData(blockTx, tx, addresses, addressContracts); err != nil { return nil, err } // process internal data eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData) if eid.InternalData != nil { if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts); err != nil { return nil, err } } // store contract transfers if err = d.processContractTransfers(blockTx, tx, addresses, addressContracts); err != nil { return nil, err } } return blockTxs, nil } var ethZeroAddress []byte = make([]byte, eth.EthereumTypeAddressDescriptorLen) func appendAddress(buf []byte, a bchain.AddressDescriptor) []byte { if len(a) != eth.EthereumTypeAddressDescriptorLen { buf = append(buf, ethZeroAddress...) } else { buf = append(buf, a...) } return buf } func packEthInternalData(data *ethInternalData) []byte { // allocate enough for type+contract+all transfers with bigint value buf := make([]byte, 0, (2*len(data.transfers)+1)*(eth.EthereumTypeAddressDescriptorLen+16)) varBuf := make([]byte, maxPackedBigintBytes) // internalType is one bit (CALL|CREATE), it is joined with count of internal transfers*2 l := packVaruint(uint(data.internalType)&1+uint(len(data.transfers))<<1, varBuf) buf = append(buf, varBuf[:l]...) if data.internalType == bchain.CREATE { buf = appendAddress(buf, data.contract) } for i := range data.transfers { t := &data.transfers[i] buf = append(buf, byte(t.internalType)) buf = appendAddress(buf, t.from) buf = appendAddress(buf, t.to) l = packBigint(&t.value, varBuf) buf = append(buf, varBuf[:l]...) } if len(data.errorMsg) > 0 { buf = append(buf, []byte(data.errorMsg)...) } return buf } func (d *RocksDB) unpackEthInternalData(buf []byte) (*bchain.EthereumInternalData, error) { id := bchain.EthereumInternalData{} v, l := unpackVaruint(buf) id.Type = bchain.EthereumInternalTransactionType(v & 1) id.Transfers = make([]bchain.EthereumInternalTransfer, v>>1) if id.Type == bchain.CREATE { addresses, _, _ := d.chainParser.GetAddressesFromAddrDesc(buf[l : l+eth.EthereumTypeAddressDescriptorLen]) l += eth.EthereumTypeAddressDescriptorLen if len(addresses) > 0 { id.Contract = addresses[0] } } var ll int for i := range id.Transfers { t := &id.Transfers[i] t.Type = bchain.EthereumInternalTransactionType(buf[l]) l++ addresses, _, _ := d.chainParser.GetAddressesFromAddrDesc(buf[l : l+eth.EthereumTypeAddressDescriptorLen]) l += eth.EthereumTypeAddressDescriptorLen if len(addresses) > 0 { t.From = addresses[0] } addresses, _, _ = d.chainParser.GetAddressesFromAddrDesc(buf[l : l+eth.EthereumTypeAddressDescriptorLen]) l += eth.EthereumTypeAddressDescriptorLen if len(addresses) > 0 { t.To = addresses[0] } t.Value, ll = unpackBigint(buf[l:]) l += ll } id.Error = eth.UnpackInternalTransactionError(buf[l:]) return &id, nil } // FourByteSignature contains 4byte signature of transaction value with parameters // and parsed parameters (that are not stored in DB) func packFourByteKey(fourBytes uint32, id uint32) []byte { key := make([]byte, 0, 8) key = append(key, packUint(fourBytes)...) key = append(key, packUint(id)...) return key } func packFourByteSignature(signature *bchain.FourByteSignature) []byte { buf := packString(signature.Name) for i := range signature.Parameters { buf = append(buf, packString(signature.Parameters[i])...) } return buf } func unpackFourByteSignature(buf []byte) (*bchain.FourByteSignature, error) { var signature bchain.FourByteSignature var l int signature.Name, l = unpackString(buf) for l < len(buf) { s, ll := unpackString(buf[l:]) signature.Parameters = append(signature.Parameters, s) l += ll } return &signature, nil } // GetFourByteSignature gets all 4byte signature of given fourBytes and id func (d *RocksDB) GetFourByteSignature(fourBytes uint32, id uint32) (*bchain.FourByteSignature, error) { key := packFourByteKey(fourBytes, id) val, err := d.db.GetCF(d.ro, d.cfh[cfFunctionSignatures], key) if err != nil { return nil, err } defer val.Free() buf := val.Data() if len(buf) == 0 { return nil, nil } return unpackFourByteSignature(buf) } var cachedByteSignatures = make(map[uint32]*[]bchain.FourByteSignature) var cachedByteSignaturesMux sync.Mutex // GetFourByteSignatures gets all 4byte signatures of given fourBytes // (there may be more than one signature starting with the same four bytes) func (d *RocksDB) GetFourByteSignatures(fourBytes uint32) (*[]bchain.FourByteSignature, error) { cachedByteSignaturesMux.Lock() signatures, found := cachedByteSignatures[fourBytes] cachedByteSignaturesMux.Unlock() if !found { retval := []bchain.FourByteSignature{} key := packUint(fourBytes) it := d.db.NewIteratorCF(d.ro, d.cfh[cfFunctionSignatures]) defer it.Close() for it.Seek(key); it.Valid(); it.Next() { current := it.Key().Data() if bytes.Compare(current[:4], key) > 0 { break } val := it.Value().Data() signature, err := unpackFourByteSignature(val) if err != nil { return nil, err } retval = append(retval, *signature) } cachedByteSignaturesMux.Lock() cachedByteSignatures[fourBytes] = &retval cachedByteSignaturesMux.Unlock() return &retval, nil } return signatures, nil } // StoreFourByteSignature stores 4byte signature in DB func (d *RocksDB) StoreFourByteSignature(wb *grocksdb.WriteBatch, fourBytes uint32, id uint32, signature *bchain.FourByteSignature) error { key := packFourByteKey(fourBytes, id) wb.PutCF(d.cfh[cfFunctionSignatures], key, packFourByteSignature(signature)) cachedByteSignaturesMux.Lock() delete(cachedByteSignatures, fourBytes) cachedByteSignaturesMux.Unlock() return nil } // GetEthereumInternalData gets transaction internal data from DB func (d *RocksDB) GetEthereumInternalData(txid string) (*bchain.EthereumInternalData, error) { btxID, err := d.chainParser.PackTxid(txid) if err != nil { return nil, err } return d.getEthereumInternalData(btxID) } func (d *RocksDB) getEthereumInternalData(btxID []byte) (*bchain.EthereumInternalData, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfInternalData], btxID) if err != nil { return nil, err } defer val.Free() buf := val.Data() if len(buf) == 0 { return nil, nil } return d.unpackEthInternalData(buf) } func (d *RocksDB) storeInternalDataEthereumType(wb *grocksdb.WriteBatch, blockTxs []ethBlockTx) error { for i := range blockTxs { blockTx := &blockTxs[i] if blockTx.internalData != nil { wb.PutCF(d.cfh[cfInternalData], blockTx.btxID, packEthInternalData(blockTx.internalData)) } } return nil } var cachedContracts = make(map[string]*bchain.ContractInfo) var cachedContractsMux sync.Mutex func packContractInfo(contractInfo *bchain.ContractInfo) []byte { buf := packString(contractInfo.Name) buf = append(buf, packString(contractInfo.Symbol)...) buf = append(buf, packString(string(contractInfo.Type))...) varBuf := make([]byte, vlq.MaxLen64) l := packVaruint(uint(contractInfo.Decimals), varBuf) buf = append(buf, varBuf[:l]...) l = packVaruint(uint(contractInfo.CreatedInBlock), varBuf) buf = append(buf, varBuf[:l]...) l = packVaruint(uint(contractInfo.DestructedInBlock), varBuf) buf = append(buf, varBuf[:l]...) return buf } func unpackContractInfo(buf []byte) (*bchain.ContractInfo, error) { var contractInfo bchain.ContractInfo var s string var l int var ui uint contractInfo.Name, l = unpackString(buf) buf = buf[l:] contractInfo.Symbol, l = unpackString(buf) buf = buf[l:] s, l = unpackString(buf) contractInfo.Type = bchain.TokenTypeName(s) buf = buf[l:] ui, l = unpackVaruint(buf) contractInfo.Decimals = int(ui) buf = buf[l:] ui, l = unpackVaruint(buf) contractInfo.CreatedInBlock = uint32(ui) buf = buf[l:] ui, l = unpackVaruint(buf) contractInfo.DestructedInBlock = uint32(ui) return &contractInfo, nil } func (d *RocksDB) GetContractInfoForAddress(address string) (*bchain.ContractInfo, error) { contract, err := d.chainParser.GetAddrDescFromAddress(address) if err != nil || contract == nil { return nil, err } return d.GetContractInfo(contract, "") } // GetContractInfo gets contract from cache or DB and possibly updates the type from typeFromContext // this is because it is hard to guess the type of the contract using API, it is easier to set it the first time its usage is detected in tx func (d *RocksDB) GetContractInfo(contract bchain.AddressDescriptor, typeFromContext bchain.TokenTypeName) (*bchain.ContractInfo, error) { cacheKey := string(contract) cachedContractsMux.Lock() contractInfo, found := cachedContracts[cacheKey] cachedContractsMux.Unlock() if !found { val, err := d.db.GetCF(d.ro, d.cfh[cfContracts], contract) if err != nil { return nil, err } defer val.Free() buf := val.Data() if len(buf) == 0 { return nil, nil } contractInfo, err = unpackContractInfo(buf) addresses, _, _ := d.chainParser.GetAddressesFromAddrDesc(contract) if len(addresses) > 0 { contractInfo.Contract = addresses[0] } // if the type is specified and stored contractInfo has unknown type, set and store it if typeFromContext != bchain.UnknownTokenType && contractInfo.Type == bchain.UnknownTokenType { contractInfo.Type = typeFromContext err = d.db.PutCF(d.wo, d.cfh[cfContracts], contract, packContractInfo(contractInfo)) } cachedContractsMux.Lock() cachedContracts[cacheKey] = contractInfo cachedContractsMux.Unlock() } return contractInfo, nil } // StoreContractInfo stores contractInfo in DB // if CreatedInBlock==0 and DestructedInBlock!=0, it is evaluated as a desctruction of a contract, the contract info is updated // in all other cases the contractInfo overwrites previously stored data in DB (however it should not really happen as contract is created only once) func (d *RocksDB) StoreContractInfo(wb *grocksdb.WriteBatch, contractInfo *bchain.ContractInfo) error { if contractInfo.Contract != "" { key, err := d.chainParser.GetAddrDescFromAddress(contractInfo.Contract) if err != nil { return err } if contractInfo.CreatedInBlock == 0 && contractInfo.DestructedInBlock != 0 { storedCI, err := d.GetContractInfo(key, "") if err != nil { return err } if storedCI == nil { return nil } storedCI.DestructedInBlock = contractInfo.DestructedInBlock contractInfo = storedCI } wb.PutCF(d.cfh[cfContracts], key, packContractInfo(contractInfo)) } return nil } func packBlockTx(buf []byte, blockTx *ethBlockTx) []byte { varBuf := make([]byte, maxPackedBigintBytes) buf = append(buf, blockTx.btxID...) buf = appendAddress(buf, blockTx.from) buf = appendAddress(buf, blockTx.to) // internal data are not stored in blockTx, they are fetched on disconnect directly from the cfInternalData column // contracts - store the number of address pairs l := packVaruint(uint(len(blockTx.contracts)), varBuf) buf = append(buf, varBuf[:l]...) for j := range blockTx.contracts { c := &blockTx.contracts[j] buf = appendAddress(buf, c.from) buf = appendAddress(buf, c.to) buf = appendAddress(buf, c.contract) l = packVaruint(uint(c.transferType), varBuf) buf = append(buf, varBuf[:l]...) if c.transferType == bchain.MultiToken { l = packVaruint(uint(len(c.idValues)), varBuf) buf = append(buf, varBuf[:l]...) for i := range c.idValues { l = packBigint(&c.idValues[i].Id, varBuf) buf = append(buf, varBuf[:l]...) l = packBigint(&c.idValues[i].Value, varBuf) buf = append(buf, varBuf[:l]...) } } else { // ERC20, ERC721 l = packBigint(&c.value, varBuf) buf = append(buf, varBuf[:l]...) } } return buf } func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, blockTxs []ethBlockTx) error { pl := d.chainParser.PackedTxidLen() buf := make([]byte, 0, (pl+2*eth.EthereumTypeAddressDescriptorLen)*len(blockTxs)) for i := range blockTxs { buf = packBlockTx(buf, &blockTxs[i]) } key := packUint(block.Height) wb.PutCF(d.cfh[cfBlockTxs], key, buf) return d.cleanupBlockTxs(wb, block) } func (d *RocksDB) storeBlockInternalDataErrorEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block, message string) error { key := packUint(block.Height) txid, err := d.chainParser.PackTxid(block.Hash) if err != nil { return err } m := []byte(message) buf := make([]byte, 0, len(txid)+len(m)+1) // the stored structure is txid+retry count (1 byte)+error message buf = append(buf, txid...) buf = append(buf, 0) buf = append(buf, m...) wb.PutCF(d.cfh[cfBlockInternalDataErrors], key, buf) return nil } func (d *RocksDB) storeBlockSpecificDataEthereumType(wb *grocksdb.WriteBatch, block *bchain.Block) error { blockSpecificData, _ := block.CoinSpecificData.(*bchain.EthereumBlockSpecificData) if blockSpecificData != nil { if blockSpecificData.InternalDataError != "" { glog.Info("storeBlockSpecificDataEthereumType ", block.Height, ": ", blockSpecificData.InternalDataError) if err := d.storeBlockInternalDataErrorEthereumType(wb, block, blockSpecificData.InternalDataError); err != nil { return err } } if len(blockSpecificData.AddressAliasRecords) > 0 { if err := d.storeAddressAliasRecords(wb, blockSpecificData.AddressAliasRecords); err != nil { return err } } for i := range blockSpecificData.Contracts { if err := d.StoreContractInfo(wb, &blockSpecificData.Contracts[i]); err != nil { return err } } } return nil } // unpackBlockTx unpacks ethBlockTx from buf, starting at position pos // the position is updated as the data is unpacked and returned to the caller func unpackBlockTx(buf []byte, pos int) (*ethBlockTx, int, error) { getAddress := func(i int) (bchain.AddressDescriptor, int, error) { if len(buf)-i < eth.EthereumTypeAddressDescriptorLen { glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf)) return nil, 0, errors.New("Inconsistent data in blockTxs") } a := append(bchain.AddressDescriptor(nil), buf[i:i+eth.EthereumTypeAddressDescriptorLen]...) return a, i + eth.EthereumTypeAddressDescriptorLen, nil } var from, to bchain.AddressDescriptor var err error if len(buf)-pos < eth.EthereumTypeTxidLen { glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf)) return nil, 0, errors.New("Inconsistent data in blockTxs") } txid := append([]byte(nil), buf[pos:pos+eth.EthereumTypeTxidLen]...) pos += eth.EthereumTypeTxidLen from, pos, err = getAddress(pos) if err != nil { return nil, 0, err } to, pos, err = getAddress(pos) if err != nil { return nil, 0, err } // contracts cc, l := unpackVaruint(buf[pos:]) pos += l contracts := make([]ethBlockTxContract, cc) for j := range contracts { c := &contracts[j] c.from, pos, err = getAddress(pos) if err != nil { return nil, 0, err } c.to, pos, err = getAddress(pos) if err != nil { return nil, 0, err } c.contract, pos, err = getAddress(pos) if err != nil { return nil, 0, err } cc, l = unpackVaruint(buf[pos:]) c.transferType = bchain.TokenType(cc) pos += l if c.transferType == bchain.MultiToken { cc, l = unpackVaruint(buf[pos:]) pos += l c.idValues = make([]bchain.MultiTokenValue, cc) for i := range c.idValues { c.idValues[i].Id, l = unpackBigint(buf[pos:]) pos += l c.idValues[i].Value, l = unpackBigint(buf[pos:]) pos += l } } else { // ERC20, ERC721 c.value, l = unpackBigint(buf[pos:]) pos += l } } return ðBlockTx{ btxID: txid, from: from, to: to, contracts: contracts, }, pos, nil } func (d *RocksDB) getBlockTxsEthereumType(height uint32) ([]ethBlockTx, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], packUint(height)) if err != nil { return nil, err } defer val.Free() buf := val.Data() // nil data means the key was not found in DB if buf == nil { return nil, nil } // buf can be empty slice, this means the block did not contain any transactions bt := make([]ethBlockTx, 0, 16) var btx *ethBlockTx for i := 0; i < len(buf); { btx, i, err = unpackBlockTx(buf, i) if err != nil { return nil, err } bt = append(bt, *btx) } return bt, nil } func (d *RocksDB) disconnectAddress(btxID []byte, internal bool, addrDesc bchain.AddressDescriptor, btxContract *ethBlockTxContract, addresses map[string]map[string]struct{}, contracts map[string]*AddrContracts) error { var err error // do not process empty address if len(addrDesc) == 0 { return nil } s := string(addrDesc) txid := string(btxID) // find if tx for this address was already encountered mtx, ftx := addresses[s] if !ftx { mtx = make(map[string]struct{}) mtx[txid] = struct{}{} addresses[s] = mtx } else { _, ftx = mtx[txid] if !ftx { mtx[txid] = struct{}{} } } addrContracts, fc := contracts[s] if !fc { addrContracts, err = d.GetAddrDescContracts(addrDesc) if err != nil { return err } if addrContracts != nil { contracts[s] = addrContracts } } if addrContracts != nil { if !ftx { addrContracts.TotalTxs-- } if btxContract == nil { if internal { if addrContracts.InternalTxs > 0 { addrContracts.InternalTxs-- } else { glog.Warning("AddressContracts ", addrDesc, ", InternalTxs would be negative, tx ", hex.EncodeToString(btxID)) } } else { if addrContracts.NonContractTxs > 0 { addrContracts.NonContractTxs-- } else { glog.Warning("AddressContracts ", addrDesc, ", EthTxs would be negative, tx ", hex.EncodeToString(btxID)) } } } else { contractIndex, found := findContractInAddressContracts(btxContract.contract, addrContracts.Contracts) if found { addrContract := &addrContracts.Contracts[contractIndex] if addrContract.Txs > 0 { addrContract.Txs-- if addrContract.Txs == 0 { // no transactions, remove the contract addrContracts.Contracts = append(addrContracts.Contracts[:contractIndex], addrContracts.Contracts[contractIndex+1:]...) } else { // update the values of the contract, reverse the direction var index int32 if bytes.Equal(addrDesc, btxContract.to) { index = transferFrom } else { index = transferTo } addToContract(addrContract, contractIndex, index, btxContract.contract, &bchain.TokenTransfer{ Type: btxContract.transferType, Value: btxContract.value, MultiTokenValues: btxContract.idValues, }, false) } } else { glog.Warning("AddressContracts ", addrDesc, ", contract ", contractIndex, " Txs would be negative, tx ", hex.EncodeToString(btxID)) } } else { if !isZeroAddress(addrDesc) { glog.Warning("AddressContracts ", addrDesc, ", contract ", btxContract.contract, " not found, tx ", hex.EncodeToString(btxID)) } } } } else { if !isZeroAddress(addrDesc) { glog.Warning("AddressContracts ", addrDesc, " not found, tx ", hex.EncodeToString(btxID)) } } return nil } func (d *RocksDB) disconnectInternalData(btxID []byte, addresses map[string]map[string]struct{}, contracts map[string]*AddrContracts) error { internalData, err := d.getEthereumInternalData(btxID) if err != nil { return err } if internalData != nil { if internalData.Type == bchain.CREATE { contract, err := d.chainParser.GetAddrDescFromAddress(internalData.Contract) if err != nil { return err } if err := d.disconnectAddress(btxID, true, contract, nil, addresses, contracts); err != nil { return err } } for j := range internalData.Transfers { t := &internalData.Transfers[j] var from, to bchain.AddressDescriptor from, err = d.chainParser.GetAddrDescFromAddress(t.From) if err == nil { to, err = d.chainParser.GetAddrDescFromAddress(t.To) } if err != nil { return err } if err := d.disconnectAddress(btxID, true, from, nil, addresses, contracts); err != nil { return err } // if from==to, tx is counted only once and does not have to be disconnected again if !bytes.Equal(from, to) { if err := d.disconnectAddress(btxID, true, to, nil, addresses, contracts); err != nil { return err } } } } return nil } func (d *RocksDB) disconnectBlockTxsEthereumType(wb *grocksdb.WriteBatch, height uint32, blockTxs []ethBlockTx, contracts map[string]*AddrContracts) error { glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions") addresses := make(map[string]map[string]struct{}) for i := range blockTxs { blockTx := &blockTxs[i] if err := d.disconnectAddress(blockTx.btxID, false, blockTx.from, nil, addresses, contracts); err != nil { return err } // if from==to, tx is counted only once and does not have to be disconnected again if !bytes.Equal(blockTx.from, blockTx.to) { if err := d.disconnectAddress(blockTx.btxID, false, blockTx.to, nil, addresses, contracts); err != nil { return err } } // internal data err := d.disconnectInternalData(blockTx.btxID, addresses, contracts) if err != nil { return err } // contracts for j := range blockTx.contracts { c := &blockTx.contracts[j] if err := d.disconnectAddress(blockTx.btxID, false, c.from, c, addresses, contracts); err != nil { return err } if !bytes.Equal(c.from, c.to) { if err := d.disconnectAddress(blockTx.btxID, false, c.to, c, addresses, contracts); err != nil { return err } } } wb.DeleteCF(d.cfh[cfTransactions], blockTx.btxID) wb.DeleteCF(d.cfh[cfInternalData], blockTx.btxID) } for a := range addresses { key := packAddressKey([]byte(a), height) wb.DeleteCF(d.cfh[cfAddresses], key) } return nil } // DisconnectBlockRangeEthereumType removes all data belonging to blocks in range lower-higher // it is able to disconnect only blocks for which there are data in the blockTxs column func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32) error { blocks := make([][]ethBlockTx, higher-lower+1) for height := lower; height <= higher; height++ { blockTxs, err := d.getBlockTxsEthereumType(height) if err != nil { return err } // nil blockTxs means blockTxs were not found in db if blockTxs == nil { return errors.Errorf("Cannot disconnect blocks with height %v and lower. It is necessary to rebuild index.", height) } blocks[height-lower] = blockTxs } wb := grocksdb.NewWriteBatch() defer wb.Destroy() contracts := make(map[string]*AddrContracts) for height := higher; height >= lower; height-- { if err := d.disconnectBlockTxsEthereumType(wb, height, blocks[height-lower], contracts); err != nil { return err } key := packUint(height) wb.DeleteCF(d.cfh[cfBlockTxs], key) wb.DeleteCF(d.cfh[cfHeight], key) wb.DeleteCF(d.cfh[cfBlockInternalDataErrors], key) } d.storeAddressContracts(wb, contracts) err := d.WriteBatch(wb) if err == nil { d.is.RemoveLastBlockTimes(int(higher-lower) + 1) glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) } return err }