diff --git a/db/rocksdb.go b/db/rocksdb.go index f9dfdf91..d65325f5 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -71,7 +71,8 @@ const ( ) // common columns -var cfNames = []string{"default", "height", "addresses", "blockTxs", "transactions"} +var cfNames []string +var cfBaseNames = []string{"default", "height", "addresses", "blockTxs", "transactions"} // type specific columns var cfNamesBitcoinType = []string{"addressBalance", "txAddresses"} @@ -102,6 +103,7 @@ func openDB(path string, c *gorocksdb.Cache, openFiles int) (*gorocksdb.DB, []*g func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockChainParser, metrics *common.Metrics) (d *RocksDB, err error) { glog.Infof("rocksdb: opening %s, required data version %v, cache size %v, max open files %v", path, dbVersion, cacheSize, maxOpenFiles) + cfNames = append([]string{}, cfBaseNames...) chainType := parser.GetChainType() if chainType == bchain.ChainBitcoinType { cfNames = append(cfNames, cfNamesBitcoinType...) @@ -476,6 +478,12 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add d.cbs.balancesHit++ } balance.BalanceSat.Add(&balance.BalanceSat, &output.ValueSat) + balance.Utxos = append(balance.Utxos, Utxo{ + BtxID: btxID, + Vout: int32(i), + Height: block.Height, + ValueSat: output.ValueSat, + }) counted := addToAddressesMap(addresses, strAddrDesc, btxID, int32(i)) if !counted { balance.Txs++ @@ -557,6 +565,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add balance.Txs++ } balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat) + balance.Utxos = removeUtxo(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc) if balance.BalanceSat.Sign() < 0 { d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance") } @@ -567,6 +576,16 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add return nil } +func removeUtxo(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) []Utxo { + for i, utxo := range utxos { + if utxo.Vout == vout && bytes.Compare(utxo.BtxID, btxID) == 0 { + return append(utxos[:i], utxos[i+1:]...) + } + } + glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc) + return utxos +} + // addToAddressesMap maintains mapping between addresses and transactions in one block // the method assumes that outpus in the block are processed before the inputs // the return value is true if the tx was processed before, to not to count the tx multiple times @@ -611,19 +630,16 @@ func (d *RocksDB) storeTxAddresses(wb *gorocksdb.WriteBatch, am map[string]*TxAd } func (d *RocksDB) storeBalances(wb *gorocksdb.WriteBatch, abm map[string]*AddrBalance) error { - // allocate buffer big enough for number of txs + 2 bigints - buf := make([]byte, vlq.MaxLen32+2*maxPackedBigintBytes) + // allocate buffer initial buffer + buf := make([]byte, 1024) + varBuf := make([]byte, maxPackedBigintBytes) for addrDesc, ab := range abm { - // balance with 0 transactions is removed from db - happens in disconnect + // balance with 0 transactions is removed from db - happens on disconnect if ab == nil || ab.Txs <= 0 { wb.DeleteCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc)) } else { - l := packVaruint(uint(ab.Txs), buf) - ll := packBigint(&ab.SentSat, buf[l:]) - l += ll - ll = packBigint(&ab.BalanceSat, buf[l:]) - l += ll - wb.PutCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc), buf[:l]) + buf = packAddrBalance(ab, buf, varBuf) + wb.PutCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc), buf) } } return nil @@ -728,14 +744,7 @@ func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBa if len(buf) < 3 { return nil, nil } - txs, l := unpackVaruint(buf) - sentSat, sl := unpackBigint(buf[l:]) - balanceSat, _ := unpackBigint(buf[l+sl:]) - return &AddrBalance{ - Txs: uint32(txs), - SentSat: sentSat, - BalanceSat: balanceSat, - }, nil + return unpackAddrBalance(buf, d.chainParser.PackedTxidLen()) } // GetAddressBalance returns address balance for an address or nil if address not found @@ -829,6 +838,56 @@ func appendTxOutput(txo *TxOutput, buf []byte, varBuf []byte) []byte { return buf } +func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) { + txs, l := unpackVaruint(buf) + sentSat, sl := unpackBigint(buf[l:]) + balanceSat, bl := unpackBigint(buf[l+sl:]) + l = l + sl + bl + utxos := make([]Utxo, 0) + for len(buf[l:]) >= txidUnpackedLen+3 { + btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...) + l += txidUnpackedLen + vout, ll := unpackVaruint(buf[l:]) + l += ll + height, ll := unpackVaruint(buf[l:]) + l += ll + valueSat, ll := unpackBigint(buf[l:]) + l += ll + utxos = append(utxos, Utxo{ + BtxID: btxID, + Vout: int32(vout), + Height: uint32(height), + ValueSat: valueSat, + }) + } + return &AddrBalance{ + Txs: uint32(txs), + SentSat: sentSat, + BalanceSat: balanceSat, + Utxos: utxos, + }, nil +} + +func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte { + buf = buf[:0] + l := packVaruint(uint(ab.Txs), varBuf) + buf = append(buf, varBuf[:l]...) + l = packBigint(&ab.SentSat, varBuf) + buf = append(buf, varBuf[:l]...) + l = packBigint(&ab.BalanceSat, varBuf) + buf = append(buf, varBuf[:l]...) + for _, utxo := range ab.Utxos { + buf = append(buf, utxo.BtxID...) + l = packVaruint(uint(utxo.Vout), varBuf) + buf = append(buf, varBuf[:l]...) + l = packVaruint(uint(utxo.Height), varBuf) + buf = append(buf, varBuf[:l]...) + l = packBigint(&utxo.ValueSat, varBuf) + buf = append(buf, varBuf[:l]...) + } + return buf +} + func unpackTxAddresses(buf []byte) (*TxAddresses, error) { ta := TxAddresses{} height, l := unpackVaruint(buf) diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index fbe3d4d6..40f1e192 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -99,6 +99,11 @@ func uintToHex(i uint32) string { return hex.EncodeToString(buf) } +func hexToBytes(h string) []byte { + b, _ := hex.DecodeString(h) + return b +} + func addressKeyHex(a string, height uint32, d *RocksDB) string { return dbtestdata.AddressToPubKeyHex(a, d.chainParser) + uintToHex(^height) } @@ -221,11 +226,36 @@ func verifyAfterBitcoinTypeBlock1(t *testing.T, d *RocksDB, afterDisconnect bool } } if err := checkColumn(d, cfAddressBalance, []keyPair{ - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A2), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A3), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A4), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A5), nil}, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1) + + dbtestdata.TxidB1T1 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A1), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A2) + + dbtestdata.TxidB1T1 + varuintToHex(1) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A2), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A3) + + dbtestdata.TxidB1T2 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A3), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A4) + + dbtestdata.TxidB1T2 + varuintToHex(1) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A4), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A5) + + dbtestdata.TxidB1T2 + varuintToHex(2) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A5), + nil, + }, }); err != nil { { t.Fatal(err) @@ -356,16 +386,62 @@ func verifyAfterBitcoinTypeBlock2(t *testing.T, d *RocksDB) { } } if err := checkColumn(d, cfAddressBalance, []keyPair{ - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T1A2) + bigintToHex(dbtestdata.SatZero), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A3) + bigintToHex(dbtestdata.SatZero), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A4) + bigintToHex(dbtestdata.SatZero), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A5) + bigintToHex(dbtestdata.SatB2T3A5), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr6, d.chainParser), "02" + bigintToHex(dbtestdata.SatB2T1A6) + bigintToHex(dbtestdata.SatZero), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr7, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T1A7), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr8, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A8), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.Addr9, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A9), nil}, - {dbtestdata.AddressToPubKeyHex(dbtestdata.AddrA, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T4AA), nil}, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1) + + dbtestdata.TxidB1T1 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A1), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), + "02" + bigintToHex(dbtestdata.SatB1T1A2) + bigintToHex(dbtestdata.SatZero), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), + "02" + bigintToHex(dbtestdata.SatB1T2A3) + bigintToHex(dbtestdata.SatZero), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), + "02" + bigintToHex(dbtestdata.SatB1T2A4) + bigintToHex(dbtestdata.SatZero), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), + "02" + bigintToHex(dbtestdata.SatB1T2A5) + bigintToHex(dbtestdata.SatB2T3A5) + + dbtestdata.TxidB2T3 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T3A5), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr6, d.chainParser), + "02" + bigintToHex(dbtestdata.SatB2T1A6) + bigintToHex(dbtestdata.SatZero), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr7, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T1A7) + + dbtestdata.TxidB2T1 + varuintToHex(1) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T1A7), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr8, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A8) + + dbtestdata.TxidB2T2 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T2A8), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.Addr9, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A9) + + dbtestdata.TxidB2T2 + varuintToHex(1) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T2A9), + nil, + }, + { + dbtestdata.AddressToPubKeyHex(dbtestdata.AddrA, d.chainParser), + "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T4AA) + + dbtestdata.TxidB2T4 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T4AA), + nil, + }, }); err != nil { { t.Fatal(err) @@ -626,10 +702,7 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { ValueSat: *dbtestdata.SatB2T1A7, }, { - AddrDesc: func() []byte { - b, _ := hex.DecodeString(dbtestdata.TxidB2T1Output3OpReturn) - return b - }(), + AddrDesc: hexToBytes(dbtestdata.TxidB2T1Output3OpReturn), Spent: false, ValueSat: *dbtestdata.SatZero, }, @@ -915,3 +988,78 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) { }) } } + +func Test_packAddrBalance_unpackAddrBalance(t *testing.T) { + parser := bitcoinTestnetParser() + tests := []struct { + name string + hex string + data *AddrBalance + }{ + { + name: "no utxos", + hex: "7b060b44cc1af8520514faf980ac", + data: &AddrBalance{ + BalanceSat: *big.NewInt(90110001324), + SentSat: *big.NewInt(12390110001234), + Txs: 123, + Utxos: []Utxo{}, + }, + }, + { + name: "utxos", + hex: "7b060b44cc1af8520514faf980ac00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400c87c440060b2fd12177a6effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac750098faf659010105e2e48aeabdd9b75def7b48d756ba304713c2aba7b522bf9dbc893fc4231b0782c6df6d84ccd88552087e9cba87a275ffff", + data: &AddrBalance{ + BalanceSat: *big.NewInt(90110001324), + SentSat: *big.NewInt(12390110001234), + Txs: 123, + Utxos: []Utxo{ + { + BtxID: hexToBytes(dbtestdata.TxidB1T1), + Vout: 12, + Height: 123456, + ValueSat: *big.NewInt(12390110001234 - 90110001324), + }, + { + BtxID: hexToBytes(dbtestdata.TxidB1T2), + Vout: 0, + Height: 52345689, + ValueSat: *big.NewInt(1), + }, + { + BtxID: hexToBytes(dbtestdata.TxidB2T3), + Vout: 5353453, + Height: 1234567890, + ValueSat: *big.NewInt(9123372036854775807), + }, + }, + }, + }, + { + name: "empty", + hex: "000000", + data: &AddrBalance{ + Utxos: []Utxo{}, + }, + }, + } + varBuf := make([]byte, maxPackedBigintBytes) + buf := make([]byte, 32) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := packAddrBalance(tt.data, buf, varBuf) + hex := hex.EncodeToString(b) + if !reflect.DeepEqual(hex, tt.hex) { + t.Errorf("packTxAddresses() = %v, want %v", hex, tt.hex) + } + got1, err := unpackAddrBalance(b, parser.PackedTxidLen()) + if err != nil { + t.Errorf("unpackTxAddresses() error = %v", err) + return + } + if !reflect.DeepEqual(got1, tt.data) { + t.Errorf("unpackTxAddresses() = %+v, want %+v", got1, tt.data) + } + }) + } +}