diff --git a/db/rocksdb.go b/db/rocksdb.go index aa53d74c..dfff330a 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -277,15 +277,21 @@ type outpoint struct { index int32 } -type txAddress struct { +type txInput struct { + addrID []byte + vout uint32 + valueSat big.Int +} + +type txOutput struct { addrID []byte spent bool valueSat big.Int } type txAddresses struct { - inputs []txAddress - outputs []txAddress + inputs []txInput + outputs []txOutput } type addrBalance struct { @@ -294,6 +300,17 @@ type addrBalance struct { balanceSat big.Int } +func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText string) { + ad, err := d.chainParser.OutputScriptToAddresses(addrID) + had := hex.EncodeToString(addrID) + if err != nil { + glog.Warningf("rocksdb: unparsable address hex '%v' reached negative %s %v, resetting to 0. Parser error %v", had, logText, valueSat.String(), err) + } else { + glog.Warningf("rocksdb: address %v hex '%v' reached negative %s %v, resetting to 0", ad, had, logText, valueSat.String()) + } + valueSat.SetInt64(0) +} + func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error { addresses := make(map[string][]outpoint) blockTxids := make([][]byte, len(block.Txs)) @@ -307,7 +324,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo } blockTxids[txi] = btxID ta := txAddresses{} - ta.outputs = make([]txAddress, len(tx.Vout)) + ta.outputs = make([]txOutput, len(tx.Vout)) txAddressesMap[string(btxID)] = &ta for i, output := range tx.Vout { tao := &ta.outputs[i] @@ -358,7 +375,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo for txi, tx := range block.Txs { spendingTxid := blockTxids[txi] ta := txAddressesMap[string(spendingTxid)] - ta.inputs = make([]txAddress, len(tx.Vin)) + ta.inputs = make([]txInput, len(tx.Vin)) for i, input := range tx.Vin { tai := &ta.inputs[i] btxID, err := d.chainParser.PackTxid(input.Txid) @@ -392,6 +409,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo continue } tai.addrID = ot.addrID + tai.vout = input.Vout tai.valueSat = ot.valueSat // mark the output as spent in tx ot.spent = true @@ -427,14 +445,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo } ab.balanceSat.Sub(&ab.balanceSat, &ot.valueSat) if ab.balanceSat.Sign() < 0 { - ad, err := d.chainParser.OutputScriptToAddresses(ot.addrID) - had := hex.EncodeToString(ot.addrID) - if err != nil { - glog.Warningf("rocksdb: unparsable address hex '%v' reached negative balance %v, resetting to 0. Parser error %v", had, ab.balanceSat.String(), err) - } else { - glog.Warningf("rocksdb: address %v hex '%v' reached negative balance %v, resetting to 0", ad, had, ab.balanceSat.String()) - } - ab.balanceSat.SetInt64(0) + d.resetValueSatToZero(&ab.balanceSat, ot.addrID, "balance") } ab.sentSat.Add(&ab.sentSat, &ot.valueSat) } @@ -584,25 +595,37 @@ func packTxAddresses(ta *txAddresses, buf []byte, varBuf []byte) []byte { l := packVaruint(uint(len(ta.inputs)), varBuf) buf = append(buf, varBuf[:l]...) for i := range ta.inputs { - buf = appendTxAddress(&ta.inputs[i], buf, varBuf) + buf = appendTxInput(&ta.inputs[i], buf, varBuf) } l = packVaruint(uint(len(ta.outputs)), varBuf) buf = append(buf, varBuf[:l]...) for i := range ta.outputs { - buf = appendTxAddress(&ta.outputs[i], buf, varBuf) + buf = appendTxOutput(&ta.outputs[i], buf, varBuf) } return buf } -func appendTxAddress(txa *txAddress, buf []byte, varBuf []byte) []byte { - la := len(txa.addrID) - if txa.spent { +func appendTxInput(txi *txInput, buf []byte, varBuf []byte) []byte { + la := len(txi.addrID) + l := packVarint(la, varBuf) + buf = append(buf, varBuf[:l]...) + buf = append(buf, txi.addrID...) + l = packBigint(&txi.valueSat, varBuf) + buf = append(buf, varBuf[:l]...) + l = packVaruint(uint(txi.vout), varBuf) + buf = append(buf, varBuf[:l]...) + return buf +} + +func appendTxOutput(txo *txOutput, buf []byte, varBuf []byte) []byte { + la := len(txo.addrID) + if txo.spent { la = ^la } l := packVarint(la, varBuf) buf = append(buf, varBuf[:l]...) - buf = append(buf, txa.addrID...) - l = packBigint(&txa.valueSat, varBuf) + buf = append(buf, txo.addrID...) + l = packBigint(&txo.valueSat, varBuf) buf = append(buf, varBuf[:l]...) return buf } @@ -610,29 +633,41 @@ func appendTxAddress(txa *txAddress, buf []byte, varBuf []byte) []byte { func unpackTxAddresses(buf []byte) (*txAddresses, error) { ta := txAddresses{} inputs, l := unpackVaruint(buf) - ta.inputs = make([]txAddress, inputs) + ta.inputs = make([]txInput, inputs) for i := uint(0); i < inputs; i++ { - l += unpackTxAddress(&ta.inputs[i], buf[l:]) + l += unpackTxInput(&ta.inputs[i], buf[l:]) } outputs, ll := unpackVaruint(buf[l:]) l += ll - ta.outputs = make([]txAddress, outputs) + ta.outputs = make([]txOutput, outputs) for i := uint(0); i < outputs; i++ { - l += unpackTxAddress(&ta.outputs[i], buf[l:]) + l += unpackTxOutput(&ta.outputs[i], buf[l:]) } return &ta, nil } -func unpackTxAddress(ta *txAddress, buf []byte) int { +func unpackTxInput(ti *txInput, buf []byte) int { + al, l := unpackVarint(buf) + ti.addrID = make([]byte, al) + copy(ti.addrID, buf[l:l+al]) + al += l + ti.valueSat, l = unpackBigint(buf[al:]) + al += l + v, l := unpackVaruint(buf[al:]) + ti.vout = uint32(v) + return l + al +} + +func unpackTxOutput(to *txOutput, buf []byte) int { al, l := unpackVarint(buf) if al < 0 { - ta.spent = true + to.spent = true al = ^al } - ta.addrID = make([]byte, al) - copy(ta.addrID, buf[l:l+al]) + to.addrID = make([]byte, al) + copy(to.addrID, buf[l:l+al]) al += l - ta.valueSat, l = unpackBigint(buf[al:]) + to.valueSat, l = unpackBigint(buf[al:]) return l + al } @@ -759,11 +794,7 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) { return d.chainParser.UnpackBlockHash(val.Data()) } -func (d *RocksDB) writeHeight( - wb *gorocksdb.WriteBatch, - block *bchain.Block, - op int, -) error { +func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { key := packUint(block.Height) switch op { @@ -776,7 +807,6 @@ func (d *RocksDB) writeHeight( case opDelete: wb.DeleteCF(d.cfh[cfHeight], key) } - return nil } @@ -827,12 +857,46 @@ func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]b return addrKeys, addrValues, nil } -func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses, txAddressesToUpdate map[string]*txAddresses, txsToDelete map[string]struct{}, balances map[string]*addrBalance) error { - findBalance := func(addrID []byte) (*addrBalance, error) { +// get all transactions of the given address and match it to input to find spent output +func (d *RocksDB) findSpentTx(ti *txInput) ([]byte, *txAddresses, error) { + start := packAddressKey(ti.addrID, 0) + stop := packAddressKey(ti.addrID, ^uint32(0)) + it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses]) + defer it.Close() + for it.Seek(start); it.Valid(); it.Next() { + key := it.Key().Data() + val := it.Value().Data() + if bytes.Compare(key, stop) > 0 { + break + } + outpoints, err := d.unpackOutpoints(val) + if err != nil { + return nil, nil, err + } + for _, o := range outpoints { + // process only outputs that match + if o.index >= 0 && uint32(o.index) == ti.vout { + a, err := d.getTxAddresses(o.btxID) + if err != nil { + return nil, nil, err + } + if bytes.Equal(a.outputs[o.index].addrID, ti.addrID) { + return o.btxID, a, nil + } + } + } + } + return nil, nil, nil +} + +func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses, + txAddressesToUpdate map[string]*txAddresses, balances map[string]*addrBalance) error { + addresses := make(map[string]struct{}) + getAddressBalance := func(addrID []byte) (*addrBalance, error) { var err error s := string(addrID) - b, found := balances[s] - if !found { + b, fb := balances[s] + if !fb { b, err = d.getAddressBalance(addrID) if err != nil { return nil, err @@ -842,15 +906,52 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, return b, nil } for _, t := range txa.inputs { - b, err := findBalance(t.addrID) + s := string(t.addrID) + _, fa := addresses[s] + if !fa { + addresses[s] = struct{}{} + } + b, err := getAddressBalance(t.addrID) if err != nil { return err } if b != nil { - + // subtract number of txs only once + if !fa { + b.txs-- + } + b.sentSat.Sub(&b.sentSat, &t.valueSat) + if b.sentSat.Sign() < 0 { + d.resetValueSatToZero(&b.sentSat, t.addrID, "sent amount") + } + b.balanceSat.Add(&b.balanceSat, &t.valueSat) } } - + for _, t := range txa.outputs { + s := string(t.addrID) + _, fa := addresses[s] + if !fa { + addresses[s] = struct{}{} + } + b, err := getAddressBalance(t.addrID) + if err != nil { + return err + } + if b != nil { + // subtract number of txs only once + if !fa { + b.txs-- + } + b.balanceSat.Sub(&b.balanceSat, &t.valueSat) + if b.balanceSat.Sign() < 0 { + d.resetValueSatToZero(&b.balanceSat, t.addrID, "balance") + } + } + } + for a := range addresses { + key := packAddressKey([]byte(a), height) + wb.DeleteCF(d.cfh[cfAddresses], key) + } return nil } @@ -883,7 +984,7 @@ func (d *RocksDB) DisconnectBlockRangeUTXO(lower uint32, higher uint32) error { } s := string(txid) txsToDelete[s] = struct{}{} - if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, txsToDelete, balances); err != nil { + if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, balances); err != nil { return err } } diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 4a96e769..fe841e82 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -432,8 +432,8 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) { keyPair{ txidB2T1, "02" + - addressToPubKeyHexWithLength(addr3, t, d) + bigintToHex(satB1T2A3) + - addressToPubKeyHexWithLength(addr2, t, d) + bigintToHex(satB1T1A2) + + addressToPubKeyHexWithLength(addr3, t, d) + bigintToHex(satB1T2A3) + "00" + + addressToPubKeyHexWithLength(addr2, t, d) + bigintToHex(satB1T1A2) + "01" + "02" + spentAddressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) + addressToPubKeyHexWithLength(addr7, t, d) + bigintToHex(satB2T1A7), @@ -442,8 +442,8 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) { keyPair{ txidB2T2, "02" + - addressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) + - addressToPubKeyHexWithLength(addr4, t, d) + bigintToHex(satB1T2A4) + + addressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) + "00" + + addressToPubKeyHexWithLength(addr4, t, d) + bigintToHex(satB1T2A4) + "01" + "02" + addressToPubKeyHexWithLength(addr8, t, d) + bigintToHex(satB2T2A8) + addressToPubKeyHexWithLength(addr9, t, d) + bigintToHex(satB2T2A9), @@ -452,7 +452,7 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) { keyPair{ txidB2T3, "01" + - addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB1T2A5) + + addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB1T2A5) + "02" + "01" + addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB2T3A5), nil, @@ -548,7 +548,7 @@ func testTxCache(t *testing.T, d *RocksDB, b *bchain.Block, tx *bchain.Tx) { // 3) GetBestBlock, GetBlockHash // 4) Test tx caching functionality // 5) Disconnect block 2 - expect error -// 6) Disconnect the block 2 using blockaddresses column +// 6) Disconnect the block 2 using BlockTxids column // 7) Reconnect block 2 and disconnect blocks 1 and 2 using full scan - expect error // After each step, the content of DB is examined and any difference against expected state is regarded as failure func TestRocksDB_Index_UTXO(t *testing.T) { @@ -762,19 +762,21 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) { }{ { name: "1", - hex: "022c001443aac20a116e09ea4f7914be1c55e4c17aa600b7002c001454633aa8bd2e552bd4e89c01e73c1b7905eb58460811207cb68a199872012d001443aac20a116e09ea4f7914be1c55e4c17aa600b70101", + hex: "022c001443aac20a116e09ea4f7914be1c55e4c17aa600b700e0392c001454633aa8bd2e552bd4e89c01e73c1b7905eb58460811207cb68a19987283bb55012d001443aac20a116e09ea4f7914be1c55e4c17aa600b70101", data: &txAddresses{ - inputs: []txAddress{ + inputs: []txInput{ { addrID: addressToOutput("tb1qgw4vyzs3dcy75nmezjlpc40yc9a2vq9hghdyt2", parser), valueSat: *big.NewInt(0), + vout: 12345, }, { addrID: addressToOutput("tb1q233n429a9e2jh48gnsq7w0qm0yz7kkzx0qczw8", parser), valueSat: *big.NewInt(1234123421342341234), + vout: 56789, }, }, - outputs: []txAddress{ + outputs: []txOutput{ { addrID: addressToOutput("tb1qgw4vyzs3dcy75nmezjlpc40yc9a2vq9hghdyt2", parser), valueSat: *big.NewInt(1), @@ -785,23 +787,26 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) { }, { name: "2", - hex: "032ea9149eb21980dc9d413d8eac27314938b9da920ee53e8705021918f2c02ea91409f70b896169c37981d2b54b371df0d81a136a2c870501dd7e28c02ea914e371782582a4addb541362c55565d2cdf56f6498870501a1e35ec0052fa9141d9ca71efa36d814424ea6ca1437e67287aebe348705012aadcac02ea91424fbc77cdc62702ade74dcf989c15e5d3f9240bc870501664894c02fa914afbfb74ee994c7d45f6698738bc4226d065266f7870501a1e35ec03276a914d2a37ce20ac9ec4f15dd05a7c6e8e9fbdb99850e88ac043b9943603376a9146b2044146a4438e6e5bfbc65f147afeb64d14fbb88ac05012a05f200", + hex: "032ea9149eb21980dc9d413d8eac27314938b9da920ee53e8705021918f2c0012ea91409f70b896169c37981d2b54b371df0d81a136a2c870501dd7e28c0022ea914e371782582a4addb541362c55565d2cdf56f6498870501a1e35ec003052fa9141d9ca71efa36d814424ea6ca1437e67287aebe348705012aadcac02ea91424fbc77cdc62702ade74dcf989c15e5d3f9240bc870501664894c02fa914afbfb74ee994c7d45f6698738bc4226d065266f7870501a1e35ec03276a914d2a37ce20ac9ec4f15dd05a7c6e8e9fbdb99850e88ac043b9943603376a9146b2044146a4438e6e5bfbc65f147afeb64d14fbb88ac05012a05f200", data: &txAddresses{ - inputs: []txAddress{ + inputs: []txInput{ { addrID: addressToOutput("2N7iL7AvS4LViugwsdjTB13uN4T7XhV1bCP", parser), valueSat: *big.NewInt(9011000000), + vout: 1, }, { addrID: addressToOutput("2Mt9v216YiNBAzobeNEzd4FQweHrGyuRHze", parser), valueSat: *big.NewInt(8011000000), + vout: 2, }, { addrID: addressToOutput("2NDyqJpHvHnqNtL1F9xAeCWMAW8WLJmEMyD", parser), valueSat: *big.NewInt(7011000000), + vout: 3, }, }, - outputs: []txAddress{ + outputs: []txOutput{ { addrID: addressToOutput("2MuwoFGwABMakU7DCpdGDAKzyj2nTyRagDP", parser), valueSat: *big.NewInt(5011000000), @@ -830,15 +835,16 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) { }, { name: "empty address", - hex: "01000204d2020002162e010162", + hex: "01000204d201020002162e010162", data: &txAddresses{ - inputs: []txAddress{ + inputs: []txInput{ { addrID: []byte{}, valueSat: *big.NewInt(1234), + vout: 1, }, }, - outputs: []txAddress{ + outputs: []txOutput{ { addrID: []byte{}, valueSat: *big.NewInt(5678), @@ -855,8 +861,8 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) { name: "empty", hex: "0000", data: &txAddresses{ - inputs: []txAddress{}, - outputs: []txAddress{}, + inputs: []txInput{}, + outputs: []txOutput{}, }, }, }