From 22af986121ced0ee0efb215ac1f9cd5dbedbd627 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Thu, 26 Apr 2018 19:50:22 +0200 Subject: [PATCH] Implement DisconnectBlocks in index v2 - WIP --- Gopkg.lock | 2 +- db/rocksdb.go | 134 +++++++++++++++++++++------------------------ db/rocksdb_test.go | 32 +++++++++-- 3 files changed, 89 insertions(+), 79 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index a40eb931..9bd61bee 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -190,6 +190,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "e632a1e904953397e9eae00f30a86bffab2d303232c7bac47a16e1ce663043bf" + inputs-digest = "3e3bcaeb80d40bd8073342d32dbc57e4266fba7b8dfa00fc90bc6184e03ab96f" solver-name = "gps-cdcl" solver-version = 1 diff --git a/db/rocksdb.go b/db/rocksdb.go index 64552f6f..9c220b40 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -148,14 +148,8 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f return err } - kstart, err := packAddressKey(addrID, lower) - if err != nil { - return err - } - kstop, err := packAddressKey(addrID, higher) - if err != nil { - return err - } + kstart := packAddressKey(addrID, lower) + kstop := packAddressKey(addrID, higher) it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses]) defer it.Close() @@ -248,14 +242,14 @@ type outpoint struct { vout int32 } -func (d *RocksDB) packBlockAddress(addrID []byte, removedUnspentTxs map[string][]outpoint) []byte { +func (d *RocksDB) packBlockAddress(addrID []byte, spentTxs map[string][]outpoint) []byte { vBuf := make([]byte, vlq.MaxLen32) vl := packVarint(int32(len(addrID)), vBuf) blockAddress := append([]byte(nil), vBuf[:vl]...) blockAddress = append(blockAddress, addrID...) - if removedUnspentTxs == nil { + if spentTxs == nil { } else { - addrUnspentTxs := removedUnspentTxs[string(addrID)] + addrUnspentTxs := spentTxs[string(addrID)] vl = packVarint(int32(len(addrUnspentTxs)), vBuf) blockAddress = append(blockAddress, vBuf[:vl]...) buf := d.packOutpoints(addrUnspentTxs) @@ -264,16 +258,12 @@ func (d *RocksDB) packBlockAddress(addrID []byte, removedUnspentTxs map[string][ return blockAddress } -func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint, removedUnspentTxs map[string][]outpoint) error { +func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, addresses map[string][]outpoint, spentTxs map[string][]outpoint) error { keep := d.chainParser.KeepBlockAddresses() blockAddresses := make([]byte, 0) - for addrID, outpoints := range records { + for addrID, outpoints := range addresses { baddrID := []byte(addrID) - key, err := packAddressKey(baddrID, block.Height) - if err != nil { - glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID) - continue - } + key := packAddressKey(baddrID, block.Height) switch op { case opInsert: val := d.packOutpoints(outpoints) @@ -281,7 +271,7 @@ func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Bl if keep > 0 { // collect all addresses be stored in blockaddresses // they are used in disconnect blocks - blockAddress := d.packBlockAddress(baddrID, removedUnspentTxs) + blockAddress := d.packBlockAddress(baddrID, spentTxs) blockAddresses = append(blockAddresses, blockAddress...) } case opDelete: @@ -289,7 +279,7 @@ func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Bl } } if keep > 0 && op == opInsert { - // write new block address + // write new block address and txs spent in this block key := packUint(block.Height) wb.PutCF(d.cfh[cfBlockAddresses], key, blockAddresses) // cleanup old block address @@ -384,7 +374,6 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo } addresses := make(map[string][]outpoint) unspentTxs := make(map[string][]byte) - removedUnspentTxs := make(map[string][]outpoint) btxIDs := make([][]byte, len(block.Txs)) // first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses for txi, tx := range block.Txs { @@ -412,7 +401,9 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo } unspentTxs[string(btxID)] = txAddrs } - // locate addresses spent by this tx and add them to addresses map them in format txid ^index + // locate addresses spent by this tx and remove them from unspent addresses + // keep them so that they be stored for DisconnectBlock functionality + spentTxs := make(map[string][]outpoint) for txi, tx := range block.Txs { spendingTxid := btxIDs[txi] for i, input := range tx.Vin { @@ -428,6 +419,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo stxID := string(btxID) unspentAddrs, inThisBlock := unspentTxs[stxID] if !inThisBlock { + // else find it in previous blocks unspentAddrs, err = d.getUnspentTx(btxID) if err != nil { return err @@ -443,9 +435,14 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo glog.Warningf("rocksdb: height %d, tx %v vin %v in inputs but missing in unspentTxs", block.Height, tx.Txid, i) continue } - rut := removedUnspentTxs[string(addrID)] - rut = append(rut, outpoint{btxID, int32(input.Vout)}) - removedUnspentTxs[string(addrID)] = rut + // record what was removed from unspentTx + // skip transactions that were created in this block + saddrID := string(addrID) + if _, exists := addresses[saddrID]; !exists { + rut := spentTxs[saddrID] + rut = append(rut, outpoint{btxID, int32(input.Vout)}) + spentTxs[saddrID] = rut + } err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height) if err != nil { return err @@ -453,7 +450,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo unspentTxs[stxID] = unspentAddrs } } - if err := d.writeAddressRecords(wb, block, op, addresses, removedUnspentTxs); err != nil { + if err := d.writeAddressRecords(wb, block, op, addresses, spentTxs); err != nil { return err } // save unspent txs from current block @@ -545,7 +542,7 @@ func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) { txidUnpackedLen := d.chainParser.PackedTxidLen() outpoints := make([]outpoint, 0) for i := 0; i < len(buf); { - btxID := buf[i : i+txidUnpackedLen] + btxID := append([]byte(nil), buf[i:i+txidUnpackedLen]...) i += txidUnpackedLen vout, voutLen := unpackVarint(buf[i:]) i += voutLen @@ -565,7 +562,7 @@ func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) { if p+txidUnpackedLen >= len(buf) { return nil, 0, errors.New("Inconsistent data in unpackNOutpoints") } - btxID := buf[p : p+txidUnpackedLen] + btxID := append([]byte(nil), buf[p:p+txidUnpackedLen]...) p += txidUnpackedLen vout, voutLen := unpackVarint(buf[p:]) p += voutLen @@ -659,7 +656,7 @@ func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, [][]outpoint, error) return d.unpackBlockAddresses(b.Data()) } -func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) { +func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) { glog.Infof("db: doing full scan of addresses column") addrKeys := [][]byte{} addrValues := [][]byte{} @@ -708,81 +705,74 @@ func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][] // it finds the data in blockaddresses column if available, // otherwise by doing quite slow full scan of addresses column func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { - - // TODO - it is still a mess - glog.Infof("db: disconnecting blocks %d-%d", lower, higher) addrKeys := [][]byte{} - addrValues := [][]byte{} + addrOutpoints := [][]byte{} addrUnspentOutpoints := [][]outpoint{} keep := d.chainParser.KeepBlockAddresses() var err error - doFullScan := true if keep > 0 { for height := lower; height <= higher; height++ { - key := packUint(height) - addresses, unspentOutpoints, err := d.getBlockAddresses(key) + addresses, unspentOutpoints, err := d.getBlockAddresses(packUint(height)) if err != nil { glog.Error(err) - goto GoFullScan + return err } for i, addrID := range addresses { - addrKey := append(addrID, key...) + addrKey := packAddressKey(addrID, height) val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey) if err != nil { - goto GoFullScan + glog.Error(err) + return err } addrKeys = append(addrKeys, addrKey) - addrValue := append([]byte(nil), val.Data()...) - addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i]) + av := append([]byte(nil), val.Data()...) val.Free() - addrValues = append(addrValues, addrValue) + addrOutpoints = append(addrOutpoints, av) + addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i]) } } - doFullScan = false - GoFullScan: - } - if doFullScan { - addrKeys, addrValues, err = d.fullAddressesScan(lower, higher) + } else { + addrKeys, addrOutpoints, err = d.allAddressesScan(lower, higher) if err != nil { return err } } + glog.Infof("rocksdb: about to disconnect %d addresses ", len(addrKeys)) wb := gorocksdb.NewWriteBatch() defer wb.Destroy() unspentTxs := make(map[string][]byte) - for i, addrKey := range addrKeys { + for addrIndex, addrKey := range addrKeys { if glog.V(2) { glog.Info("address ", hex.EncodeToString(addrKey)) } + // delete address:height from the index wb.DeleteCF(d.cfh[cfAddresses], addrKey) - outpoints := addrUnspentOutpoints[i] - addrID, height, err := unpackAddressKey(addrKey) + addrID, _, err := unpackAddressKey(addrKey) + if err != nil { + return err + } + // recreate unspentTxs, which were spent by this block (that is being disconnected) + for _, o := range addrUnspentOutpoints[addrIndex] { + stxID := string(o.btxID) + txAddrs, exists := unspentTxs[stxID] + if !exists { + txAddrs, err = d.getUnspentTx(o.btxID) + if err != nil { + return err + } + } + txAddrs = appendPackedAddrID(txAddrs, addrID, uint32(o.vout), 1) + unspentTxs[stxID] = txAddrs + } + // delete unspentTxs from this block + outpoints, err := d.unpackOutpoints(addrOutpoints[addrIndex]) if err != nil { return err } for _, o := range outpoints { - if glog.V(2) { - glog.Info("tx ", height, " ", hex.EncodeToString(o.btxID), " ", o.vout) - } - // recreate unspentTxs from inputs - if o.vout < 0 { - stxID := string(o.btxID) - txAddrs, exists := unspentTxs[stxID] - if !exists { - txAddrs, err = d.getUnspentTx(o.btxID) - if err != nil { - return err - } - } - txAddrs = appendPackedAddrID(txAddrs, addrID, uint32(^o.vout), 1) - unspentTxs[stxID] = txAddrs - } else { - // remove from cfUnspentTxs - wb.DeleteCF(d.cfh[cfUnspentTxs], o.btxID) - } - // delete cached transaction + wb.DeleteCF(d.cfh[cfUnspentTxs], o.btxID) wb.DeleteCF(d.cfh[cfTransactions], o.btxID) } } @@ -869,12 +859,12 @@ func (d *RocksDB) DeleteTx(txid string) error { // Helpers -func packAddressKey(addrID []byte, height uint32) ([]byte, error) { +func packAddressKey(addrID []byte, height uint32) []byte { bheight := packUint(height) buf := make([]byte, 0, len(addrID)+len(bheight)) buf = append(buf, addrID...) buf = append(buf, bheight...) - return buf, nil + return buf } func unpackAddressKey(key []byte) ([]byte, uint32, error) { diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index d354643d..4c99cebd 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -83,11 +83,11 @@ func checkColumn(d *RocksDB, col int, kp []keyPair) error { i := 0 for it.SeekToFirst(); it.Valid(); it.Next() { if i >= len(kp) { - return errors.Errorf("Expected less rows in column %v", col) + return errors.Errorf("Expected less rows in column %v", cfNames[col]) } key := hex.EncodeToString(it.Key().Data()) if key != kp[i].Key { - return errors.Errorf("Incorrect key %v found in column %v row %v, expecting %v", key, col, i, kp[i].Key) + return errors.Errorf("Incorrect key %v found in column %v row %v, expecting %v", key, cfNames[col], i, kp[i].Key) } val := hex.EncodeToString(it.Value().Data()) var valOK bool @@ -97,12 +97,12 @@ func checkColumn(d *RocksDB, col int, kp []keyPair) error { valOK = kp[i].CompareFunc(val) } if !valOK { - return errors.Errorf("Incorrect value %v found in column %v row %v, expecting %v", val, col, i, kp[i].Value) + return errors.Errorf("Incorrect value %v found in column %v row %v, expecting %v", val, cfNames[col], i, kp[i].Value) } i++ } if i != len(kp) { - return errors.Errorf("Expected more rows in column %v: got %v, expected %v", col, i, len(kp)) + return errors.Errorf("Expected more rows in column %v: got %v, expected %v", cfNames[col], i, len(kp)) } return nil } @@ -337,7 +337,7 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) { keyPair{"000370d6", "", func(v string) bool { return compareFuncBlockAddresses(t, v, []string{ - addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d) + "02" + "7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25" + "00", + addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d) + "00", //+ "7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25" + "00", addressToPubKeyHexWithLength("mtR97eM2HPWVM6c8FGLGcukgaHHQv7THoL", t, d) + "00", addressToPubKeyHexWithLength("mwwoKQE5Lb1G4picHSHDQKg8jw424PF9SC", t, d) + "00", addressToPubKeyHexWithLength("mmJx9Y8ayz9h14yd9fgCW1bUKoEpkBAquP", t, d) + "00", @@ -422,7 +422,7 @@ func testTxCache(t *testing.T, d *RocksDB, b *bchain.Block, tx *bchain.Tx) { // 4) Test tx caching functionality // 5) Disconnect block 2 - expect error // 6) Disconnect the block 2 using blockaddresses column -// 7) Reconnect block 2 and disconnect blocks 1 and 2 using full scan +// 7) Reconnect block 2 and disconnect blocks 1 and 2 using full scan - expect error // After each step, the content of DB is examined and any difference against expected state is regarded as failure func TestRocksDB_Index_UTXO(t *testing.T) { d := setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}}) @@ -612,6 +612,26 @@ func Test_unpackBlockAddresses(t *testing.T) { }, }, }, + { + name: "1", + args: args{"3276A914B434EB0C1A3B7A02E8A29CC616E791EF1E0BF51F88AC003276A9143F8BA3FDA3BA7B69F5818086E12223C6DD25E3C888AC003276A914A08EAE93007F22668AB5E4A9C83C8CD1C325E3E088AC02EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75003276A9148BDF0AA3C567AA5975C2E61321B8BEBBE7293DF688AC0200B2C06055E5E90E9C82BD4181FDE310104391A7FA4F289B1704E5D90CAA3840022EA9144A21DB08FB6882CB152E1FF06780A430740F77048702EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75023276A914CCAAAF374E1B06CB83118453D102587B4273D09588AC003276A9148D802C045445DF49613F6A70DDD2E48526F3701F88AC00"}, + want: []string{"76a914b434eb0c1a3b7a02e8a29cc616e791ef1e0bf51f88ac", "76a9143f8ba3fda3ba7b69f5818086e12223c6dd25e3c888ac", "76a914a08eae93007f22668ab5e4a9c83c8cd1c325e3e088ac", "76a9148bdf0aa3c567aa5975c2e61321b8bebbe7293df688ac", "a9144a21db08fb6882cb152e1ff06780a430740f770487", "76a914ccaaaf374e1b06cb83118453d102587b4273d09588ac", "76a9148d802c045445df49613f6a70ddd2e48526f3701f88ac"}, + want2: [][]hexoutpoint{ + []hexoutpoint{}, + []hexoutpoint{}, + []hexoutpoint{ + hexoutpoint{"effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75", 0}, + }, + []hexoutpoint{ + hexoutpoint{"00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840", 1}, + }, + []hexoutpoint{ + hexoutpoint{"effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75", 1}, + }, + []hexoutpoint{}, + []hexoutpoint{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {