From 9e98a4eb393479d66725c6f4d762179250adce93 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 12 May 2019 02:19:51 +0200 Subject: [PATCH] Improve performance of utxo indexing --- api/worker.go | 4 +- api/xpub.go | 2 +- db/rocksdb.go | 154 +++++++++++++++++++++++++++++++-------------- db/rocksdb_test.go | 4 +- 4 files changed, 113 insertions(+), 51 deletions(-) diff --git a/api/worker.go b/api/worker.go index bc34b8cd..781524c1 100644 --- a/api/worker.go +++ b/api/worker.go @@ -685,7 +685,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco nonce = strconv.Itoa(int(n)) } else { // ba can be nil if the address is only in mempool! - ba, err = w.db.GetAddrDescBalance(addrDesc) + ba, err = w.db.GetAddrDescBalance(addrDesc, db.AddressBalanceDetailNoUTXO) if err != nil { return nil, NewAPIError(fmt.Sprintf("Address not found, %v", err), true) } @@ -839,7 +839,7 @@ func (w *Worker) getAddrDescUtxo(addrDesc bchain.AddressDescriptor, ba *db.AddrB if !onlyMempool { // get utxo from index if ba == nil { - ba, err = w.db.GetAddrDescBalance(addrDesc) + ba, err = w.db.GetAddrDescBalance(addrDesc, db.AddressBalanceDetailUTXO) if err != nil { return nil, NewAPIError(fmt.Sprintf("Address not found, %v", err), true) } diff --git a/api/xpub.go b/api/xpub.go index 80f41085..7015005a 100644 --- a/api/xpub.go +++ b/api/xpub.go @@ -160,7 +160,7 @@ func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, m func (w *Worker) xpubDerivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, error) { var err error - if ad.balance, err = w.db.GetAddrDescBalance(ad.addrDesc); err != nil { + if ad.balance, err = w.db.GetAddrDescBalance(ad.addrDesc, db.AddressBalanceDetailUTXO); err != nil { return false, err } if ad.balance != nil { diff --git a/db/rocksdb.go b/db/rocksdb.go index d3b216df..4f5fb531 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -44,6 +44,18 @@ type connectBlockStats struct { balancesMiss int } +// AddressBalanceDetail specifies what data are returned by GetAddressBalance +type AddressBalanceDetail int + +const ( + // AddressBalanceDetailNoUTXO returns address balance without utxos + AddressBalanceDetailNoUTXO = 0 + // AddressBalanceDetailUTXO returns address balance with utxos + AddressBalanceDetailUTXO = 1 + // addressBalanceDetailUTXOIndexed returns address balance with utxos and index for updates, used only internally + addressBalanceDetailUTXOIndexed = 2 +) + // RocksDB handle type RocksDB struct { path string @@ -401,6 +413,7 @@ type AddrBalance struct { SentSat big.Int BalanceSat big.Int Utxos []Utxo + utxosMap map[string]int } // ReceivedSat computes received amount from total balance and sent amount @@ -410,6 +423,61 @@ func (ab *AddrBalance) ReceivedSat() *big.Int { return &r } +// addUtxo +func (ab *AddrBalance) addUtxo(u *Utxo) { + ab.Utxos = append(ab.Utxos, *u) + l := len(ab.Utxos) + if l >= 16 { + if len(ab.utxosMap) == 0 { + ab.utxosMap = make(map[string]int, 32) + for i := 0; i < l; i++ { + s := string(ab.Utxos[i].BtxID) + if _, e := ab.utxosMap[s]; !e { + ab.utxosMap[s] = i + } + } + } else { + s := string(u.BtxID) + if _, e := ab.utxosMap[s]; !e { + ab.utxosMap[s] = l - 1 + } + } + } +} + +// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent +// for small number of utxos the linear search is done, for larger number there is a hashmap index +// it is much faster than removing the utxo from the slice as it would cause in memory copy operations +func (ab *AddrBalance) markUtxoAsSpent(btxID []byte, vout int32) { + if len(ab.utxosMap) == 0 { + for i := range ab.Utxos { + utxo := &ab.Utxos[i] + if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) { + // mark utxo as spent by setting vout=-1 + utxo.Vout = -1 + return + } + } + } else { + if i, e := ab.utxosMap[string(btxID)]; e { + l := len(ab.Utxos) + for ; i < l; i++ { + utxo := &ab.Utxos[i] + if utxo.Vout == vout { + if bytes.Equal(utxo.BtxID, btxID) { + // mark utxo as spent by setting vout=-1 + utxo.Vout = -1 + return + } else { + break + } + } + } + } + } + glog.Errorf("Utxo %s:%d not found, using in map %v", hex.EncodeToString(btxID), vout, len(ab.utxosMap) != 0) +} + type blockTxs struct { btxID []byte inputs []outpoint @@ -467,7 +535,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add strAddrDesc := string(addrDesc) balance, e := balances[strAddrDesc] if !e { - balance, err = d.GetAddrDescBalance(addrDesc) + balance, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed) if err != nil { return err } @@ -480,7 +548,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add d.cbs.balancesHit++ } balance.BalanceSat.Add(&balance.BalanceSat, &output.ValueSat) - balance.Utxos = append(balance.Utxos, Utxo{ + balance.addUtxo(&Utxo{ BtxID: btxID, Vout: int32(i), Height: block.Height, @@ -550,7 +618,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add strAddrDesc := string(spentOutput.AddrDesc) balance, e := balances[strAddrDesc] if !e { - balance, err = d.GetAddrDescBalance(spentOutput.AddrDesc) + balance, err = d.GetAddrDescBalance(spentOutput.AddrDesc, addressBalanceDetailUTXOIndexed) if err != nil { return err } @@ -567,7 +635,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add balance.Txs++ } balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat) - markUtxoAsSpent(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc) + balance.markUtxoAsSpent(btxID, int32(input.Vout)) if balance.BalanceSat.Sign() < 0 { d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance") } @@ -578,20 +646,6 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add return nil } -// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent -// it is much faster than removing the utxo from the slice as it would cause in many memory copy operations -func markUtxoAsSpent(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) { - for i := range utxos { - utxo := &utxos[i] - if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) { - // mark utxo as spent by setting vout=-1 - utxo.Vout = -1 - return - } - } - glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc) -} - // addToAddressesMap maintains mapping between addresses and transactions in one block // the method assumes that outpus in the block are processed before the inputs // the return value is true if the tx was processed before, to not to count the tx multiple times @@ -739,7 +793,7 @@ func (d *RocksDB) getBlockTxs(height uint32) ([]blockTxs, error) { } // GetAddrDescBalance returns AddrBalance for given addrDesc -func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) { +func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor, detail AddressBalanceDetail) (*AddrBalance, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfAddressBalance], addrDesc) if err != nil { return nil, err @@ -750,16 +804,16 @@ func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBa if len(buf) < 3 { return nil, nil } - return unpackAddrBalance(buf, d.chainParser.PackedTxidLen()) + return unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), detail) } // GetAddressBalance returns address balance for an address or nil if address not found -func (d *RocksDB) GetAddressBalance(address string) (*AddrBalance, error) { +func (d *RocksDB) GetAddressBalance(address string, detail AddressBalanceDetail) (*AddrBalance, error) { addrDesc, err := d.chainParser.GetAddrDescFromAddress(address) if err != nil { return nil, err } - return d.GetAddrDescBalance(addrDesc) + return d.GetAddrDescBalance(addrDesc, detail) } func (d *RocksDB) getTxAddresses(btxID []byte) (*TxAddresses, error) { @@ -844,35 +898,43 @@ func appendTxOutput(txo *TxOutput, buf []byte, varBuf []byte) []byte { return buf } -func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) { +func unpackAddrBalance(buf []byte, txidUnpackedLen int, detail AddressBalanceDetail) (*AddrBalance, error) { txs, l := unpackVaruint(buf) sentSat, sl := unpackBigint(buf[l:]) balanceSat, bl := unpackBigint(buf[l+sl:]) l = l + sl + bl - // estimate the size of utxos to avoid reallocation - utxos := make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3) - for len(buf[l:]) >= txidUnpackedLen+3 { - btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...) - l += txidUnpackedLen - vout, ll := unpackVaruint(buf[l:]) - l += ll - height, ll := unpackVaruint(buf[l:]) - l += ll - valueSat, ll := unpackBigint(buf[l:]) - l += ll - utxos = append(utxos, Utxo{ - BtxID: btxID, - Vout: int32(vout), - Height: uint32(height), - ValueSat: valueSat, - }) - } - return &AddrBalance{ + ab := &AddrBalance{ Txs: uint32(txs), SentSat: sentSat, BalanceSat: balanceSat, - Utxos: utxos, - }, nil + } + if detail != AddressBalanceDetailNoUTXO { + // estimate the size of utxos to avoid reallocation + ab.Utxos = make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3) + // ab.utxosMap = make(map[string]int, cap(ab.Utxos)) + for len(buf[l:]) >= txidUnpackedLen+3 { + btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...) + l += txidUnpackedLen + vout, ll := unpackVaruint(buf[l:]) + l += ll + height, ll := unpackVaruint(buf[l:]) + l += ll + valueSat, ll := unpackBigint(buf[l:]) + l += ll + u := Utxo{ + BtxID: btxID, + Vout: int32(vout), + Height: uint32(height), + ValueSat: valueSat, + } + if detail == AddressBalanceDetailUTXO { + ab.Utxos = append(ab.Utxos, u) + } else { + ab.addUtxo(&u) + } + } + } + return ab, nil } func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte { @@ -1122,7 +1184,7 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, s := string(addrDesc) b, fb := balances[s] if !fb { - b, err = d.GetAddrDescBalance(addrDesc) + b, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed) if err != nil { return nil, err } @@ -1203,7 +1265,7 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, if balance.BalanceSat.Sign() < 0 { d.resetValueSatToZero(&balance.BalanceSat, t.AddrDesc, "balance") } - markUtxoAsSpent(balance.Utxos, btxID, int32(i), t.AddrDesc) + balance.markUtxoAsSpent(btxID, int32(i)) } else { ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc) glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc) diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 5fe27347..6a249953 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -656,7 +656,7 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { verifyAfterBitcoinTypeBlock2(t, d) // test public methods for address balance and tx addresses - ab, err := d.GetAddressBalance(dbtestdata.Addr5) + ab, err := d.GetAddressBalance(dbtestdata.Addr5, AddressBalanceDetailUTXO) if err != nil { t.Fatal(err) } @@ -1060,7 +1060,7 @@ func Test_packAddrBalance_unpackAddrBalance(t *testing.T) { if !reflect.DeepEqual(hex, tt.hex) { t.Errorf("packTxAddresses() = %v, want %v", hex, tt.hex) } - got1, err := unpackAddrBalance(b, parser.PackedTxidLen()) + got1, err := unpackAddrBalance(b, parser.PackedTxidLen(), AddressBalanceDetailUTXO) if err != nil { t.Errorf("unpackTxAddresses() error = %v", err) return