diff --git a/blockbook.go b/blockbook.go index eae21db3..288adbfa 100644 --- a/blockbook.go +++ b/blockbook.go @@ -196,6 +196,17 @@ func mainWithExitCode() int { } internalState.UtxoChecked = true } + + // sort addressContracts if necessary + if !internalState.SortedAddressContracts { + err = index.SortAddressContracts(chanOsSignal) + if err != nil { + glog.Error("sortAddressContracts: ", err) + return exitCodeFatal + } + internalState.SortedAddressContracts = true + } + index.SetInternalState(internalState) if *fixUtxo { err = index.StoreInternalState(internalState) diff --git a/common/internalstate.go b/common/internalstate.go index a8799efd..8878bf77 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -80,8 +80,6 @@ type InternalState struct { DbColumns []InternalStateColumn `json:"dbColumns"` - UtxoChecked bool `json:"utxoChecked"` - HasFiatRates bool `json:"-"` HasTokenFiatRates bool `json:"-"` HistoricalFiatRatesTime time.Time `json:"historicalFiatRatesTime"` @@ -91,6 +89,10 @@ type InternalState struct { EnableSubNewTx bool `json:"-"` BackendInfo BackendInfo `json:"-"` + + // database migrations + UtxoChecked bool `json:"utxoChecked"` + SortedAddressContracts bool `json:"sortedAddressContracts"` } // StartedSync signals start of synchronization diff --git a/db/rocksdb.go b/db/rocksdb.go index ce6e37bc..521ecd74 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -1857,7 +1857,7 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro data := val.Data() var is *common.InternalState if len(data) == 0 { - is = &common.InternalState{Coin: rpcCoin, UtxoChecked: true, ExtendedIndex: d.extendedIndex} + is = &common.InternalState{Coin: rpcCoin, UtxoChecked: true, SortedAddressContracts: true, ExtendedIndex: d.extendedIndex} } else { is, err = common.UnpackInternalState(data) if err != nil { diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index e3169e22..29bff516 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/hex" "math/big" + "os" + "sort" "sync" vlq "github.com/bsm/go-vlq" @@ -17,14 +19,101 @@ import ( const InternalTxIndexOffset = 1 const ContractIndexOffset = 2 +type AggregateFn = func(*big.Int, *big.Int) + +type Ids []big.Int + +func (s *Ids) sort() bool { + sorted := false + sort.Slice(*s, func(i, j int) bool { + isLess := (*s)[i].CmpAbs(&(*s)[j]) == -1 + if isLess == (i > j) { // it is necessary to swap - (id[i]j) or (id[i]>id[j] and i= 0 + }) +} + +// insert id in ascending order +func (s *Ids) insert(id big.Int) { + i := s.search(id) + if i == len(*s) { + *s = append(*s, id) + } else { + *s = append((*s)[:i+1], (*s)[i:]...) + (*s)[i] = id + } +} + +func (s *Ids) remove(id big.Int) { + i := s.search(id) + // remove id if found + if i < len(*s) && (*s)[i].CmpAbs(&id) == 0 { + *s = append((*s)[:i], (*s)[i+1:]...) + } +} + +type MultiTokenValues []bchain.MultiTokenValue + +func (s *MultiTokenValues) sort() bool { + sorted := false + sort.Slice(*s, func(i, j int) bool { + isLess := (*s)[i].Id.CmpAbs(&(*s)[j].Id) == -1 + if isLess == (i > j) { // it is necessary to swap - (id[i]j) or (id[i]>id[j] and i= 0 + }) +} + +func (s *MultiTokenValues) upsert(m bchain.MultiTokenValue, index int32, aggregate AggregateFn) { + i := s.search(m) + if i < len(*s) && (*s)[i].Id.CmpAbs(&m.Id) == 0 { + aggregate(&(*s)[i].Value, &m.Value) + // if transfer from, remove if the value is zero + if index < 0 && len((*s)[i].Value.Bits()) == 0 { + *s = append((*s)[:i], (*s)[i+1:]...) + } + return + } + if index >= 0 { + elem := bchain.MultiTokenValue{ + Id: m.Id, + Value: *new(big.Int).Set(&m.Value), + } + if i == len(*s) { + *s = append(*s, elem) + } else { + *s = append((*s)[:i+1], (*s)[i:]...) + (*s)[i] = elem + } + } +} + // AddrContract is Contract address with number of transactions done by given address type AddrContract struct { Type bchain.TokenType Contract bchain.AddressDescriptor Txs uint - Value big.Int // single value of ERC20 - Ids []big.Int // multiple ERC721 tokens - MultiTokenValues []bchain.MultiTokenValue // multiple ERC1155 tokens + Value big.Int // single value of ERC20 + Ids Ids // multiple ERC721 tokens + MultiTokenValues MultiTokenValues // multiple ERC1155 tokens } // AddrContracts contains number of transactions and contracts for an address @@ -103,14 +192,14 @@ func unpackAddrContracts(buf []byte, addrDesc bchain.AddressDescriptor) (*AddrCo len, ll := unpackVaruint(buf) buf = buf[ll:] if ttt == bchain.NonFungibleToken { - ac.Ids = make([]big.Int, len) + ac.Ids = make(Ids, len) for i := uint(0); i < len; i++ { b, ll := unpackBigint(buf) buf = buf[ll:] ac.Ids[i] = b } } else { - ac.MultiTokenValues = make([]bchain.MultiTokenValue, len) + ac.MultiTokenValues = make(MultiTokenValues, len) for i := uint(0); i < len; i++ { b, ll := unpackBigint(buf) buf = buf[ll:] @@ -210,7 +299,7 @@ func addToAddressesMapEthereumType(addresses addressesMap, strAddrDesc string, b } func addToContract(c *AddrContract, contractIndex int, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool) int32 { - var aggregate func(*big.Int, *big.Int) + var aggregate AggregateFn // index 0 is for ETH transfers, index 1 (InternalTxIndexOffset) is for internal transfers, contract indexes start with 2 (ContractIndexOffset) if index < 0 { index = ^int32(contractIndex + ContractIndexOffset) @@ -231,39 +320,13 @@ func addToContract(c *AddrContract, contractIndex int, index int32, contract bch aggregate(&c.Value, &transfer.Value) } else if transfer.Type == bchain.NonFungibleToken { if index < 0 { - // remove token from the list - for i := range c.Ids { - if c.Ids[i].Cmp(&transfer.Value) == 0 { - c.Ids = append(c.Ids[:i], c.Ids[i+1:]...) - break - } - } + c.Ids.remove(transfer.Value) } else { - // add token to the list - c.Ids = append(c.Ids, transfer.Value) + c.Ids.insert(transfer.Value) } } else { // bchain.ERC1155 for _, t := range transfer.MultiTokenValues { - for i := range c.MultiTokenValues { - // find the token in the list - if c.MultiTokenValues[i].Id.Cmp(&t.Id) == 0 { - aggregate(&c.MultiTokenValues[i].Value, &t.Value) - // if transfer from, remove if the value is zero - if index < 0 && len(c.MultiTokenValues[i].Value.Bits()) == 0 { - c.MultiTokenValues = append(c.MultiTokenValues[:i], c.MultiTokenValues[i+1:]...) - } - goto nextTransfer - } - } - // if not found and transfer to, add to the list - // it is necessary to add a copy of the value so that subsequent calls to addToContract do not change the transfer value - if index >= 0 { - c.MultiTokenValues = append(c.MultiTokenValues, bchain.MultiTokenValue{ - Id: t.Id, - Value: *new(big.Int).Set(&t.Value), - }) - } - nextTransfer: + c.MultiTokenValues.upsert(t, index, aggregate) } } if addTxCount { @@ -1332,3 +1395,61 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32) } return err } + +func (d *RocksDB) SortAddressContracts(stop chan os.Signal) error { + if d.chainParser.GetChainType() != bchain.ChainEthereumType { + glog.Info("SortAddressContracts: applicable only for ethereum type coins") + return nil + } + glog.Info("SortAddressContracts: starting") + // do not use cache + ro := grocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + it := d.db.NewIteratorCF(ro, d.cfh[cfAddressContracts]) + defer it.Close() + var rowCount, idsSortedCount, multiTokenValuesSortedCount int + for it.SeekToFirst(); it.Valid(); it.Next() { + select { + case <-stop: + return errors.New("SortAddressContracts: interrupted") + default: + } + rowCount++ + addrDesc := it.Key().Data() + buf := it.Value().Data() + if len(buf) > 0 { + ca, err := unpackAddrContracts(buf, addrDesc) + if err != nil { + glog.Error("failed to unpack AddrContracts for: ", hex.EncodeToString(addrDesc)) + } + update := false + for i := range ca.Contracts { + c := &ca.Contracts[i] + if sorted := c.Ids.sort(); sorted { + idsSortedCount++ + update = true + } + if sorted := c.MultiTokenValues.sort(); sorted { + multiTokenValuesSortedCount++ + update = true + } + } + if update { + if err := func() error { + wb := grocksdb.NewWriteBatch() + defer wb.Destroy() + buf := packAddrContracts(ca) + wb.PutCF(d.cfh[cfAddressContracts], addrDesc, buf) + return d.WriteBatch(wb) + }(); err != nil { + return errors.Errorf("failed to write cfAddressContracts for: %v: %v", addrDesc, err) + } + } + } + if rowCount%5000000 == 0 { + glog.Infof("SortAddressContracts: progress - scanned %d rows, sorted %d ids and %d multi token values", rowCount, idsSortedCount, multiTokenValuesSortedCount) + } + } + glog.Infof("SortAddressContracts: finished - scanned %d rows, sorted %d ids and %d multi token value", rowCount, idsSortedCount, multiTokenValuesSortedCount) + return nil +} diff --git a/db/rocksdb_ethereumtype_test.go b/db/rocksdb_ethereumtype_test.go index 8083f7eb..c08c5e3e 100644 --- a/db/rocksdb_ethereumtype_test.go +++ b/db/rocksdb_ethereumtype_test.go @@ -765,7 +765,7 @@ func Test_packUnpackAddrContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract47, parser), Txs: 41235, - Ids: []big.Int{ + Ids: Ids{ *big.NewInt(1), *big.NewInt(2), *big.NewInt(3), @@ -777,7 +777,7 @@ func Test_packUnpackAddrContracts(t *testing.T) { Type: bchain.MultiToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract4a, parser), Txs: 64, - MultiTokenValues: []bchain.MultiTokenValue{ + MultiTokenValues: MultiTokenValues{ { Id: *big.NewInt(1), Value: *big.NewInt(1412341234), @@ -894,7 +894,7 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 1, - Ids: []big.Int{*big.NewInt(1)}, + Ids: Ids{*big.NewInt(1)}, }, }, }, @@ -923,7 +923,7 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 2, - Ids: []big.Int{*big.NewInt(1), *big.NewInt(2)}, + Ids: Ids{*big.NewInt(1), *big.NewInt(2)}, }, }, }, @@ -952,7 +952,7 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 2, - Ids: []big.Int{*big.NewInt(2)}, + Ids: Ids{*big.NewInt(2)}, }, }, }, @@ -986,13 +986,13 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 2, - Ids: []big.Int{*big.NewInt(2)}, + Ids: Ids{*big.NewInt(2)}, }, { Type: bchain.MultiToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser), Txs: 1, - MultiTokenValues: []bchain.MultiTokenValue{ + MultiTokenValues: MultiTokenValues{ { Id: *big.NewInt(11), Value: *big.NewInt(56789), @@ -1035,13 +1035,13 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 2, - Ids: []big.Int{*big.NewInt(2)}, + Ids: Ids{*big.NewInt(2)}, }, { Type: bchain.MultiToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser), Txs: 2, - MultiTokenValues: []bchain.MultiTokenValue{ + MultiTokenValues: MultiTokenValues{ { Id: *big.NewInt(11), Value: *big.NewInt(56900), @@ -1088,13 +1088,13 @@ func Test_addToContracts(t *testing.T) { Type: bchain.NonFungibleToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContract6f, parser), Txs: 2, - Ids: []big.Int{*big.NewInt(2)}, + Ids: Ids{*big.NewInt(2)}, }, { Type: bchain.MultiToken, Contract: addressToAddrDesc(dbtestdata.EthAddrContractCd, parser), Txs: 3, - MultiTokenValues: []bchain.MultiTokenValue{ + MultiTokenValues: MultiTokenValues{ { Id: *big.NewInt(11), Value: *big.NewInt(56788),