Implement index v3 WIP

This commit is contained in:
Martin Boehm 2018-08-15 19:22:26 +02:00
parent 78d2c085a9
commit f5b29b3a17
2 changed files with 167 additions and 60 deletions

View File

@ -277,15 +277,21 @@ type outpoint struct {
index int32
}
type txAddress struct {
type txInput struct {
addrID []byte
vout uint32
valueSat big.Int
}
type txOutput struct {
addrID []byte
spent bool
valueSat big.Int
}
type txAddresses struct {
inputs []txAddress
outputs []txAddress
inputs []txInput
outputs []txOutput
}
type addrBalance struct {
@ -294,6 +300,17 @@ type addrBalance struct {
balanceSat big.Int
}
func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText string) {
ad, err := d.chainParser.OutputScriptToAddresses(addrID)
had := hex.EncodeToString(addrID)
if err != nil {
glog.Warningf("rocksdb: unparsable address hex '%v' reached negative %s %v, resetting to 0. Parser error %v", had, logText, valueSat.String(), err)
} else {
glog.Warningf("rocksdb: address %v hex '%v' reached negative %s %v, resetting to 0", ad, had, logText, valueSat.String())
}
valueSat.SetInt64(0)
}
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error {
addresses := make(map[string][]outpoint)
blockTxids := make([][]byte, len(block.Txs))
@ -307,7 +324,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
}
blockTxids[txi] = btxID
ta := txAddresses{}
ta.outputs = make([]txAddress, len(tx.Vout))
ta.outputs = make([]txOutput, len(tx.Vout))
txAddressesMap[string(btxID)] = &ta
for i, output := range tx.Vout {
tao := &ta.outputs[i]
@ -358,7 +375,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
for txi, tx := range block.Txs {
spendingTxid := blockTxids[txi]
ta := txAddressesMap[string(spendingTxid)]
ta.inputs = make([]txAddress, len(tx.Vin))
ta.inputs = make([]txInput, len(tx.Vin))
for i, input := range tx.Vin {
tai := &ta.inputs[i]
btxID, err := d.chainParser.PackTxid(input.Txid)
@ -392,6 +409,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
continue
}
tai.addrID = ot.addrID
tai.vout = input.Vout
tai.valueSat = ot.valueSat
// mark the output as spent in tx
ot.spent = true
@ -427,14 +445,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
}
ab.balanceSat.Sub(&ab.balanceSat, &ot.valueSat)
if ab.balanceSat.Sign() < 0 {
ad, err := d.chainParser.OutputScriptToAddresses(ot.addrID)
had := hex.EncodeToString(ot.addrID)
if err != nil {
glog.Warningf("rocksdb: unparsable address hex '%v' reached negative balance %v, resetting to 0. Parser error %v", had, ab.balanceSat.String(), err)
} else {
glog.Warningf("rocksdb: address %v hex '%v' reached negative balance %v, resetting to 0", ad, had, ab.balanceSat.String())
}
ab.balanceSat.SetInt64(0)
d.resetValueSatToZero(&ab.balanceSat, ot.addrID, "balance")
}
ab.sentSat.Add(&ab.sentSat, &ot.valueSat)
}
@ -584,25 +595,37 @@ func packTxAddresses(ta *txAddresses, buf []byte, varBuf []byte) []byte {
l := packVaruint(uint(len(ta.inputs)), varBuf)
buf = append(buf, varBuf[:l]...)
for i := range ta.inputs {
buf = appendTxAddress(&ta.inputs[i], buf, varBuf)
buf = appendTxInput(&ta.inputs[i], buf, varBuf)
}
l = packVaruint(uint(len(ta.outputs)), varBuf)
buf = append(buf, varBuf[:l]...)
for i := range ta.outputs {
buf = appendTxAddress(&ta.outputs[i], buf, varBuf)
buf = appendTxOutput(&ta.outputs[i], buf, varBuf)
}
return buf
}
func appendTxAddress(txa *txAddress, buf []byte, varBuf []byte) []byte {
la := len(txa.addrID)
if txa.spent {
func appendTxInput(txi *txInput, buf []byte, varBuf []byte) []byte {
la := len(txi.addrID)
l := packVarint(la, varBuf)
buf = append(buf, varBuf[:l]...)
buf = append(buf, txi.addrID...)
l = packBigint(&txi.valueSat, varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(txi.vout), varBuf)
buf = append(buf, varBuf[:l]...)
return buf
}
func appendTxOutput(txo *txOutput, buf []byte, varBuf []byte) []byte {
la := len(txo.addrID)
if txo.spent {
la = ^la
}
l := packVarint(la, varBuf)
buf = append(buf, varBuf[:l]...)
buf = append(buf, txa.addrID...)
l = packBigint(&txa.valueSat, varBuf)
buf = append(buf, txo.addrID...)
l = packBigint(&txo.valueSat, varBuf)
buf = append(buf, varBuf[:l]...)
return buf
}
@ -610,29 +633,41 @@ func appendTxAddress(txa *txAddress, buf []byte, varBuf []byte) []byte {
func unpackTxAddresses(buf []byte) (*txAddresses, error) {
ta := txAddresses{}
inputs, l := unpackVaruint(buf)
ta.inputs = make([]txAddress, inputs)
ta.inputs = make([]txInput, inputs)
for i := uint(0); i < inputs; i++ {
l += unpackTxAddress(&ta.inputs[i], buf[l:])
l += unpackTxInput(&ta.inputs[i], buf[l:])
}
outputs, ll := unpackVaruint(buf[l:])
l += ll
ta.outputs = make([]txAddress, outputs)
ta.outputs = make([]txOutput, outputs)
for i := uint(0); i < outputs; i++ {
l += unpackTxAddress(&ta.outputs[i], buf[l:])
l += unpackTxOutput(&ta.outputs[i], buf[l:])
}
return &ta, nil
}
func unpackTxAddress(ta *txAddress, buf []byte) int {
func unpackTxInput(ti *txInput, buf []byte) int {
al, l := unpackVarint(buf)
ti.addrID = make([]byte, al)
copy(ti.addrID, buf[l:l+al])
al += l
ti.valueSat, l = unpackBigint(buf[al:])
al += l
v, l := unpackVaruint(buf[al:])
ti.vout = uint32(v)
return l + al
}
func unpackTxOutput(to *txOutput, buf []byte) int {
al, l := unpackVarint(buf)
if al < 0 {
ta.spent = true
to.spent = true
al = ^al
}
ta.addrID = make([]byte, al)
copy(ta.addrID, buf[l:l+al])
to.addrID = make([]byte, al)
copy(to.addrID, buf[l:l+al])
al += l
ta.valueSat, l = unpackBigint(buf[al:])
to.valueSat, l = unpackBigint(buf[al:])
return l + al
}
@ -759,11 +794,7 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
return d.chainParser.UnpackBlockHash(val.Data())
}
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *bchain.Block,
op int,
) error {
func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
key := packUint(block.Height)
switch op {
@ -776,7 +807,6 @@ func (d *RocksDB) writeHeight(
case opDelete:
wb.DeleteCF(d.cfh[cfHeight], key)
}
return nil
}
@ -827,12 +857,46 @@ func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]b
return addrKeys, addrValues, nil
}
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses, txAddressesToUpdate map[string]*txAddresses, txsToDelete map[string]struct{}, balances map[string]*addrBalance) error {
findBalance := func(addrID []byte) (*addrBalance, error) {
// get all transactions of the given address and match it to input to find spent output
func (d *RocksDB) findSpentTx(ti *txInput) ([]byte, *txAddresses, error) {
start := packAddressKey(ti.addrID, 0)
stop := packAddressKey(ti.addrID, ^uint32(0))
it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses])
defer it.Close()
for it.Seek(start); it.Valid(); it.Next() {
key := it.Key().Data()
val := it.Value().Data()
if bytes.Compare(key, stop) > 0 {
break
}
outpoints, err := d.unpackOutpoints(val)
if err != nil {
return nil, nil, err
}
for _, o := range outpoints {
// process only outputs that match
if o.index >= 0 && uint32(o.index) == ti.vout {
a, err := d.getTxAddresses(o.btxID)
if err != nil {
return nil, nil, err
}
if bytes.Equal(a.outputs[o.index].addrID, ti.addrID) {
return o.btxID, a, nil
}
}
}
}
return nil, nil, nil
}
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses,
txAddressesToUpdate map[string]*txAddresses, balances map[string]*addrBalance) error {
addresses := make(map[string]struct{})
getAddressBalance := func(addrID []byte) (*addrBalance, error) {
var err error
s := string(addrID)
b, found := balances[s]
if !found {
b, fb := balances[s]
if !fb {
b, err = d.getAddressBalance(addrID)
if err != nil {
return nil, err
@ -842,15 +906,52 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
return b, nil
}
for _, t := range txa.inputs {
b, err := findBalance(t.addrID)
s := string(t.addrID)
_, fa := addresses[s]
if !fa {
addresses[s] = struct{}{}
}
b, err := getAddressBalance(t.addrID)
if err != nil {
return err
}
if b != nil {
// subtract number of txs only once
if !fa {
b.txs--
}
b.sentSat.Sub(&b.sentSat, &t.valueSat)
if b.sentSat.Sign() < 0 {
d.resetValueSatToZero(&b.sentSat, t.addrID, "sent amount")
}
b.balanceSat.Add(&b.balanceSat, &t.valueSat)
}
}
for _, t := range txa.outputs {
s := string(t.addrID)
_, fa := addresses[s]
if !fa {
addresses[s] = struct{}{}
}
b, err := getAddressBalance(t.addrID)
if err != nil {
return err
}
if b != nil {
// subtract number of txs only once
if !fa {
b.txs--
}
b.balanceSat.Sub(&b.balanceSat, &t.valueSat)
if b.balanceSat.Sign() < 0 {
d.resetValueSatToZero(&b.balanceSat, t.addrID, "balance")
}
}
}
for a := range addresses {
key := packAddressKey([]byte(a), height)
wb.DeleteCF(d.cfh[cfAddresses], key)
}
return nil
}
@ -883,7 +984,7 @@ func (d *RocksDB) DisconnectBlockRangeUTXO(lower uint32, higher uint32) error {
}
s := string(txid)
txsToDelete[s] = struct{}{}
if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, txsToDelete, balances); err != nil {
if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, balances); err != nil {
return err
}
}

View File

@ -432,8 +432,8 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) {
keyPair{
txidB2T1,
"02" +
addressToPubKeyHexWithLength(addr3, t, d) + bigintToHex(satB1T2A3) +
addressToPubKeyHexWithLength(addr2, t, d) + bigintToHex(satB1T1A2) +
addressToPubKeyHexWithLength(addr3, t, d) + bigintToHex(satB1T2A3) + "00" +
addressToPubKeyHexWithLength(addr2, t, d) + bigintToHex(satB1T1A2) + "01" +
"02" +
spentAddressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) +
addressToPubKeyHexWithLength(addr7, t, d) + bigintToHex(satB2T1A7),
@ -442,8 +442,8 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) {
keyPair{
txidB2T2,
"02" +
addressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) +
addressToPubKeyHexWithLength(addr4, t, d) + bigintToHex(satB1T2A4) +
addressToPubKeyHexWithLength(addr6, t, d) + bigintToHex(satB2T1A6) + "00" +
addressToPubKeyHexWithLength(addr4, t, d) + bigintToHex(satB1T2A4) + "01" +
"02" +
addressToPubKeyHexWithLength(addr8, t, d) + bigintToHex(satB2T2A8) +
addressToPubKeyHexWithLength(addr9, t, d) + bigintToHex(satB2T2A9),
@ -452,7 +452,7 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) {
keyPair{
txidB2T3,
"01" +
addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB1T2A5) +
addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB1T2A5) + "02" +
"01" +
addressToPubKeyHexWithLength(addr5, t, d) + bigintToHex(satB2T3A5),
nil,
@ -548,7 +548,7 @@ func testTxCache(t *testing.T, d *RocksDB, b *bchain.Block, tx *bchain.Tx) {
// 3) GetBestBlock, GetBlockHash
// 4) Test tx caching functionality
// 5) Disconnect block 2 - expect error
// 6) Disconnect the block 2 using blockaddresses column
// 6) Disconnect the block 2 using BlockTxids column
// 7) Reconnect block 2 and disconnect blocks 1 and 2 using full scan - expect error
// After each step, the content of DB is examined and any difference against expected state is regarded as failure
func TestRocksDB_Index_UTXO(t *testing.T) {
@ -762,19 +762,21 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) {
}{
{
name: "1",
hex: "022c001443aac20a116e09ea4f7914be1c55e4c17aa600b7002c001454633aa8bd2e552bd4e89c01e73c1b7905eb58460811207cb68a199872012d001443aac20a116e09ea4f7914be1c55e4c17aa600b70101",
hex: "022c001443aac20a116e09ea4f7914be1c55e4c17aa600b700e0392c001454633aa8bd2e552bd4e89c01e73c1b7905eb58460811207cb68a19987283bb55012d001443aac20a116e09ea4f7914be1c55e4c17aa600b70101",
data: &txAddresses{
inputs: []txAddress{
inputs: []txInput{
{
addrID: addressToOutput("tb1qgw4vyzs3dcy75nmezjlpc40yc9a2vq9hghdyt2", parser),
valueSat: *big.NewInt(0),
vout: 12345,
},
{
addrID: addressToOutput("tb1q233n429a9e2jh48gnsq7w0qm0yz7kkzx0qczw8", parser),
valueSat: *big.NewInt(1234123421342341234),
vout: 56789,
},
},
outputs: []txAddress{
outputs: []txOutput{
{
addrID: addressToOutput("tb1qgw4vyzs3dcy75nmezjlpc40yc9a2vq9hghdyt2", parser),
valueSat: *big.NewInt(1),
@ -785,23 +787,26 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) {
},
{
name: "2",
hex: "032ea9149eb21980dc9d413d8eac27314938b9da920ee53e8705021918f2c02ea91409f70b896169c37981d2b54b371df0d81a136a2c870501dd7e28c02ea914e371782582a4addb541362c55565d2cdf56f6498870501a1e35ec0052fa9141d9ca71efa36d814424ea6ca1437e67287aebe348705012aadcac02ea91424fbc77cdc62702ade74dcf989c15e5d3f9240bc870501664894c02fa914afbfb74ee994c7d45f6698738bc4226d065266f7870501a1e35ec03276a914d2a37ce20ac9ec4f15dd05a7c6e8e9fbdb99850e88ac043b9943603376a9146b2044146a4438e6e5bfbc65f147afeb64d14fbb88ac05012a05f200",
hex: "032ea9149eb21980dc9d413d8eac27314938b9da920ee53e8705021918f2c0012ea91409f70b896169c37981d2b54b371df0d81a136a2c870501dd7e28c0022ea914e371782582a4addb541362c55565d2cdf56f6498870501a1e35ec003052fa9141d9ca71efa36d814424ea6ca1437e67287aebe348705012aadcac02ea91424fbc77cdc62702ade74dcf989c15e5d3f9240bc870501664894c02fa914afbfb74ee994c7d45f6698738bc4226d065266f7870501a1e35ec03276a914d2a37ce20ac9ec4f15dd05a7c6e8e9fbdb99850e88ac043b9943603376a9146b2044146a4438e6e5bfbc65f147afeb64d14fbb88ac05012a05f200",
data: &txAddresses{
inputs: []txAddress{
inputs: []txInput{
{
addrID: addressToOutput("2N7iL7AvS4LViugwsdjTB13uN4T7XhV1bCP", parser),
valueSat: *big.NewInt(9011000000),
vout: 1,
},
{
addrID: addressToOutput("2Mt9v216YiNBAzobeNEzd4FQweHrGyuRHze", parser),
valueSat: *big.NewInt(8011000000),
vout: 2,
},
{
addrID: addressToOutput("2NDyqJpHvHnqNtL1F9xAeCWMAW8WLJmEMyD", parser),
valueSat: *big.NewInt(7011000000),
vout: 3,
},
},
outputs: []txAddress{
outputs: []txOutput{
{
addrID: addressToOutput("2MuwoFGwABMakU7DCpdGDAKzyj2nTyRagDP", parser),
valueSat: *big.NewInt(5011000000),
@ -830,15 +835,16 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) {
},
{
name: "empty address",
hex: "01000204d2020002162e010162",
hex: "01000204d201020002162e010162",
data: &txAddresses{
inputs: []txAddress{
inputs: []txInput{
{
addrID: []byte{},
valueSat: *big.NewInt(1234),
vout: 1,
},
},
outputs: []txAddress{
outputs: []txOutput{
{
addrID: []byte{},
valueSat: *big.NewInt(5678),
@ -855,8 +861,8 @@ func Test_packTxAddresses_unpackTxAddresses(t *testing.T) {
name: "empty",
hex: "0000",
data: &txAddresses{
inputs: []txAddress{},
outputs: []txAddress{},
inputs: []txInput{},
outputs: []txOutput{},
},
},
}