Add utxos to addressBalance column

This commit is contained in:
Martin Boehm 2019-05-03 20:46:40 +02:00
parent 5689be20f3
commit 995d5c66b5
2 changed files with 244 additions and 37 deletions

View File

@ -71,7 +71,8 @@ const (
)
// common columns
var cfNames = []string{"default", "height", "addresses", "blockTxs", "transactions"}
var cfNames []string
var cfBaseNames = []string{"default", "height", "addresses", "blockTxs", "transactions"}
// type specific columns
var cfNamesBitcoinType = []string{"addressBalance", "txAddresses"}
@ -102,6 +103,7 @@ func openDB(path string, c *gorocksdb.Cache, openFiles int) (*gorocksdb.DB, []*g
func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockChainParser, metrics *common.Metrics) (d *RocksDB, err error) {
glog.Infof("rocksdb: opening %s, required data version %v, cache size %v, max open files %v", path, dbVersion, cacheSize, maxOpenFiles)
cfNames = append([]string{}, cfBaseNames...)
chainType := parser.GetChainType()
if chainType == bchain.ChainBitcoinType {
cfNames = append(cfNames, cfNamesBitcoinType...)
@ -476,6 +478,12 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
d.cbs.balancesHit++
}
balance.BalanceSat.Add(&balance.BalanceSat, &output.ValueSat)
balance.Utxos = append(balance.Utxos, Utxo{
BtxID: btxID,
Vout: int32(i),
Height: block.Height,
ValueSat: output.ValueSat,
})
counted := addToAddressesMap(addresses, strAddrDesc, btxID, int32(i))
if !counted {
balance.Txs++
@ -557,6 +565,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
balance.Txs++
}
balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat)
balance.Utxos = removeUtxo(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc)
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance")
}
@ -567,6 +576,16 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
return nil
}
func removeUtxo(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) []Utxo {
for i, utxo := range utxos {
if utxo.Vout == vout && bytes.Compare(utxo.BtxID, btxID) == 0 {
return append(utxos[:i], utxos[i+1:]...)
}
}
glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc)
return utxos
}
// addToAddressesMap maintains mapping between addresses and transactions in one block
// the method assumes that outpus in the block are processed before the inputs
// the return value is true if the tx was processed before, to not to count the tx multiple times
@ -611,19 +630,16 @@ func (d *RocksDB) storeTxAddresses(wb *gorocksdb.WriteBatch, am map[string]*TxAd
}
func (d *RocksDB) storeBalances(wb *gorocksdb.WriteBatch, abm map[string]*AddrBalance) error {
// allocate buffer big enough for number of txs + 2 bigints
buf := make([]byte, vlq.MaxLen32+2*maxPackedBigintBytes)
// allocate buffer initial buffer
buf := make([]byte, 1024)
varBuf := make([]byte, maxPackedBigintBytes)
for addrDesc, ab := range abm {
// balance with 0 transactions is removed from db - happens in disconnect
// balance with 0 transactions is removed from db - happens on disconnect
if ab == nil || ab.Txs <= 0 {
wb.DeleteCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc))
} else {
l := packVaruint(uint(ab.Txs), buf)
ll := packBigint(&ab.SentSat, buf[l:])
l += ll
ll = packBigint(&ab.BalanceSat, buf[l:])
l += ll
wb.PutCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc), buf[:l])
buf = packAddrBalance(ab, buf, varBuf)
wb.PutCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc), buf)
}
}
return nil
@ -728,14 +744,7 @@ func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBa
if len(buf) < 3 {
return nil, nil
}
txs, l := unpackVaruint(buf)
sentSat, sl := unpackBigint(buf[l:])
balanceSat, _ := unpackBigint(buf[l+sl:])
return &AddrBalance{
Txs: uint32(txs),
SentSat: sentSat,
BalanceSat: balanceSat,
}, nil
return unpackAddrBalance(buf, d.chainParser.PackedTxidLen())
}
// GetAddressBalance returns address balance for an address or nil if address not found
@ -829,6 +838,56 @@ func appendTxOutput(txo *TxOutput, buf []byte, varBuf []byte) []byte {
return buf
}
func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) {
txs, l := unpackVaruint(buf)
sentSat, sl := unpackBigint(buf[l:])
balanceSat, bl := unpackBigint(buf[l+sl:])
l = l + sl + bl
utxos := make([]Utxo, 0)
for len(buf[l:]) >= txidUnpackedLen+3 {
btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...)
l += txidUnpackedLen
vout, ll := unpackVaruint(buf[l:])
l += ll
height, ll := unpackVaruint(buf[l:])
l += ll
valueSat, ll := unpackBigint(buf[l:])
l += ll
utxos = append(utxos, Utxo{
BtxID: btxID,
Vout: int32(vout),
Height: uint32(height),
ValueSat: valueSat,
})
}
return &AddrBalance{
Txs: uint32(txs),
SentSat: sentSat,
BalanceSat: balanceSat,
Utxos: utxos,
}, nil
}
func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte {
buf = buf[:0]
l := packVaruint(uint(ab.Txs), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&ab.SentSat, varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&ab.BalanceSat, varBuf)
buf = append(buf, varBuf[:l]...)
for _, utxo := range ab.Utxos {
buf = append(buf, utxo.BtxID...)
l = packVaruint(uint(utxo.Vout), varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(utxo.Height), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&utxo.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
}
return buf
}
func unpackTxAddresses(buf []byte) (*TxAddresses, error) {
ta := TxAddresses{}
height, l := unpackVaruint(buf)

View File

@ -99,6 +99,11 @@ func uintToHex(i uint32) string {
return hex.EncodeToString(buf)
}
func hexToBytes(h string) []byte {
b, _ := hex.DecodeString(h)
return b
}
func addressKeyHex(a string, height uint32, d *RocksDB) string {
return dbtestdata.AddressToPubKeyHex(a, d.chainParser) + uintToHex(^height)
}
@ -221,11 +226,36 @@ func verifyAfterBitcoinTypeBlock1(t *testing.T, d *RocksDB, afterDisconnect bool
}
}
if err := checkColumn(d, cfAddressBalance, []keyPair{
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A2), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A3), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A4), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A5), nil},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1) +
dbtestdata.TxidB1T1 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A1),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A2) +
dbtestdata.TxidB1T1 + varuintToHex(1) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A2),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A3) +
dbtestdata.TxidB1T2 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A3),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A4) +
dbtestdata.TxidB1T2 + varuintToHex(1) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A4),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T2A5) +
dbtestdata.TxidB1T2 + varuintToHex(2) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T2A5),
nil,
},
}); err != nil {
{
t.Fatal(err)
@ -356,16 +386,62 @@ func verifyAfterBitcoinTypeBlock2(t *testing.T, d *RocksDB) {
}
}
if err := checkColumn(d, cfAddressBalance, []keyPair{
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T1A2) + bigintToHex(dbtestdata.SatZero), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A3) + bigintToHex(dbtestdata.SatZero), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A4) + bigintToHex(dbtestdata.SatZero), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser), "02" + bigintToHex(dbtestdata.SatB1T2A5) + bigintToHex(dbtestdata.SatB2T3A5), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr6, d.chainParser), "02" + bigintToHex(dbtestdata.SatB2T1A6) + bigintToHex(dbtestdata.SatZero), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr7, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T1A7), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr8, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A8), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.Addr9, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A9), nil},
{dbtestdata.AddressToPubKeyHex(dbtestdata.AddrA, d.chainParser), "01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T4AA), nil},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr1, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB1T1A1) +
dbtestdata.TxidB1T1 + varuintToHex(0) + varuintToHex(225493) + bigintToHex(dbtestdata.SatB1T1A1),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr2, d.chainParser),
"02" + bigintToHex(dbtestdata.SatB1T1A2) + bigintToHex(dbtestdata.SatZero),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr3, d.chainParser),
"02" + bigintToHex(dbtestdata.SatB1T2A3) + bigintToHex(dbtestdata.SatZero),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr4, d.chainParser),
"02" + bigintToHex(dbtestdata.SatB1T2A4) + bigintToHex(dbtestdata.SatZero),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr5, d.chainParser),
"02" + bigintToHex(dbtestdata.SatB1T2A5) + bigintToHex(dbtestdata.SatB2T3A5) +
dbtestdata.TxidB2T3 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T3A5),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr6, d.chainParser),
"02" + bigintToHex(dbtestdata.SatB2T1A6) + bigintToHex(dbtestdata.SatZero),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr7, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T1A7) +
dbtestdata.TxidB2T1 + varuintToHex(1) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T1A7),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr8, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A8) +
dbtestdata.TxidB2T2 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T2A8),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.Addr9, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T2A9) +
dbtestdata.TxidB2T2 + varuintToHex(1) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T2A9),
nil,
},
{
dbtestdata.AddressToPubKeyHex(dbtestdata.AddrA, d.chainParser),
"01" + bigintToHex(dbtestdata.SatZero) + bigintToHex(dbtestdata.SatB2T4AA) +
dbtestdata.TxidB2T4 + varuintToHex(0) + varuintToHex(225494) + bigintToHex(dbtestdata.SatB2T4AA),
nil,
},
}); err != nil {
{
t.Fatal(err)
@ -626,10 +702,7 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
ValueSat: *dbtestdata.SatB2T1A7,
},
{
AddrDesc: func() []byte {
b, _ := hex.DecodeString(dbtestdata.TxidB2T1Output3OpReturn)
return b
}(),
AddrDesc: hexToBytes(dbtestdata.TxidB2T1Output3OpReturn),
Spent: false,
ValueSat: *dbtestdata.SatZero,
},
@ -915,3 +988,78 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) {
})
}
}
func Test_packAddrBalance_unpackAddrBalance(t *testing.T) {
parser := bitcoinTestnetParser()
tests := []struct {
name string
hex string
data *AddrBalance
}{
{
name: "no utxos",
hex: "7b060b44cc1af8520514faf980ac",
data: &AddrBalance{
BalanceSat: *big.NewInt(90110001324),
SentSat: *big.NewInt(12390110001234),
Txs: 123,
Utxos: []Utxo{},
},
},
{
name: "utxos",
hex: "7b060b44cc1af8520514faf980ac00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400c87c440060b2fd12177a6effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac750098faf659010105e2e48aeabdd9b75def7b48d756ba304713c2aba7b522bf9dbc893fc4231b0782c6df6d84ccd88552087e9cba87a275ffff",
data: &AddrBalance{
BalanceSat: *big.NewInt(90110001324),
SentSat: *big.NewInt(12390110001234),
Txs: 123,
Utxos: []Utxo{
{
BtxID: hexToBytes(dbtestdata.TxidB1T1),
Vout: 12,
Height: 123456,
ValueSat: *big.NewInt(12390110001234 - 90110001324),
},
{
BtxID: hexToBytes(dbtestdata.TxidB1T2),
Vout: 0,
Height: 52345689,
ValueSat: *big.NewInt(1),
},
{
BtxID: hexToBytes(dbtestdata.TxidB2T3),
Vout: 5353453,
Height: 1234567890,
ValueSat: *big.NewInt(9123372036854775807),
},
},
},
},
{
name: "empty",
hex: "000000",
data: &AddrBalance{
Utxos: []Utxo{},
},
},
}
varBuf := make([]byte, maxPackedBigintBytes)
buf := make([]byte, 32)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := packAddrBalance(tt.data, buf, varBuf)
hex := hex.EncodeToString(b)
if !reflect.DeepEqual(hex, tt.hex) {
t.Errorf("packTxAddresses() = %v, want %v", hex, tt.hex)
}
got1, err := unpackAddrBalance(b, parser.PackedTxidLen())
if err != nil {
t.Errorf("unpackTxAddresses() error = %v", err)
return
}
if !reflect.DeepEqual(got1, tt.data) {
t.Errorf("unpackTxAddresses() = %+v, want %+v", got1, tt.data)
}
})
}
}