diff --git a/db/rocksdb.go b/db/rocksdb.go index fd2f33e8..38e30e0c 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -18,7 +18,9 @@ import ( // iterator creates snapshot, which takes lots of resources // when doing huge scan, it is better to close it and reopen from time to time to free the resources const disconnectBlocksRefreshIterator = uint64(1000000) +const packedHeightBytes = 4 +// RepairRocksDB calls RocksDb db repair function func RepairRocksDB(name string) error { glog.Infof("rocksdb: repair") opts := gorocksdb.NewDefaultOptions() @@ -146,11 +148,11 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f return err } - kstart, err := packOutputKey(addrID, lower) + kstart, err := packAddressKey(addrID, lower) if err != nil { return err } - kstop, err := packOutputKey(addrID, higher) + kstop, err := packAddressKey(addrID, higher) if err != nil { return err } @@ -164,7 +166,7 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f if bytes.Compare(key, kstop) > 0 { break } - outpoints, err := d.unpackOutputValue(val) + outpoints, err := d.unpackOutpoints(val) if err != nil { return err } @@ -181,7 +183,11 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f vout = uint32(o.vout) isOutput = true } - if err := fn(o.txid, vout, isOutput); err != nil { + tx, err := d.chainParser.UnpackTxid(o.btxID) + if err != nil { + return err + } + if err := fn(tx, vout, isOutput); err != nil { return err } } @@ -194,10 +200,12 @@ const ( opDelete = 1 ) +// ConnectBlock indexes addresses in the block and stores them in db func (d *RocksDB) ConnectBlock(block *bchain.Block) error { return d.writeBlock(block, opInsert) } +// DisconnectBlock removes addresses in the block from the db func (d *RocksDB) DisconnectBlock(block *bchain.Block) error { return d.writeBlock(block, opDelete) } @@ -236,33 +244,41 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { // Addresses index type 
outpoint struct { - txid string - vout int32 + btxID []byte + vout int32 +} + +func packBlockAddress(addrID []byte) []byte { + vBuf := make([]byte, vlq.MaxLen32) + vl := packVarint(int32(len(addrID)), vBuf) + blockAddress := append([]byte(nil), vBuf[:vl]...) + blockAddress = append(blockAddress, addrID...) + return blockAddress } func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint) error { keep := d.chainParser.KeepBlockAddresses() blockAddresses := make([]byte, 0) - vBuf := make([]byte, vlq.MaxLen32) for addrID, outpoints := range records { - key, err := packOutputKey([]byte(addrID), block.Height) + baddrID := []byte(addrID) + key, err := packAddressKey(baddrID, block.Height) if err != nil { glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID) continue } switch op { case opInsert: - val, err := d.packOutputValue(outpoints) + val, err := d.packOutpoints(outpoints) if err != nil { glog.Warningf("rocksdb: packOutputValue: %v", err) continue } wb.PutCF(d.cfh[cfAddresses], key, val) if keep > 0 { - // collect all addresses to be stored in blockaddresses - vl := packVarint(int32(len([]byte(addrID))), vBuf) - blockAddresses = append(blockAddresses, vBuf[0:vl]...) - blockAddresses = append(blockAddresses, []byte(addrID)...) + // collect all addresses to be stored in blockaddresses + // they are used in disconnect blocks + blockAddress := packBlockAddress(baddrID) + blockAddresses = append(blockAddresses, blockAddress...) 
} case opDelete: wb.DeleteCF(d.cfh[cfAddresses], key) @@ -298,8 +314,8 @@ func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records m } else { strAddrID := string(addrID) records[strAddrID] = append(records[strAddrID], outpoint{ - txid: string(btxid), - vout: vout, + btxID: btxid, + vout: vout, }) if op == opDelete { // remove transactions from cache @@ -404,7 +420,8 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo return err } // try to find the tx in current block - unspentAddrs, inThisBlock := unspentTxs[string(btxID)] + stxID := string(btxID) + unspentAddrs, inThisBlock := unspentTxs[stxID] if !inThisBlock { unspentAddrs, err = d.getUnspentTx(btxID) if err != nil { @@ -418,14 +435,14 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo var addrID []byte addrID, unspentAddrs = findAndRemoveUnspentAddr(unspentAddrs, input.Vout) if addrID == nil { - glog.Warningf("rocksdb: height %d, tx %v vout %v in inputs but missing in unspentTxs", block.Height, tx.Txid, input.Vout) + glog.Warningf("rocksdb: height %d, tx %v vin %v in inputs but missing in unspentTxs", block.Height, tx.Txid, i) continue } err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height) if err != nil { return err } - unspentTxs[string(btxID)] = unspentAddrs + unspentTxs[stxID] = unspentAddrs } } if err := d.writeAddressRecords(wb, block, op, addresses); err != nil { @@ -481,39 +498,45 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain. return d.writeAddressRecords(wb, block, op, addresses) } -func packOutputKey(outputScript []byte, height uint32) ([]byte, error) { - bheight := packUint(height) - buf := make([]byte, 0, len(outputScript)+len(bheight)) - buf = append(buf, outputScript...) - buf = append(buf, bheight...) 
- return buf, nil +func unpackBlockAddresses(buf []byte) ([][]byte, error) { + addresses := make([][]byte, 0) + // the addresses are packed as lenaddrID addrID, where lenaddrID is a varint + for i := 0; i < len(buf); { + l, lv := unpackVarint(buf[i:]) + j := i + int(l) + lv + if j > len(buf) { + glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf)) + return nil, errors.New("Inconsistent data in blockAddresses") + } + addrID := append([]byte(nil), buf[i+lv:j]...) + addresses = append(addresses, addrID) + i = j + } + return addresses, nil } -func (d *RocksDB) packOutputValue(outpoints []outpoint) ([]byte, error) { +func (d *RocksDB) packOutpoints(outpoints []outpoint) ([]byte, error) { buf := make([]byte, 0) + bvout := make([]byte, vlq.MaxLen32) for _, o := range outpoints { - bvout := make([]byte, vlq.MaxLen32) l := packVarint(o.vout, bvout) - buf = append(buf, []byte(o.txid)...) + buf = append(buf, []byte(o.btxID)...) buf = append(buf, bvout[:l]...) 
} return buf, nil } -func (d *RocksDB) unpackOutputValue(buf []byte) ([]outpoint, error) { +func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) { txidUnpackedLen := d.chainParser.PackedTxidLen() outpoints := make([]outpoint, 0) for i := 0; i < len(buf); { - txid, err := d.chainParser.UnpackTxid(buf[i : i+txidUnpackedLen]) - if err != nil { - return nil, err - } + btxid := buf[i : i+txidUnpackedLen] i += txidUnpackedLen vout, voutLen := unpackVarint(buf[i:]) i += voutLen outpoints = append(outpoints, outpoint{ - txid: txid, - vout: vout, + btxID: btxid, + vout: vout, }) } return outpoints, nil @@ -588,12 +611,23 @@ func (d *RocksDB) writeHeight( return nil } -// DisconnectBlocksFullScan removes all data belonging to blocks in range lower-higher -// it finds the data by doing full scan of outputs column, therefore it is quite slow -func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { - glog.Infof("db: disconnecting blocks %d-%d using full scan", lower, higher) - outputKeys := [][]byte{} - outputValues := [][]byte{} +func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, error) { + b, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key) + if err != nil { + return nil, err + } + defer b.Free() + // block is missing in DB + if b.Data() == nil { + return nil, errors.New("Block addresses missing") + } + return unpackBlockAddresses(b.Data()) +} + +func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) { + glog.Infof("db: doing full scan of addresses column") + addrKeys := [][]byte{} + addrValues := [][]byte{} var totalOutputs, count uint64 var seekKey []byte for { @@ -610,16 +644,16 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { count++ key = it.Key().Data() l := len(key) - if l > 4 { - height := unpackUint(key[l-4 : l]) + if l > packedHeightBytes { + height := unpackUint(key[l-packedHeightBytes : l]) if height >= lower && height <= higher { - outputKey 
:= make([]byte, len(key)) - copy(outputKey, key) - outputKeys = append(outputKeys, outputKey) + addrKey := make([]byte, len(key)) + copy(addrKey, key) + addrKeys = append(addrKeys, addrKey) value := it.Value().Data() - outputValue := make([]byte, len(value)) - copy(outputValue, value) - outputValues = append(outputValues, outputValue) + addrValue := make([]byte, len(value)) + copy(addrValue, value) + addrValues = append(addrValues, addrValue) } } } @@ -631,43 +665,104 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { break } } - glog.Infof("rocksdb: about to disconnect %d outputs from %d", len(outputKeys), totalOutputs) + glog.Infof("rocksdb: scanned %d addresses, found %d to disconnect", totalOutputs, len(addrKeys)) + return addrKeys, addrValues, nil +} + +// DisconnectBlockRange removes all data belonging to blocks in range lower-higher +// it finds the data in blockaddresses column if available, +// otherwise by doing quite slow full scan of addresses column +func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { + glog.Infof("db: disconnecting blocks %d-%d", lower, higher) + addrKeys := [][]byte{} + addrValues := [][]byte{} + keep := d.chainParser.KeepBlockAddresses() + var err error + doFullScan := true + if keep > 0 { + for height := lower; height <= higher; height++ { + key := packUint(height) + addresses, err := d.getBlockAddresses(key) + if err != nil { + glog.Error(err) + goto GoFullScan + } + for _, addrID := range addresses { + addrKey := append(addrID, key...) + val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey) + if err != nil { + goto GoFullScan + } + addrKeys = append(addrKeys, addrKey) + addrValue := append([]byte(nil), val.Data()...) 
+ val.Free() + addrValues = append(addrValues, addrValue) + } + } + doFullScan = false + GoFullScan: + } + if doFullScan { + addrKeys, addrValues, err = d.fullAddressesScan(lower, higher) + if err != nil { + return err + } + } + glog.Infof("rocksdb: about to disconnect %d addresses ", len(addrKeys)) wb := gorocksdb.NewWriteBatch() defer wb.Destroy() - for i := 0; i < len(outputKeys); i++ { + unspentTxs := make(map[string][]byte) + for i, addrKey := range addrKeys { if glog.V(2) { - glog.Info("output ", hex.EncodeToString(outputKeys[i])) + glog.Info("address ", hex.EncodeToString(addrKey)) } - wb.DeleteCF(d.cfh[cfAddresses], outputKeys[i]) - outpoints, err := d.unpackOutputValue(outputValues[i]) + wb.DeleteCF(d.cfh[cfAddresses], addrKey) + outpoints, err := d.unpackOutpoints(addrValues[i]) + if err != nil { + return err + } + addrID, height, err := unpackAddressKey(addrKey) if err != nil { return err } for _, o := range outpoints { - // delete from inputs - boutpoint, err := d.packOutpoint(o.txid, o.vout) - if err != nil { - return err - } if glog.V(2) { - glog.Info("input ", hex.EncodeToString(boutpoint)) + glog.Info("tx ", height, " ", hex.EncodeToString(o.btxID), " ", o.vout) } - wb.DeleteCF(d.cfh[cfUnspentTxs], boutpoint) - // delete from txCache - b, err := d.chainParser.PackTxid(o.txid) - if err != nil { - return err + // recreate unspentTxs from inputs + if o.vout < 0 { + stxID := string(o.btxID) + txAddrs, exists := unspentTxs[stxID] + if !exists { + txAddrs, err = d.getUnspentTx(o.btxID) + if err != nil { + return err + } + } + txAddrs = appendPackedAddrID(txAddrs, addrID, uint32(^o.vout), 1) + unspentTxs[stxID] = txAddrs + } else { + // remove from cfUnspentTxs + wb.DeleteCF(d.cfh[cfUnspentTxs], o.btxID) } - wb.DeleteCF(d.cfh[cfTransactions], b) + // delete cached transaction + wb.DeleteCF(d.cfh[cfTransactions], o.btxID) } } + for key, val := range unspentTxs { + wb.PutCF(d.cfh[cfUnspentTxs], []byte(key), val) + } for height := lower; height <= higher; 
height++ { if glog.V(2) { glog.Info("height ", height) } - wb.DeleteCF(d.cfh[cfHeight], packUint(height)) + key := packUint(height) + if keep > 0 { + wb.DeleteCF(d.cfh[cfBlockAddresses], key) + } + wb.DeleteCF(d.cfh[cfHeight], key) } - err := d.db.Write(d.wo, wb) + err = d.db.Write(d.wo, wb) if err == nil { glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) } @@ -737,6 +832,22 @@ func (d *RocksDB) DeleteTx(txid string) error { // Helpers +func packAddressKey(addrID []byte, height uint32) ([]byte, error) { + bheight := packUint(height) + buf := make([]byte, 0, len(addrID)+len(bheight)) + buf = append(buf, addrID...) + buf = append(buf, bheight...) + return buf, nil +} + +func unpackAddressKey(key []byte) ([]byte, uint32, error) { + i := len(key) - packedHeightBytes + if i <= 0 { + return nil, 0, errors.New("Invalid address key") + } + return key[:i], unpackUint(key[i : i+packedHeightBytes]), nil +} + func packUint(i uint32) []byte { buf := make([]byte, 4) binary.BigEndian.PutUint32(buf, i) diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index ea2ef808..d37be34c 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -218,7 +218,7 @@ func getTestUTXOBlock2(t *testing.T, d *RocksDB) *bchain.Block { } } -func verifyAfterUTXOBlock1(t *testing.T, d *RocksDB) { +func verifyAfterUTXOBlock1(t *testing.T, d *RocksDB, noBlockAddresses bool) { if err := checkColumn(d, cfHeight, []keyPair{ keyPair{"000370d5", "0000000076fbbed90fd75b0e18856aa35baa984e9c9d444cf746ad85e94e2997", nil}, }); err != nil { @@ -253,19 +253,26 @@ func verifyAfterUTXOBlock1(t *testing.T, d *RocksDB) { t.Fatal(err) } } - // the values in cfBlockAddresses has random order, must use CompareFunc - if err := checkColumn(d, cfBlockAddresses, []keyPair{ - keyPair{"000370d5", "", - func(v string) bool { - return compareFuncBlockAddresses(v, []string{ - addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d), - 
addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d), - addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d), - addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d), - }) + // after disconnect there are no blockaddresses for the previous block + var blockAddressesKp []keyPair + if noBlockAddresses { + blockAddressesKp = []keyPair{} + } else { + // the values in cfBlockAddresses have random order, must use CompareFunc + blockAddressesKp = []keyPair{ + keyPair{"000370d5", "", + func(v string) bool { + return compareFuncBlockAddresses(v, []string{ + addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d), + addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d), + addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d), + addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d), + }) + }, }, - }, - }); err != nil { + } + } + if err := checkColumn(d, cfBlockAddresses, blockAddressesKp); err != nil { { t.Fatal(err) } @@ -400,14 +407,15 @@ func testTxCache(t *testing.T, d *RocksDB, b *bchain.Block, tx *bchain.Tx) { } } -// TestRocksDB_Index_UTXO is a composite test probing the whole indexing functionality for UTXO chains +// TestRocksDB_Index_UTXO is an integration test probing the whole indexing functionality for UTXO chains // It does the following: // 1) Connect two blocks (inputs from 2nd block are spending some outputs from the 1st block) // 2) GetTransactions for various addresses / low-high ranges // 3) GetBestBlock, GetBlockHash // 4) Test tx caching functionality // 5) Disconnect block 2 - expect error -// 6) Disconnect the block 2 using full scan +// 6) Disconnect the block 2 using blockaddresses column +// 7) Reconnect block 2 and disconnect blocks 1 and 2 using full scan // After each step, the content of DB is examined and any difference against expected state is regarded as failure func TestRocksDB_Index_UTXO(t *testing.T) { d := 
setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}}) @@ -418,7 +426,7 @@ func TestRocksDB_Index_UTXO(t *testing.T) { if err := d.ConnectBlock(block1); err != nil { t.Fatal(err) } - verifyAfterUTXOBlock1(t, d) + verifyAfterUTXOBlock1(t, d, false) // connect 2nd block - use some outputs from the 1st block as the inputs and 1 input uses tx from the same block block2 := getTestUTXOBlock2(t, d) @@ -488,7 +496,19 @@ func TestRocksDB_Index_UTXO(t *testing.T) { } verifyAfterUTXOBlock2(t, d) - // disconnect the 2nd block, verify that the db contains only the 1st block + // disconnect the 2nd block, verify that the db contains only data from the 1st block with restored unspentTxs + // and that the cached tx is removed + err = d.DisconnectBlockRange(225494, 225494) + if err != nil { + t.Fatal(err) + } + + verifyAfterUTXOBlock1(t, d, true) + if err := checkColumn(d, cfTransactions, []keyPair{}); err != nil { + { + t.Fatal(err) + } + } } @@ -549,3 +569,41 @@ func Test_findAndRemoveUnspentAddr(t *testing.T) { }) } } + +func Test_unpackBlockAddresses(t *testing.T) { + type args struct { + buf string + } + tests := []struct { + name string + args args + want []string + wantErr bool + }{ + { + name: "1", + args: args{"029c10517a011588745287127093935888939356870e646351670068680e765193518800870a7b7b0115873276a9144150837fb91d9461d6b95059842ab85262c2923f88ac08636751680457870291"}, + want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087", "7b7b011587", "76a9144150837fb91d9461d6b95059842ab85262c2923f88ac", "63675168", "5787", "91"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b, err := hex.DecodeString(tt.args.buf) + if err != nil { + panic(err) + } + got, err := unpackBlockAddresses(b) + if (err != nil) != tt.wantErr { + t.Errorf("unpackBlockAddresses() error = %v, wantErr %v", err, tt.wantErr) + return + } + h := make([]string, len(got)) + for i, g := range 
got { + h[i] = hex.EncodeToString(g) + } + if !reflect.DeepEqual(h, tt.want) { + t.Errorf("unpackBlockAddresses() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/db/sync.go b/db/sync.go index 845a517d..e0216583 100644 --- a/db/sync.go +++ b/db/sync.go @@ -382,10 +382,9 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) { // otherwise doing full scan func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []string) error { glog.Infof("sync: disconnecting blocks %d-%d", lower, higher) - // if the chain uses Block to Addresses mapping, always use DisconnectBlocksFullScan - // the full scan will be optimized using the mapping + // if the chain uses Block to Addresses mapping, always use DisconnectBlockRange if w.chain.GetChainParser().KeepBlockAddresses() > 0 { - return w.db.DisconnectBlocksFullScan(lower, higher) + return w.db.DisconnectBlockRange(lower, higher) } blocks := make([]*bchain.Block, len(hashes)) var err error @@ -393,8 +392,8 @@ func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []stri for i, hash := range hashes { blocks[i], err = w.chain.GetBlock(hash, 0) if err != nil { - // cannot get block, do full range scan - return w.db.DisconnectBlocksFullScan(lower, higher) + // cannot get a block, we must do full range scan + return w.db.DisconnectBlockRange(lower, higher) } } // then disconnect one after another