perf: leverage binary search for managing for Ids and MultiTokenValues (#910)

The change executes a migration, which for ETH mainnet takes about 20 minutes of down time.
This commit is contained in:
kevin 2023-04-13 05:22:58 -06:00 committed by GitHub
parent fabad15c10
commit 36c744b7df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 183 additions and 49 deletions

View File

@ -196,6 +196,17 @@ func mainWithExitCode() int {
}
internalState.UtxoChecked = true
}
// sort addressContracts if necessary
if !internalState.SortedAddressContracts {
err = index.SortAddressContracts(chanOsSignal)
if err != nil {
glog.Error("sortAddressContracts: ", err)
return exitCodeFatal
}
internalState.SortedAddressContracts = true
}
index.SetInternalState(internalState)
if *fixUtxo {
err = index.StoreInternalState(internalState)

View File

@ -80,8 +80,6 @@ type InternalState struct {
DbColumns []InternalStateColumn `json:"dbColumns"`
UtxoChecked bool `json:"utxoChecked"`
HasFiatRates bool `json:"-"`
HasTokenFiatRates bool `json:"-"`
HistoricalFiatRatesTime time.Time `json:"historicalFiatRatesTime"`
@ -91,6 +89,10 @@ type InternalState struct {
EnableSubNewTx bool `json:"-"`
BackendInfo BackendInfo `json:"-"`
// database migrations
UtxoChecked bool `json:"utxoChecked"`
SortedAddressContracts bool `json:"sortedAddressContracts"`
}
// StartedSync signals start of synchronization

View File

@ -1857,7 +1857,7 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro
data := val.Data()
var is *common.InternalState
if len(data) == 0 {
is = &common.InternalState{Coin: rpcCoin, UtxoChecked: true, ExtendedIndex: d.extendedIndex}
is = &common.InternalState{Coin: rpcCoin, UtxoChecked: true, SortedAddressContracts: true, ExtendedIndex: d.extendedIndex}
} else {
is, err = common.UnpackInternalState(data)
if err != nil {

View File

@ -4,6 +4,8 @@ import (
"bytes"
"encoding/hex"
"math/big"
"os"
"sort"
"sync"
vlq "github.com/bsm/go-vlq"
@ -17,14 +19,101 @@ import (
const InternalTxIndexOffset = 1
const ContractIndexOffset = 2
type AggregateFn = func(*big.Int, *big.Int)
type Ids []big.Int
func (s *Ids) sort() bool {
sorted := false
sort.Slice(*s, func(i, j int) bool {
isLess := (*s)[i].CmpAbs(&(*s)[j]) == -1
if isLess == (i > j) { // it is necessary to swap - (id[i]<id[j] and i>j) or (id[i]>id[j] and i<j)
sorted = true
}
return isLess
})
return sorted
}
func (s *Ids) search(id big.Int) int {
// attempt to find id using a binary search
return sort.Search(len(*s), func(i int) bool {
return (*s)[i].CmpAbs(&id) >= 0
})
}
// insert id in ascending order
func (s *Ids) insert(id big.Int) {
i := s.search(id)
if i == len(*s) {
*s = append(*s, id)
} else {
*s = append((*s)[:i+1], (*s)[i:]...)
(*s)[i] = id
}
}
func (s *Ids) remove(id big.Int) {
i := s.search(id)
// remove id if found
if i < len(*s) && (*s)[i].CmpAbs(&id) == 0 {
*s = append((*s)[:i], (*s)[i+1:]...)
}
}
type MultiTokenValues []bchain.MultiTokenValue
func (s *MultiTokenValues) sort() bool {
sorted := false
sort.Slice(*s, func(i, j int) bool {
isLess := (*s)[i].Id.CmpAbs(&(*s)[j].Id) == -1
if isLess == (i > j) { // it is necessary to swap - (id[i]<id[j] and i>j) or (id[i]>id[j] and i<j)
sorted = true
}
return isLess
})
return sorted
}
// search for multi token value using a binary seach on id
func (s *MultiTokenValues) search(m bchain.MultiTokenValue) int {
return sort.Search(len(*s), func(i int) bool {
return (*s)[i].Id.CmpAbs(&m.Id) >= 0
})
}
func (s *MultiTokenValues) upsert(m bchain.MultiTokenValue, index int32, aggregate AggregateFn) {
i := s.search(m)
if i < len(*s) && (*s)[i].Id.CmpAbs(&m.Id) == 0 {
aggregate(&(*s)[i].Value, &m.Value)
// if transfer from, remove if the value is zero
if index < 0 && len((*s)[i].Value.Bits()) == 0 {
*s = append((*s)[:i], (*s)[i+1:]...)
}
return
}
if index >= 0 {
elem := bchain.MultiTokenValue{
Id: m.Id,
Value: *new(big.Int).Set(&m.Value),
}
if i == len(*s) {
*s = append(*s, elem)
} else {
*s = append((*s)[:i+1], (*s)[i:]...)
(*s)[i] = elem
}
}
}
// AddrContract is Contract address with number of transactions done by given address
type AddrContract struct {
Type bchain.TokenType
Contract bchain.AddressDescriptor
Txs uint
Value big.Int // single value of ERC20
Ids []big.Int // multiple ERC721 tokens
MultiTokenValues []bchain.MultiTokenValue // multiple ERC1155 tokens
Value big.Int // single value of ERC20
Ids Ids // multiple ERC721 tokens
MultiTokenValues MultiTokenValues // multiple ERC1155 tokens
}
// AddrContracts contains number of transactions and contracts for an address
@ -103,14 +192,14 @@ func unpackAddrContracts(buf []byte, addrDesc bchain.AddressDescriptor) (*AddrCo
len, ll := unpackVaruint(buf)
buf = buf[ll:]
if ttt == bchain.NonFungibleToken {
ac.Ids = make([]big.Int, len)
ac.Ids = make(Ids, len)
for i := uint(0); i < len; i++ {
b, ll := unpackBigint(buf)
buf = buf[ll:]
ac.Ids[i] = b
}
} else {
ac.MultiTokenValues = make([]bchain.MultiTokenValue, len)
ac.MultiTokenValues = make(MultiTokenValues, len)
for i := uint(0); i < len; i++ {
b, ll := unpackBigint(buf)
buf = buf[ll:]
@ -210,7 +299,7 @@ func addToAddressesMapEthereumType(addresses addressesMap, strAddrDesc string, b
}
func addToContract(c *AddrContract, contractIndex int, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool) int32 {
var aggregate func(*big.Int, *big.Int)
var aggregate AggregateFn
// index 0 is for ETH transfers, index 1 (InternalTxIndexOffset) is for internal transfers, contract indexes start with 2 (ContractIndexOffset)
if index < 0 {
index = ^int32(contractIndex + ContractIndexOffset)
@ -231,39 +320,13 @@ func addToContract(c *AddrContract, contractIndex int, index int32, contract bch
aggregate(&c.Value, &transfer.Value)
} else if transfer.Type == bchain.NonFungibleToken {
if index < 0 {
// remove token from the list
for i := range c.Ids {
if c.Ids[i].Cmp(&transfer.Value) == 0 {
c.Ids = append(c.Ids[:i], c.Ids[i+1:]...)
break
}
}
c.Ids.remove(transfer.Value)
} else {
// add token to the list
c.Ids = append(c.Ids, transfer.Value)
c.Ids.insert(transfer.Value)
}
} else { // bchain.ERC1155
for _, t := range transfer.MultiTokenValues {
for i := range c.MultiTokenValues {
// find the token in the list
if c.MultiTokenValues[i].Id.Cmp(&t.Id) == 0 {
aggregate(&c.MultiTokenValues[i].Value, &t.Value)
// if transfer from, remove if the value is zero
if index < 0 && len(c.MultiTokenValues[i].Value.Bits()) == 0 {
c.MultiTokenValues = append(c.MultiTokenValues[:i], c.MultiTokenValues[i+1:]...)
}
goto nextTransfer
}
}
// if not found and transfer to, add to the list
// it is necessary to add a copy of the value so that subsequent calls to addToContract do not change the transfer value
if index >= 0 {
c.MultiTokenValues = append(c.MultiTokenValues, bchain.MultiTokenValue{
Id: t.Id,
Value: *new(big.Int).Set(&t.Value),
})
}
nextTransfer:
c.MultiTokenValues.upsert(t, index, aggregate)
}
}
if addTxCount {
@ -1332,3 +1395,61 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32)
}
return err
}
func (d *RocksDB) SortAddressContracts(stop chan os.Signal) error {
if d.chainParser.GetChainType() != bchain.ChainEthereumType {
glog.Info("SortAddressContracts: applicable only for ethereum type coins")
return nil
}
glog.Info("SortAddressContracts: starting")
// do not use cache
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
it := d.db.NewIteratorCF(ro, d.cfh[cfAddressContracts])
defer it.Close()
var rowCount, idsSortedCount, multiTokenValuesSortedCount int
for it.SeekToFirst(); it.Valid(); it.Next() {
select {
case <-stop:
return errors.New("SortAddressContracts: interrupted")
default:
}
rowCount++
addrDesc := it.Key().Data()
buf := it.Value().Data()
if len(buf) > 0 {
ca, err := unpackAddrContracts(buf, addrDesc)
if err != nil {
glog.Error("failed to unpack AddrContracts for: ", hex.EncodeToString(addrDesc))
}
update := false
for i := range ca.Contracts {
c := &ca.Contracts[i]
if sorted := c.Ids.sort(); sorted {
idsSortedCount++
update = true
}
if sorted := c.MultiTokenValues.sort(); sorted {
multiTokenValuesSortedCount++
update = true
}
}
if update {
if err := func() error {
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
buf := packAddrContracts(ca)
wb.PutCF(d.cfh[cfAddressContracts], addrDesc, buf)
return d.WriteBatch(wb)
}(); err != nil {
return errors.Errorf("failed to write cfAddressContracts for: %v: %v", addrDesc, err)
}
}
}
if rowCount%5000000 == 0 {
glog.Infof("SortAddressContracts: progress - scanned %d rows, sorted %d ids and %d multi token values", rowCount, idsSortedCount, multiTokenValuesSortedCount)
}
}
glog.Infof("SortAddressContracts: finished - scanned %d rows, sorted %d ids and %d multi token value", rowCount, idsSortedCount, multiTokenValuesSortedCount)
return nil
}

View File

@ -765,7 +765,7 @@ func Test_packUnpackAddrContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract47, parser),
Txs: 41235,
Ids: []big.Int{
Ids: Ids{
*big.NewInt(1),
*big.NewInt(2),
*big.NewInt(3),
@ -777,7 +777,7 @@ func Test_packUnpackAddrContracts(t *testing.T) {
Type: bchain.MultiToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract4a, parser),
Txs: 64,
MultiTokenValues: []bchain.MultiTokenValue{
MultiTokenValues: MultiTokenValues{
{
Id: *big.NewInt(1),
Value: *big.NewInt(1412341234),
@ -894,7 +894,7 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 1,
Ids: []big.Int{*big.NewInt(1)},
Ids: Ids{*big.NewInt(1)},
},
},
},
@ -923,7 +923,7 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 2,
Ids: []big.Int{*big.NewInt(1), *big.NewInt(2)},
Ids: Ids{*big.NewInt(1), *big.NewInt(2)},
},
},
},
@ -952,7 +952,7 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 2,
Ids: []big.Int{*big.NewInt(2)},
Ids: Ids{*big.NewInt(2)},
},
},
},
@ -986,13 +986,13 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 2,
Ids: []big.Int{*big.NewInt(2)},
Ids: Ids{*big.NewInt(2)},
},
{
Type: bchain.MultiToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser),
Txs: 1,
MultiTokenValues: []bchain.MultiTokenValue{
MultiTokenValues: MultiTokenValues{
{
Id: *big.NewInt(11),
Value: *big.NewInt(56789),
@ -1035,13 +1035,13 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 2,
Ids: []big.Int{*big.NewInt(2)},
Ids: Ids{*big.NewInt(2)},
},
{
Type: bchain.MultiToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser),
Txs: 2,
MultiTokenValues: []bchain.MultiTokenValue{
MultiTokenValues: MultiTokenValues{
{
Id: *big.NewInt(11),
Value: *big.NewInt(56900),
@ -1088,13 +1088,13 @@ func Test_addToContracts(t *testing.T) {
Type: bchain.NonFungibleToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser),
Txs: 2,
Ids: []big.Int{*big.NewInt(2)},
Ids: Ids{*big.NewInt(2)},
},
{
Type: bchain.MultiToken,
Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser),
Txs: 3,
MultiTokenValues: []bchain.MultiTokenValue{
MultiTokenValues: MultiTokenValues{
{
Id: *big.NewInt(11),
Value: *big.NewInt(56788),