From 8e3c7f851bb7770de9aa9e7dbe5949b32d98c670 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Fri, 3 Aug 2018 19:26:16 +0200 Subject: [PATCH] Clean up the rocksdb sources and disconnect UTXO blocks WIP --- db/rocksdb.go | 517 ++++++++++++--------------------------------- db/rocksdb_test.go | 156 +------------- db/sync.go | 14 +- 3 files changed, 148 insertions(+), 539 deletions(-) diff --git a/db/rocksdb.go b/db/rocksdb.go index ebee699b..7ce84b54 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -52,10 +52,6 @@ const ( cfAddressBalance cfBlockTxids cfTransactions - - // to be removed, kept temporarily so that the code is compilable - cfUnspentTxs - cfBlockAddresses ) var cfNames = []string{"default", "height", "addresses", "txAddresses", "addressBalance", "blockTxids", "transactions"} @@ -257,7 +253,12 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { return err } if isUTXO { - if err := d.writeAddressesUTXO(wb, block, op); err != nil { + if op == opDelete { + // block does not contain mapping tx-> input address, which is necessary to recreate + // unspentTxs; therefore it is not possible to DisconnectBlocks this way + return errors.New("DisconnectBlock is not supported for UTXO chains") + } + if err := d.writeAddressesUTXO(wb, block); err != nil { return err } } else { @@ -293,12 +294,7 @@ type addrBalance struct { balanceSat big.Int } -func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { - if op == opDelete { - // block does not contain mapping tx-> input address, which is necessary to recreate - // unspentTxs; therefore it is not possible to DisconnectBlocks this way - return errors.New("DisconnectBlock is not supported for UTXO chains") - } +func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error { addresses := make(map[string][]outpoint) blockTxids := make([][]byte, len(block.Txs)) txAddressesMap := make(map[string]*txAddresses) @@ -443,21 +439,16 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo ab.sentSat.Add(&ab.sentSat, &ot.valueSat) } } - if op == opInsert { - if err := d.storeAddresses(wb, block, addresses); err != nil { - return err - } - if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { - return err - } - if err := d.storeBalances(wb, balances); err != nil { - return err - } - if err := d.storeAndCleanupBlockTxids(wb, block, blockTxids); err != nil { - return err - } + if err := d.storeAddresses(wb, block, addresses); err != nil { + return err } - return nil + if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { + return err + } + if err := d.storeBalances(wb, balances); err != nil { + return err + } + return d.storeAndCleanupBlockTxids(wb, block, blockTxids) } func processedInTx(o []outpoint, btxID []byte) bool { @@ -493,12 +484,17 @@ func (d *RocksDB) storeBalances(wb *gorocksdb.WriteBatch, abm map[string]*addrBa // allocate buffer big enough for number of txs + 2 bigints buf := make([]byte, vlq.MaxLen32+2*maxPackedBigintBytes) for addrID, ab := range abm { - l := packVaruint(uint(ab.txs), buf) - ll := packBigint(&ab.sentSat, buf[l:]) - l += ll - ll = packBigint(&ab.balanceSat, buf[l:]) - l += ll - wb.PutCF(d.cfh[cfAddressBalance], []byte(addrID), buf[:l]) + // balance with 0 transactions is removed from db - happens in disconnect + if ab == nil || ab.txs <= 0 { + wb.DeleteCF(d.cfh[cfAddressBalance], []byte(addrID)) + } else { + l := packVaruint(uint(ab.txs), buf) + ll := packBigint(&ab.sentSat, buf[l:]) + l += ll + ll = packBigint(&ab.balanceSat, buf[l:]) + l += ll + wb.PutCF(d.cfh[cfAddressBalance], []byte(addrID), buf[:l]) + } } return nil } @@ -532,6 +528,22 @@ func (d *RocksDB) storeAndCleanupBlockTxids(wb *gorocksdb.WriteBatch, block *bch return nil } +func (d *RocksDB) getBlockTxids(height uint32) ([][]byte, error) { + pl := d.chainParser.PackedTxidLen() + val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxids], packUint(height)) + if err != nil { + return nil, err + } + defer val.Free() + buf := val.Data() + txids := make([][]byte, len(buf)/pl) + for i := 0; i < len(txids); i++ { + txid := make([]byte, pl) + copy(txid, buf[pl*i:]) + } + return txids, nil +} + func (d *RocksDB) getAddressBalance(addrID []byte) (*addrBalance, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfAddressBalance], addrID) if err != nil { @@ -651,70 +663,9 @@ func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) { return outpoints, nil } -//////////////////////// - -func (d *RocksDB) packBlockAddress(addrID []byte, spentTxs map[string][]outpoint) []byte { - vBuf := make([]byte, vlq.MaxLen32) - vl := packVarint(len(addrID), vBuf) - blockAddress := append([]byte(nil), vBuf[:vl]...) - blockAddress = append(blockAddress, addrID...) - if spentTxs == nil { - } else { - addrUnspentTxs := spentTxs[string(addrID)] - vl = packVarint(len(addrUnspentTxs), vBuf) - blockAddress = append(blockAddress, vBuf[:vl]...) - buf := d.packOutpoints(addrUnspentTxs) - blockAddress = append(blockAddress, buf...) - } - return blockAddress -} - -func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, addresses map[string][]outpoint, spentTxs map[string][]outpoint) error { - keep := d.chainParser.KeepBlockAddresses() - blockAddresses := make([]byte, 0) - for addrID, outpoints := range addresses { - baddrID := []byte(addrID) - key := packAddressKey(baddrID, block.Height) - switch op { - case opInsert: - val := d.packOutpoints(outpoints) - wb.PutCF(d.cfh[cfAddresses], key, val) - if keep > 0 { - // collect all addresses be stored in blockaddresses - // they are used in disconnect blocks - blockAddress := d.packBlockAddress(baddrID, spentTxs) - blockAddresses = append(blockAddresses, blockAddress...) - } - case opDelete: - wb.DeleteCF(d.cfh[cfAddresses], key) - } - } - if keep > 0 && op == opInsert { - // write new block address and txs spent in this block - key := packUint(block.Height) - wb.PutCF(d.cfh[cfBlockAddresses], key, blockAddresses) - // cleanup old block address - if block.Height > uint32(keep) { - for rh := block.Height - uint32(keep); rh < block.Height; rh-- { - key = packUint(rh) - val, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key) - if err != nil { - return err - } - if val.Size() == 0 { - break - } - val.Free() - d.db.DeleteCF(d.wo, d.cfh[cfBlockAddresses], key) - } - } - } - return nil -} - func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, btxid []byte, vout int32, bh uint32) error { if len(addrID) > 0 { - if len(addrID) > 1024 { + if len(addrID) > maxAddrIDLen { glog.Infof("rocksdb: block %d, skipping addrID of length %d", bh, len(addrID)) } else { strAddrID := string(addrID) @@ -731,153 +682,6 @@ func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records m return nil } -func (d *RocksDB) getUnspentTx(btxID []byte) ([]byte, error) { - // find it in db, in the column cfUnspentTxs - val, err := d.db.GetCF(d.ro, d.cfh[cfUnspentTxs], btxID) - if err != nil { - return nil, err - } - defer val.Free() - data := append([]byte(nil), val.Data()...) - return data, nil -} - -func appendPackedAddrID(txAddrs []byte, addrID []byte, n uint32, remaining int) []byte { - // resize the addr buffer if necessary by a new estimate - if cap(txAddrs)-len(txAddrs) < 2*vlq.MaxLen32+len(addrID) { - txAddrs = append(txAddrs, make([]byte, vlq.MaxLen32+len(addrID)+remaining*32)...)[:len(txAddrs)] - } - // addrID is packed as number of bytes of the addrID + bytes of addrID + vout - lv := packVarint(len(addrID), txAddrs[len(txAddrs):len(txAddrs)+vlq.MaxLen32]) - txAddrs = txAddrs[:len(txAddrs)+lv] - txAddrs = append(txAddrs, addrID...) - lv = packVarint(int(n), txAddrs[len(txAddrs):len(txAddrs)+vlq.MaxLen32]) - txAddrs = txAddrs[:len(txAddrs)+lv] - return txAddrs -} - -func findAndRemoveUnspentAddr(unspentAddrs []byte, vout uint32) ([]byte, []byte) { - // the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints - for i := 0; i < len(unspentAddrs); { - l, lv1 := unpackVarint(unspentAddrs[i:]) - // index of vout of address in unspentAddrs - j := i + int(l) + lv1 - if j >= len(unspentAddrs) { - glog.Error("rocksdb: Inconsistent data in unspentAddrs ", hex.EncodeToString(unspentAddrs), ", ", vout) - return nil, unspentAddrs - } - n, lv2 := unpackVarint(unspentAddrs[j:]) - if uint32(n) == vout { - addrID := append([]byte(nil), unspentAddrs[i+lv1:j]...) - unspentAddrs = append(unspentAddrs[:i], unspentAddrs[j+lv2:]...) - return addrID, unspentAddrs - } - i = j + lv2 - } - return nil, unspentAddrs -} - -func (d *RocksDB) writeAddressesUTXO_old(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { - if op == opDelete { - // block does not contain mapping tx-> input address, which is necessary to recreate - // unspentTxs; therefore it is not possible to DisconnectBlocks this way - return errors.New("DisconnectBlock is not supported for UTXO chains") - } - addresses := make(map[string][]outpoint) - unspentTxs := make(map[string][]byte) - thisBlockTxs := make(map[string]struct{}) - btxIDs := make([][]byte, len(block.Txs)) - // first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses - for txi, tx := range block.Txs { - btxID, err := d.chainParser.PackTxid(tx.Txid) - if err != nil { - return err - } - btxIDs[txi] = btxID - // preallocate estimated size of addresses (32 bytes is 1 byte length of addrID, 25 bytes addrID, 1-2 bytes vout and reserve) - txAddrs := make([]byte, 0, len(tx.Vout)*32) - for i, output := range tx.Vout { - addrID, err := d.chainParser.GetAddrIDFromVout(&output) - if err != nil { - // do not log ErrAddressMissing, transactions can be without to address (for example eth contracts) - if err != bchain.ErrAddressMissing { - glog.Warningf("rocksdb: addrID: %v - height %d, tx %v, output %v", err, block.Height, tx.Txid, output) - } - continue - } - err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), block.Height) - if err != nil { - return err - } - txAddrs = appendPackedAddrID(txAddrs, addrID, output.N, len(tx.Vout)-i) - } - stxID := string(btxID) - unspentTxs[stxID] = txAddrs - thisBlockTxs[stxID] = struct{}{} - } - // locate addresses spent by this tx and remove them from unspent addresses - // keep them so that they be stored for DisconnectBlock functionality - spentTxs := make(map[string][]outpoint) - for txi, tx := range block.Txs { - spendingTxid := btxIDs[txi] - for i, input := range tx.Vin { - btxID, err := d.chainParser.PackTxid(input.Txid) - if err != nil { - // do not process inputs without input txid - if err == bchain.ErrTxidMissing { - continue - } - return err - } - // find the tx in current block or already processed - stxID := string(btxID) - unspentAddrs, exists := unspentTxs[stxID] - if !exists { - // else find it in previous blocks - unspentAddrs, err = d.getUnspentTx(btxID) - if err != nil { - return err - } - if unspentAddrs == nil { - glog.Warningf("rocksdb: height %d, tx %v, input tx %v vin %v %v missing in unspentTxs", block.Height, tx.Txid, input.Txid, input.Vout, i) - continue - } - } - var addrID []byte - addrID, unspentAddrs = findAndRemoveUnspentAddr(unspentAddrs, input.Vout) - if addrID == nil { - glog.Warningf("rocksdb: height %d, tx %v, input tx %v vin %v %v not found in unspentAddrs", block.Height, tx.Txid, input.Txid, input.Vout, i) - continue - } - // record what was spent in this tx - // skip transactions that were created in this block - if _, exists := thisBlockTxs[stxID]; !exists { - saddrID := string(addrID) - rut := spentTxs[saddrID] - rut = append(rut, outpoint{btxID, int32(input.Vout)}) - spentTxs[saddrID] = rut - } - err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height) - if err != nil { - return err - } - unspentTxs[stxID] = unspentAddrs - } - } - if err := d.writeAddressRecords(wb, block, op, addresses, spentTxs); err != nil { - return err - } - // save unspent txs from current block - for tx, val := range unspentTxs { - if len(val) == 0 { - wb.DeleteCF(d.cfh[cfUnspentTxs], []byte(tx)) - } else { - wb.PutCF(d.cfh[cfUnspentTxs], []byte(tx), val) - } - } - return nil -} - func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { addresses := make(map[string][]outpoint) for _, tx := range block.Txs { @@ -914,75 +718,19 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain. } } } - return d.writeAddressRecords(wb, block, op, addresses, nil) -} - -func (d *RocksDB) unpackBlockAddresses(buf []byte) ([][]byte, [][]outpoint, error) { - addresses := make([][]byte, 0) - outpointsArray := make([][]outpoint, 0) - // the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints - for i := 0; i < len(buf); { - l, lv := unpackVarint(buf[i:]) - j := i + int(l) + lv - if j > len(buf) { - glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf)) - return nil, nil, errors.New("Inconsistent data in blockAddresses") - } - addrID := append([]byte(nil), buf[i+lv:j]...) - outpoints, ol, err := d.unpackNOutpoints(buf[j:]) - if err != nil { - glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf)) - return nil, nil, errors.New("Inconsistent data in blockAddresses") - } - addresses = append(addresses, addrID) - outpointsArray = append(outpointsArray, outpoints) - i = j + ol - } - return addresses, outpointsArray, nil -} - -func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) { - txidUnpackedLen := d.chainParser.PackedTxidLen() - n, p := unpackVarint32(buf) - outpoints := make([]outpoint, n) - for i := int32(0); i < n; i++ { - if p+txidUnpackedLen >= len(buf) { - return nil, 0, errors.New("Inconsistent data in unpackNOutpoints") - } - btxID := append([]byte(nil), buf[p:p+txidUnpackedLen]...) - p += txidUnpackedLen - vout, voutLen := unpackVarint32(buf[p:]) - p += voutLen - outpoints[i] = outpoint{ - btxID: btxID, - index: vout, + for addrID, outpoints := range addresses { + key := packAddressKey([]byte(addrID), block.Height) + switch op { + case opInsert: + val := d.packOutpoints(outpoints) + wb.PutCF(d.cfh[cfAddresses], key, val) + case opDelete: + wb.DeleteCF(d.cfh[cfAddresses], key) } } - return outpoints, p, nil + return nil } -func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) { - btxid, err := d.chainParser.PackTxid(txid) - if err != nil { - return nil, err - } - bv := make([]byte, vlq.MaxLen32) - l := packVarint32(vout, bv) - buf := make([]byte, 0, l+len(btxid)) - buf = append(buf, btxid...) - buf = append(buf, bv[:l]...) - return buf, nil -} - -func (d *RocksDB) unpackOutpoint(buf []byte) (string, int32, int) { - txidUnpackedLen := d.chainParser.PackedTxidLen() - txid, _ := d.chainParser.UnpackTxid(buf[:txidUnpackedLen]) - vout, o := unpackVarint32(buf[txidUnpackedLen:]) - return txid, vout, txidUnpackedLen + o -} - -////////////////////////////////// - // Block index // GetBestBlock returns the block hash of the block with highest height in the db @@ -1032,18 +780,7 @@ func (d *RocksDB) writeHeight( return nil } -func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, [][]outpoint, error) { - b, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key) - if err != nil { - return nil, nil, err - } - defer b.Free() - // block is missing in DB - if b.Data() == nil { - return nil, nil, errors.New("Block addresses missing") - } - return d.unpackBlockAddresses(b.Data()) -} +// Disconnect blocks func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) { glog.Infof("db: doing full scan of addresses column") @@ -1090,93 +827,107 @@ func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]b return addrKeys, addrValues, nil } -// DisconnectBlockRange removes all data belonging to blocks in range lower-higher -// it finds the data in blockaddresses column if available, -// otherwise by doing quite slow full scan of addresses column -func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error { - glog.Infof("db: disconnecting blocks %d-%d", lower, higher) - addrKeys := [][]byte{} - addrOutpoints := [][]byte{} - addrUnspentOutpoints := [][]outpoint{} - keep := d.chainParser.KeepBlockAddresses() - var err error - if keep > 0 { - for height := lower; height <= higher; height++ { - addresses, unspentOutpoints, err := d.getBlockAddresses(packUint(height)) +func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses, txAddressesToUpdate map[string]*txAddresses, txsToDelete map[string]struct{}, balances map[string]*addrBalance) error { + findBalance := func(addrID []byte) (*addrBalance, error) { + var err error + s := string(addrID) + b, found := balances[s] + if !found { + b, err = d.getAddressBalance(addrID) if err != nil { - glog.Error(err) - return err - } - for i, addrID := range addresses { - addrKey := packAddressKey(addrID, height) - val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey) - if err != nil { - glog.Error(err) - return err - } - addrKeys = append(addrKeys, addrKey) - av := append([]byte(nil), val.Data()...) - val.Free() - addrOutpoints = append(addrOutpoints, av) - addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i]) + return nil, err } + balances[s] = b } - } else { - addrKeys, addrOutpoints, err = d.allAddressesScan(lower, higher) + return b, nil + } + for _, t := range txa.inputs { + b, err := findBalance(t.addrID) if err != nil { return err } + if b != nil { + + } } + return nil +} + +// DisconnectBlockRangeUTXO removes all data belonging to blocks in range lower-higher +// if they are in the range kept in the cfBlockTxs column +func (d *RocksDB) DisconnectBlockRangeUTXO(lower uint32, higher uint32) error { + glog.Infof("db: disconnecting blocks %d-%d", lower, higher) + blocksTxids := make([][][]byte, higher-lower+1) + for height := lower; height <= higher; height++ { + blockTxids, err := d.getBlockTxids(height) + if err != nil { + return err + } + if len(blockTxids) == 0 { + return errors.Errorf("Cannot disconnect blocks with height %v and lower. It is necessary to rebuild index.", height) + } + blocksTxids[height-lower] = blockTxids + } + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + txAddressesToUpdate := make(map[string]*txAddresses) + txsToDelete := make(map[string]struct{}) + balances := make(map[string]*addrBalance) + for height := higher; height >= lower; height-- { + blockTxids := blocksTxids[height-lower] + for _, txid := range blockTxids { + txa, err := d.getTxAddresses(txid) + if err != nil { + return err + } + s := string(txid) + txsToDelete[s] = struct{}{} + if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, txsToDelete, balances); err != nil { + return err + } + } + key := packUint(height) + wb.DeleteCF(d.cfh[cfBlockTxids], key) + wb.DeleteCF(d.cfh[cfHeight], key) + } + d.storeTxAddresses(wb, txAddressesToUpdate) + d.storeBalances(wb, balances) + for s := range txsToDelete { + b := []byte(s) + wb.DeleteCF(d.cfh[cfTransactions], b) + wb.DeleteCF(d.cfh[cfTxAddresses], b) + } + err := d.db.Write(d.wo, wb) + if err == nil { + glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) + } + return err +} + +// DisconnectBlockRangeNonUTXO performs full range scan to remove a range of blocks +// it is very slow operation +func (d *RocksDB) DisconnectBlockRangeNonUTXO(lower uint32, higher uint32) error { + glog.Infof("db: disconnecting blocks %d-%d", lower, higher) + addrKeys, _, err := d.allAddressesScan(lower, higher) + if err != nil { + return err + } glog.Infof("rocksdb: about to disconnect %d addresses ", len(addrKeys)) wb := gorocksdb.NewWriteBatch() defer wb.Destroy() - unspentTxs := make(map[string][]byte) - for addrIndex, addrKey := range addrKeys { + for _, addrKey := range addrKeys { if glog.V(2) { glog.Info("address ", hex.EncodeToString(addrKey)) } // delete address:height from the index wb.DeleteCF(d.cfh[cfAddresses], addrKey) - addrID, _, err := unpackAddressKey(addrKey) - if err != nil { - return err - } - // recreate unspentTxs, which were spent by this block (that is being disconnected) - for _, o := range addrUnspentOutpoints[addrIndex] { - stxID := string(o.btxID) - txAddrs, exists := unspentTxs[stxID] - if !exists { - txAddrs, err = d.getUnspentTx(o.btxID) - if err != nil { - return err - } - } - txAddrs = appendPackedAddrID(txAddrs, addrID, uint32(o.index), 1) - unspentTxs[stxID] = txAddrs - } - // delete unspentTxs from this block - outpoints, err := d.unpackOutpoints(addrOutpoints[addrIndex]) - if err != nil { - return err - } - for _, o := range outpoints { - wb.DeleteCF(d.cfh[cfUnspentTxs], o.btxID) - d.internalDeleteTx(wb, o.btxID) - } - } - for key, val := range unspentTxs { - wb.PutCF(d.cfh[cfUnspentTxs], []byte(key), val) } for height := lower; height <= higher; height++ { if glog.V(2) { glog.Info("height ", height) } - key := packUint(height) - if keep > 0 { - wb.DeleteCF(d.cfh[cfBlockAddresses], key) - } - wb.DeleteCF(d.cfh[cfHeight], key) + wb.DeleteCF(d.cfh[cfHeight], packUint(height)) } err = d.db.Write(d.wo, wb) if err == nil { diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 52aa3b29..4a96e769 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -632,9 +632,16 @@ func TestRocksDB_Index_UTXO(t *testing.T) { } verifyAfterUTXOBlock2(t, d) + // try to disconnect both blocks, however only the last one is kept, it is not possible + err = d.DisconnectBlockRangeUTXO(225493, 225494) + if err == nil || err.Error() != "Cannot disconnect blocks with height 225493 and lower. It is necessary to rebuild index." { + t.Fatal(err) + } + verifyAfterUTXOBlock2(t, d) + // disconnect the 2nd block, verify that the db contains only data from the 1st block with restored unspentTxs // and that the cached tx is removed - err = d.DisconnectBlockRange(225494, 225494) + err = d.DisconnectBlockRangeUTXO(225494, 225494) if err != nil { t.Fatal(err) } @@ -648,153 +655,6 @@ func TestRocksDB_Index_UTXO(t *testing.T) { } -func Test_findAndRemoveUnspentAddr(t *testing.T) { - type args struct { - unspentAddrs string - vout uint32 - } - tests := []struct { - name string - args args - want string - want2 string - }{ - { - name: "3", - args: args{ - unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114", - vout: 3, - }, - want: "64635167006868", - want2: "029c0010517a0115887452870212709393588893935687040e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114", - }, - { - name: "10", - args: args{ - unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114", - vout: 10, - }, - want: "61", - want2: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112", - }, - { - name: "not there", - args: args{ - unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114", - vout: 11, - }, - want: "", - want2: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - b, err := hex.DecodeString(tt.args.unspentAddrs) - if err != nil { - panic(err) - } - got, got2 := findAndRemoveUnspentAddr(b, tt.args.vout) - h := hex.EncodeToString(got) - if !reflect.DeepEqual(h, tt.want) { - t.Errorf("findAndRemoveUnspentAddr() got = %v, want %v", h, tt.want) - } - h2 := hex.EncodeToString(got2) - if !reflect.DeepEqual(h2, tt.want2) { - t.Errorf("findAndRemoveUnspentAddr() got2 = %v, want %v", h2, tt.want2) - } - }) - } -} - -type hexoutpoint struct { - txID string - vout int32 -} - -func Test_unpackBlockAddresses(t *testing.T) { - d := setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}}) - defer closeAndDestroyRocksDB(t, d) - type args struct { - buf string - } - tests := []struct { - name string - args args - want []string - want2 [][]hexoutpoint - wantErr bool - }{ - { - name: "1", - args: args{"029c0010517a011588745287047c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d250000b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400612709393588893935687000e64635167006868000e7651935188008702effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7502"}, - want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087"}, - want2: [][]hexoutpoint{ - []hexoutpoint{}, - []hexoutpoint{ - hexoutpoint{txidB2T1, 0}, - hexoutpoint{txidB1T1, 3}, - }, - []hexoutpoint{}, - []hexoutpoint{}, - []hexoutpoint{ - hexoutpoint{txidB1T2, 1}, - }, - }, - }, - { - name: "1", - args: args{"3276A914B434EB0C1A3B7A02E8A29CC616E791EF1E0BF51F88AC003276A9143F8BA3FDA3BA7B69F5818086E12223C6DD25E3C888AC003276A914A08EAE93007F22668AB5E4A9C83C8CD1C325E3E088AC02EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75003276A9148BDF0AA3C567AA5975C2E61321B8BEBBE7293DF688AC0200B2C06055E5E90E9C82BD4181FDE310104391A7FA4F289B1704E5D90CAA3840022EA9144A21DB08FB6882CB152E1FF06780A430740F77048702EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75023276A914CCAAAF374E1B06CB83118453D102587B4273D09588AC003276A9148D802C045445DF49613F6A70DDD2E48526F3701F88AC00"}, - want: []string{"76a914b434eb0c1a3b7a02e8a29cc616e791ef1e0bf51f88ac", "76a9143f8ba3fda3ba7b69f5818086e12223c6dd25e3c888ac", "76a914a08eae93007f22668ab5e4a9c83c8cd1c325e3e088ac", "76a9148bdf0aa3c567aa5975c2e61321b8bebbe7293df688ac", "a9144a21db08fb6882cb152e1ff06780a430740f770487", "76a914ccaaaf374e1b06cb83118453d102587b4273d09588ac", "76a9148d802c045445df49613f6a70ddd2e48526f3701f88ac"}, - want2: [][]hexoutpoint{ - []hexoutpoint{}, - []hexoutpoint{}, - []hexoutpoint{ - hexoutpoint{txidB1T2, 0}, - }, - []hexoutpoint{ - hexoutpoint{txidB1T1, 1}, - }, - []hexoutpoint{ - hexoutpoint{txidB1T2, 1}, - }, - []hexoutpoint{}, - []hexoutpoint{}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - b, err := hex.DecodeString(tt.args.buf) - if err != nil { - panic(err) - } - got, got2, err := d.unpackBlockAddresses(b) - if (err != nil) != tt.wantErr { - t.Errorf("unpackBlockAddresses() error = %v, wantErr %v", err, tt.wantErr) - return - } - h := make([]string, len(got)) - for i, g := range got { - h[i] = hex.EncodeToString(g) - } - if !reflect.DeepEqual(h, tt.want) { - t.Errorf("unpackBlockAddresses() = %v, want %v", h, tt.want) - } - h2 := make([][]hexoutpoint, len(got2)) - for i, g := range got2 { - ho := make([]hexoutpoint, len(g)) - for j, o := range g { - ho[j] = hexoutpoint{hex.EncodeToString(o.btxID), o.index} - } - h2[i] = ho - } - if !reflect.DeepEqual(h2, tt.want2) { - t.Errorf("unpackBlockAddresses() = %v, want %v", h2, tt.want2) - } - }) - } -} - func Test_packBigint_unpackBigint(t *testing.T) { bigbig1, _ := big.NewInt(0).SetString("123456789123456789012345", 10) bigbig2, _ := big.NewInt(0).SetString("12345678912345678901234512389012345123456789123456789012345123456789123456789012345", 10) diff --git a/db/sync.go b/db/sync.go index a664e94f..fecfa09c 100644 --- a/db/sync.go +++ b/db/sync.go @@ -346,25 +346,23 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) { } // DisconnectBlocks removes all data belonging to blocks in range lower-higher, -// using block data from blockchain, if they are available, -// otherwise doing full scan func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []string) error { glog.Infof("sync: disconnecting blocks %d-%d", lower, higher) - // if the chain uses Block to Addresses mapping, always use DisconnectBlockRange - if w.chain.GetChainParser().KeepBlockAddresses() > 0 { - return w.db.DisconnectBlockRange(lower, higher) + // if the chain is UTXO, always use DisconnectBlockRange + if w.chain.GetChainParser().IsUTXOChain() { + return w.db.DisconnectBlockRangeUTXO(lower, higher) } blocks := make([]*bchain.Block, len(hashes)) var err error - // get all blocks first to see if we can avoid full scan + // try to get all blocks first to see if we can avoid full scan for i, hash := range hashes { blocks[i], err = w.chain.GetBlock(hash, 0) if err != nil { // cannot get a block, we must do full range scan - return w.db.DisconnectBlockRange(lower, higher) + return w.db.DisconnectBlockRangeNonUTXO(lower, higher) } } - // then disconnect one after another + // got all blocks to be disconnected, disconnect them one after another for i, block := range blocks { glog.Info("Disconnecting block ", (int(higher) - i), " ", block.Hash) if err = w.db.DisconnectBlock(block); err != nil {