From ed027a68c9d63592437a0635347d81dc8bff010c Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 24 Apr 2018 12:00:24 +0200 Subject: [PATCH] Implement DisconnectBlocks in index v2 - WIP --- db/rocksdb.go | 93 ++++++++++++++++++++++++++++++++-------------- db/rocksdb_test.go | 76 +++++++++++++++++++++++++++---------- 2 files changed, 122 insertions(+), 47 deletions(-) diff --git a/db/rocksdb.go b/db/rocksdb.go index 38e30e0c..64552f6f 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -248,15 +248,23 @@ type outpoint struct { vout int32 } -func packBlockAddress(addrID []byte) []byte { +func (d *RocksDB) packBlockAddress(addrID []byte, removedUnspentTxs map[string][]outpoint) []byte { vBuf := make([]byte, vlq.MaxLen32) vl := packVarint(int32(len(addrID)), vBuf) blockAddress := append([]byte(nil), vBuf[:vl]...) blockAddress = append(blockAddress, addrID...) + if removedUnspentTxs == nil { + } else { + addrUnspentTxs := removedUnspentTxs[string(addrID)] + vl = packVarint(int32(len(addrUnspentTxs)), vBuf) + blockAddress = append(blockAddress, vBuf[:vl]...) + buf := d.packOutpoints(addrUnspentTxs) + blockAddress = append(blockAddress, buf...) + } return blockAddress } -func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint) error { +func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint, removedUnspentTxs map[string][]outpoint) error { keep := d.chainParser.KeepBlockAddresses() blockAddresses := make([]byte, 0) for addrID, outpoints := range records { @@ -268,16 +276,12 @@ func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Bl } switch op { case opInsert: - val, err := d.packOutpoints(outpoints) - if err != nil { - glog.Warningf("rocksdb: packOutputValue: %v", err) - continue - } + val := d.packOutpoints(outpoints) wb.PutCF(d.cfh[cfAddresses], key, val) if keep > 0 { // collect all addresses be stored in blockaddresses // they are used in disconnect blocks - blockAddress := packBlockAddress(baddrID) + blockAddress := d.packBlockAddress(baddrID, removedUnspentTxs) blockAddresses = append(blockAddresses, blockAddress...) } case opDelete: @@ -380,6 +384,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo } addresses := make(map[string][]outpoint) unspentTxs := make(map[string][]byte) + removedUnspentTxs := make(map[string][]outpoint) btxIDs := make([][]byte, len(block.Txs)) // first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses for txi, tx := range block.Txs { @@ -438,6 +443,9 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo glog.Warningf("rocksdb: height %d, tx %v vin %v in inputs but missing in unspentTxs", block.Height, tx.Txid, i) continue } + rut := removedUnspentTxs[string(addrID)] + rut = append(rut, outpoint{btxID, int32(input.Vout)}) + removedUnspentTxs[string(addrID)] = rut err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height) if err != nil { return err @@ -445,7 +453,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo unspentTxs[stxID] = unspentAddrs } } - if err := d.writeAddressRecords(wb, block, op, addresses); err != nil { + if err := d.writeAddressRecords(wb, block, op, addresses, removedUnspentTxs); err != nil { return err } // save unspent txs from current block @@ -495,27 +503,34 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain. } } } - return d.writeAddressRecords(wb, block, op, addresses) + return d.writeAddressRecords(wb, block, op, addresses, nil) } -func unpackBlockAddresses(buf []byte) ([][]byte, error) { +func (d *RocksDB) unpackBlockAddresses(buf []byte) ([][]byte, [][]outpoint, error) { addresses := make([][]byte, 0) + outpointsArray := make([][]outpoint, 0) // the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints for i := 0; i < len(buf); { l, lv := unpackVarint(buf[i:]) j := i + int(l) + lv if j > len(buf) { glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf)) - return nil, errors.New("Inconsistent data in blockAddresses") + return nil, nil, errors.New("Inconsistent data in blockAddresses") } addrID := append([]byte(nil), buf[i+lv:j]...) + outpoints, ol, err := d.unpackNOutpoints(buf[j:]) + if err != nil { + glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf)) + return nil, nil, errors.New("Inconsistent data in blockAddresses") + } addresses = append(addresses, addrID) - i = j + outpointsArray = append(outpointsArray, outpoints) + i = j + ol } - return addresses, nil + return addresses, outpointsArray, nil } -func (d *RocksDB) packOutpoints(outpoints []outpoint) ([]byte, error) { +func (d *RocksDB) packOutpoints(outpoints []outpoint) []byte { buf := make([]byte, 0) bvout := make([]byte, vlq.MaxLen32) for _, o := range outpoints { @@ -523,25 +538,45 @@ func (d *RocksDB) packOutpoints(outpoints []outpoint) ([]byte, error) { buf = append(buf, []byte(o.btxID)...) buf = append(buf, bvout[:l]...) } - return buf, nil + return buf } func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) { txidUnpackedLen := d.chainParser.PackedTxidLen() outpoints := make([]outpoint, 0) for i := 0; i < len(buf); { - btxid := buf[i : i+txidUnpackedLen] + btxID := buf[i : i+txidUnpackedLen] i += txidUnpackedLen vout, voutLen := unpackVarint(buf[i:]) i += voutLen outpoints = append(outpoints, outpoint{ - btxID: btxid, + btxID: btxID, vout: vout, }) } return outpoints, nil } +func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) { + txidUnpackedLen := d.chainParser.PackedTxidLen() + n, p := unpackVarint(buf) + outpoints := make([]outpoint, n) + for i := int32(0); i < n; i++ { + if p+txidUnpackedLen >= len(buf) { + return nil, 0, errors.New("Inconsistent data in unpackNOutpoints") + } + btxID := buf[p : p+txidUnpackedLen] + p += txidUnpackedLen + vout, voutLen := unpackVarint(buf[p:]) + p += voutLen + outpoints[i] = outpoint{ + btxID: btxID, + vout: vout, + } + } + return outpoints, p, nil +} + func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) { btxid, err := d.chainParser.PackTxid(txid) if err != nil { @@ -611,17 +646,17 @@ func (d *RocksDB) writeHeight( return nil } -func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, error) { +func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, [][]outpoint, error) { b, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key) if err != nil { - return nil, err + return nil, nil, err } defer b.Free() // block is missing in DB if b.Data() == nil { - return nil, errors.New("Block addresses missing") + return nil, nil, errors.New("Block addresses missing") } - return unpackBlockAddresses(b.Data()) + return d.unpackBlockAddresses(b.Data()) } func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) { @@ -673,21 +708,25 @@ func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][] // it finds the data in blockaddresses column if available, // otherwise by doing quite slow full scan of addresses column func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { + + // TODO - it is still a mess + glog.Infof("db: disconnecting blocks %d-%d", lower, higher) addrKeys := [][]byte{} addrValues := [][]byte{} + addrUnspentOutpoints := [][]outpoint{} keep := d.chainParser.KeepBlockAddresses() var err error doFullScan := true if keep > 0 { for height := lower; height <= higher; height++ { key := packUint(height) - addresses, err := d.getBlockAddresses(key) + addresses, unspentOutpoints, err := d.getBlockAddresses(key) if err != nil { glog.Error(err) goto GoFullScan } - for _, addrID := range addresses { + for i, addrID := range addresses { addrKey := append(addrID, key...) val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey) if err != nil { @@ -695,6 +734,7 @@ func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { } addrKeys = append(addrKeys, addrKey) addrValue := append([]byte(nil), val.Data()...) + addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i]) val.Free() addrValues = append(addrValues, addrValue) } @@ -717,10 +757,7 @@ func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { glog.Info("address ", hex.EncodeToString(addrKey)) } wb.DeleteCF(d.cfh[cfAddresses], addrKey) - outpoints, err := d.unpackOutpoints(addrValues[i]) - if err != nil { - return err - } + outpoints := addrUnspentOutpoints[i] addrID, height, err := unpackAddressKey(addrKey) if err != nil { return err diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index d37be34c..d354643d 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -16,6 +16,10 @@ import ( "github.com/juju/errors" ) +// simplified explanation of signed varint packing, used in many index data structures +// for number n, the packing is: 2*n if n>=0 else 2*(-n)-1 +// take only 1 byte if abs(n)<127 + func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB { tmp, err := ioutil.TempDir("", "testdb") if err != nil { @@ -45,7 +49,6 @@ func addressToPubKeyHex(addr string, t *testing.T, d *RocksDB) string { func addressToPubKeyHexWithLength(addr string, t *testing.T, d *RocksDB) string { h := addressToPubKeyHex(addr, t, d) - // length is signed varint, therefore 2 times big, we can take len(h) as the correct value return strconv.FormatInt(int64(len(h)), 16) + h } @@ -56,14 +59,18 @@ type keyPair struct { CompareFunc func(string) bool } -func compareFuncBlockAddresses(v string, expected []string) bool { +func compareFuncBlockAddresses(t *testing.T, v string, expected []string) bool { for _, e := range expected { lb := len(v) v = strings.Replace(v, e, "", 1) if lb == len(v) { + t.Error(e, " not found in ", v) return false } } + if len(v) != 0 { + t.Error("not expected content ", v) + } return len(v) == 0 } @@ -262,11 +269,11 @@ func verifyAfterUTXOBlock1(t *testing.T, d *RocksDB, noBlockAddresses bool) { blockAddressesKp = []keyPair{ keyPair{"000370d5", "", func(v string) bool { - return compareFuncBlockAddresses(v, []string{ - addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d), - addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d), - addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d), - addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d), + return compareFuncBlockAddresses(t, v, []string{ + addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d) + "00", + addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d) + "00", + addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d) + "00", + addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d) + "00", }) }, }, @@ -329,14 +336,14 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) { if err := checkColumn(d, cfBlockAddresses, []keyPair{ keyPair{"000370d6", "", func(v string) bool { - return compareFuncBlockAddresses(v, []string{ - addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d), - addressToPubKeyHexWithLength("mtR97eM2HPWVM6c8FGLGcukgaHHQv7THoL", t, d), - addressToPubKeyHexWithLength("mwwoKQE5Lb1G4picHSHDQKg8jw424PF9SC", t, d), - addressToPubKeyHexWithLength("mmJx9Y8ayz9h14yd9fgCW1bUKoEpkBAquP", t, d), - addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d), - addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d), - addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d), + return compareFuncBlockAddresses(t, v, []string{ + addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d) + "02" + "7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25" + "00", + addressToPubKeyHexWithLength("mtR97eM2HPWVM6c8FGLGcukgaHHQv7THoL", t, d) + "00", + addressToPubKeyHexWithLength("mwwoKQE5Lb1G4picHSHDQKg8jw424PF9SC", t, d) + "00", + addressToPubKeyHexWithLength("mmJx9Y8ayz9h14yd9fgCW1bUKoEpkBAquP", t, d) + "00", + addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d) + "02" + "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75" + "00", + addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d) + "02" + "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840" + "02", + addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d) + "02" + "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75" + "02", }) }, }, @@ -570,7 +577,14 @@ func Test_findAndRemoveUnspentAddr(t *testing.T) { } } +type hexoutpoint struct { + txID string + vout int32 +} + func Test_unpackBlockAddresses(t *testing.T) { + d := setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}}) + defer closeAnddestroyRocksDB(t, d) type args struct { buf string } @@ -578,12 +592,25 @@ func Test_unpackBlockAddresses(t *testing.T) { name string args args want []string + want2 [][]hexoutpoint wantErr bool }{ { name: "1", - args: args{"029c10517a011588745287127093935888939356870e646351670068680e765193518800870a7b7b0115873276a9144150837fb91d9461d6b95059842ab85262c2923f88ac08636751680457870291"}, - want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087", "7b7b011587", "76a9144150837fb91d9461d6b95059842ab85262c2923f88ac", "63675168", "5787", "91"}, + args: args{"029c0010517a011588745287047c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d250000b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400612709393588893935687000e64635167006868000e7651935188008702effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7502"}, + want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087"}, + want2: [][]hexoutpoint{ + []hexoutpoint{}, + []hexoutpoint{ + hexoutpoint{"7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25", 0}, + hexoutpoint{"00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840", 3}, + }, + []hexoutpoint{}, + []hexoutpoint{}, + []hexoutpoint{ + hexoutpoint{"effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75", 1}, + }, + }, }, } for _, tt := range tests { @@ -592,7 +619,7 @@ func Test_unpackBlockAddresses(t *testing.T) { if err != nil { panic(err) } - got, err := unpackBlockAddresses(b) + got, got2, err := d.unpackBlockAddresses(b) if (err != nil) != tt.wantErr { t.Errorf("unpackBlockAddresses() error = %v, wantErr %v", err, tt.wantErr) return @@ -602,7 +629,18 @@ func Test_unpackBlockAddresses(t *testing.T) { h[i] = hex.EncodeToString(g) } if !reflect.DeepEqual(h, tt.want) { - t.Errorf("unpackBlockAddresses() = %v, want %v", got, tt.want) + t.Errorf("unpackBlockAddresses() = %v, want %v", h, tt.want) + } + h2 := make([][]hexoutpoint, len(got2)) + for i, g := range got2 { + ho := make([]hexoutpoint, len(g)) + for j, o := range g { + ho[j] = hexoutpoint{hex.EncodeToString(o.btxID), o.vout} + } + h2[i] = ho + } + if !reflect.DeepEqual(h2, tt.want2) { + t.Errorf("unpackBlockAddresses() = %v, want %v", h2, tt.want2) } }) }