Clean up the rocksdb sources and disconnect UTXO blocks WIP
This commit is contained in:
parent
7e11a4e615
commit
8e3c7f851b
517
db/rocksdb.go
517
db/rocksdb.go
@ -52,10 +52,6 @@ const (
|
||||
cfAddressBalance
|
||||
cfBlockTxids
|
||||
cfTransactions
|
||||
|
||||
// to be removed, kept temporarily so that the code is compilable
|
||||
cfUnspentTxs
|
||||
cfBlockAddresses
|
||||
)
|
||||
|
||||
var cfNames = []string{"default", "height", "addresses", "txAddresses", "addressBalance", "blockTxids", "transactions"}
|
||||
@ -257,7 +253,12 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
||||
return err
|
||||
}
|
||||
if isUTXO {
|
||||
if err := d.writeAddressesUTXO(wb, block, op); err != nil {
|
||||
if op == opDelete {
|
||||
// block does not contain mapping tx-> input address, which is necessary to recreate
|
||||
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
||||
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
||||
}
|
||||
if err := d.writeAddressesUTXO(wb, block); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
@ -293,12 +294,7 @@ type addrBalance struct {
|
||||
balanceSat big.Int
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
|
||||
if op == opDelete {
|
||||
// block does not contain mapping tx-> input address, which is necessary to recreate
|
||||
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
||||
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
||||
}
|
||||
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error {
|
||||
addresses := make(map[string][]outpoint)
|
||||
blockTxids := make([][]byte, len(block.Txs))
|
||||
txAddressesMap := make(map[string]*txAddresses)
|
||||
@ -443,21 +439,16 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
||||
ab.sentSat.Add(&ab.sentSat, &ot.valueSat)
|
||||
}
|
||||
}
|
||||
if op == opInsert {
|
||||
if err := d.storeAddresses(wb, block, addresses); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeBalances(wb, balances); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeAndCleanupBlockTxids(wb, block, blockTxids); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeAddresses(wb, block, addresses); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.storeBalances(wb, balances); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.storeAndCleanupBlockTxids(wb, block, blockTxids)
|
||||
}
|
||||
|
||||
func processedInTx(o []outpoint, btxID []byte) bool {
|
||||
@ -493,12 +484,17 @@ func (d *RocksDB) storeBalances(wb *gorocksdb.WriteBatch, abm map[string]*addrBa
|
||||
// allocate buffer big enough for number of txs + 2 bigints
|
||||
buf := make([]byte, vlq.MaxLen32+2*maxPackedBigintBytes)
|
||||
for addrID, ab := range abm {
|
||||
l := packVaruint(uint(ab.txs), buf)
|
||||
ll := packBigint(&ab.sentSat, buf[l:])
|
||||
l += ll
|
||||
ll = packBigint(&ab.balanceSat, buf[l:])
|
||||
l += ll
|
||||
wb.PutCF(d.cfh[cfAddressBalance], []byte(addrID), buf[:l])
|
||||
// balance with 0 transactions is removed from db - happens in disconnect
|
||||
if ab == nil || ab.txs <= 0 {
|
||||
wb.DeleteCF(d.cfh[cfAddressBalance], []byte(addrID))
|
||||
} else {
|
||||
l := packVaruint(uint(ab.txs), buf)
|
||||
ll := packBigint(&ab.sentSat, buf[l:])
|
||||
l += ll
|
||||
ll = packBigint(&ab.balanceSat, buf[l:])
|
||||
l += ll
|
||||
wb.PutCF(d.cfh[cfAddressBalance], []byte(addrID), buf[:l])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -532,6 +528,22 @@ func (d *RocksDB) storeAndCleanupBlockTxids(wb *gorocksdb.WriteBatch, block *bch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) getBlockTxids(height uint32) ([][]byte, error) {
|
||||
pl := d.chainParser.PackedTxidLen()
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxids], packUint(height))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer val.Free()
|
||||
buf := val.Data()
|
||||
txids := make([][]byte, len(buf)/pl)
|
||||
for i := 0; i < len(txids); i++ {
|
||||
txid := make([]byte, pl)
|
||||
copy(txid, buf[pl*i:])
|
||||
}
|
||||
return txids, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) getAddressBalance(addrID []byte) (*addrBalance, error) {
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressBalance], addrID)
|
||||
if err != nil {
|
||||
@ -651,70 +663,9 @@ func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) {
|
||||
return outpoints, nil
|
||||
}
|
||||
|
||||
////////////////////////
|
||||
|
||||
func (d *RocksDB) packBlockAddress(addrID []byte, spentTxs map[string][]outpoint) []byte {
|
||||
vBuf := make([]byte, vlq.MaxLen32)
|
||||
vl := packVarint(len(addrID), vBuf)
|
||||
blockAddress := append([]byte(nil), vBuf[:vl]...)
|
||||
blockAddress = append(blockAddress, addrID...)
|
||||
if spentTxs == nil {
|
||||
} else {
|
||||
addrUnspentTxs := spentTxs[string(addrID)]
|
||||
vl = packVarint(len(addrUnspentTxs), vBuf)
|
||||
blockAddress = append(blockAddress, vBuf[:vl]...)
|
||||
buf := d.packOutpoints(addrUnspentTxs)
|
||||
blockAddress = append(blockAddress, buf...)
|
||||
}
|
||||
return blockAddress
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, addresses map[string][]outpoint, spentTxs map[string][]outpoint) error {
|
||||
keep := d.chainParser.KeepBlockAddresses()
|
||||
blockAddresses := make([]byte, 0)
|
||||
for addrID, outpoints := range addresses {
|
||||
baddrID := []byte(addrID)
|
||||
key := packAddressKey(baddrID, block.Height)
|
||||
switch op {
|
||||
case opInsert:
|
||||
val := d.packOutpoints(outpoints)
|
||||
wb.PutCF(d.cfh[cfAddresses], key, val)
|
||||
if keep > 0 {
|
||||
// collect all addresses be stored in blockaddresses
|
||||
// they are used in disconnect blocks
|
||||
blockAddress := d.packBlockAddress(baddrID, spentTxs)
|
||||
blockAddresses = append(blockAddresses, blockAddress...)
|
||||
}
|
||||
case opDelete:
|
||||
wb.DeleteCF(d.cfh[cfAddresses], key)
|
||||
}
|
||||
}
|
||||
if keep > 0 && op == opInsert {
|
||||
// write new block address and txs spent in this block
|
||||
key := packUint(block.Height)
|
||||
wb.PutCF(d.cfh[cfBlockAddresses], key, blockAddresses)
|
||||
// cleanup old block address
|
||||
if block.Height > uint32(keep) {
|
||||
for rh := block.Height - uint32(keep); rh < block.Height; rh-- {
|
||||
key = packUint(rh)
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val.Size() == 0 {
|
||||
break
|
||||
}
|
||||
val.Free()
|
||||
d.db.DeleteCF(d.wo, d.cfh[cfBlockAddresses], key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, btxid []byte, vout int32, bh uint32) error {
|
||||
if len(addrID) > 0 {
|
||||
if len(addrID) > 1024 {
|
||||
if len(addrID) > maxAddrIDLen {
|
||||
glog.Infof("rocksdb: block %d, skipping addrID of length %d", bh, len(addrID))
|
||||
} else {
|
||||
strAddrID := string(addrID)
|
||||
@ -731,153 +682,6 @@ func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records m
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) getUnspentTx(btxID []byte) ([]byte, error) {
|
||||
// find it in db, in the column cfUnspentTxs
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfUnspentTxs], btxID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer val.Free()
|
||||
data := append([]byte(nil), val.Data()...)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func appendPackedAddrID(txAddrs []byte, addrID []byte, n uint32, remaining int) []byte {
|
||||
// resize the addr buffer if necessary by a new estimate
|
||||
if cap(txAddrs)-len(txAddrs) < 2*vlq.MaxLen32+len(addrID) {
|
||||
txAddrs = append(txAddrs, make([]byte, vlq.MaxLen32+len(addrID)+remaining*32)...)[:len(txAddrs)]
|
||||
}
|
||||
// addrID is packed as number of bytes of the addrID + bytes of addrID + vout
|
||||
lv := packVarint(len(addrID), txAddrs[len(txAddrs):len(txAddrs)+vlq.MaxLen32])
|
||||
txAddrs = txAddrs[:len(txAddrs)+lv]
|
||||
txAddrs = append(txAddrs, addrID...)
|
||||
lv = packVarint(int(n), txAddrs[len(txAddrs):len(txAddrs)+vlq.MaxLen32])
|
||||
txAddrs = txAddrs[:len(txAddrs)+lv]
|
||||
return txAddrs
|
||||
}
|
||||
|
||||
func findAndRemoveUnspentAddr(unspentAddrs []byte, vout uint32) ([]byte, []byte) {
|
||||
// the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints
|
||||
for i := 0; i < len(unspentAddrs); {
|
||||
l, lv1 := unpackVarint(unspentAddrs[i:])
|
||||
// index of vout of address in unspentAddrs
|
||||
j := i + int(l) + lv1
|
||||
if j >= len(unspentAddrs) {
|
||||
glog.Error("rocksdb: Inconsistent data in unspentAddrs ", hex.EncodeToString(unspentAddrs), ", ", vout)
|
||||
return nil, unspentAddrs
|
||||
}
|
||||
n, lv2 := unpackVarint(unspentAddrs[j:])
|
||||
if uint32(n) == vout {
|
||||
addrID := append([]byte(nil), unspentAddrs[i+lv1:j]...)
|
||||
unspentAddrs = append(unspentAddrs[:i], unspentAddrs[j+lv2:]...)
|
||||
return addrID, unspentAddrs
|
||||
}
|
||||
i = j + lv2
|
||||
}
|
||||
return nil, unspentAddrs
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressesUTXO_old(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
|
||||
if op == opDelete {
|
||||
// block does not contain mapping tx-> input address, which is necessary to recreate
|
||||
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
||||
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
||||
}
|
||||
addresses := make(map[string][]outpoint)
|
||||
unspentTxs := make(map[string][]byte)
|
||||
thisBlockTxs := make(map[string]struct{})
|
||||
btxIDs := make([][]byte, len(block.Txs))
|
||||
// first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses
|
||||
for txi, tx := range block.Txs {
|
||||
btxID, err := d.chainParser.PackTxid(tx.Txid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
btxIDs[txi] = btxID
|
||||
// preallocate estimated size of addresses (32 bytes is 1 byte length of addrID, 25 bytes addrID, 1-2 bytes vout and reserve)
|
||||
txAddrs := make([]byte, 0, len(tx.Vout)*32)
|
||||
for i, output := range tx.Vout {
|
||||
addrID, err := d.chainParser.GetAddrIDFromVout(&output)
|
||||
if err != nil {
|
||||
// do not log ErrAddressMissing, transactions can be without to address (for example eth contracts)
|
||||
if err != bchain.ErrAddressMissing {
|
||||
glog.Warningf("rocksdb: addrID: %v - height %d, tx %v, output %v", err, block.Height, tx.Txid, output)
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), block.Height)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
txAddrs = appendPackedAddrID(txAddrs, addrID, output.N, len(tx.Vout)-i)
|
||||
}
|
||||
stxID := string(btxID)
|
||||
unspentTxs[stxID] = txAddrs
|
||||
thisBlockTxs[stxID] = struct{}{}
|
||||
}
|
||||
// locate addresses spent by this tx and remove them from unspent addresses
|
||||
// keep them so that they be stored for DisconnectBlock functionality
|
||||
spentTxs := make(map[string][]outpoint)
|
||||
for txi, tx := range block.Txs {
|
||||
spendingTxid := btxIDs[txi]
|
||||
for i, input := range tx.Vin {
|
||||
btxID, err := d.chainParser.PackTxid(input.Txid)
|
||||
if err != nil {
|
||||
// do not process inputs without input txid
|
||||
if err == bchain.ErrTxidMissing {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
// find the tx in current block or already processed
|
||||
stxID := string(btxID)
|
||||
unspentAddrs, exists := unspentTxs[stxID]
|
||||
if !exists {
|
||||
// else find it in previous blocks
|
||||
unspentAddrs, err = d.getUnspentTx(btxID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if unspentAddrs == nil {
|
||||
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vin %v %v missing in unspentTxs", block.Height, tx.Txid, input.Txid, input.Vout, i)
|
||||
continue
|
||||
}
|
||||
}
|
||||
var addrID []byte
|
||||
addrID, unspentAddrs = findAndRemoveUnspentAddr(unspentAddrs, input.Vout)
|
||||
if addrID == nil {
|
||||
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vin %v %v not found in unspentAddrs", block.Height, tx.Txid, input.Txid, input.Vout, i)
|
||||
continue
|
||||
}
|
||||
// record what was spent in this tx
|
||||
// skip transactions that were created in this block
|
||||
if _, exists := thisBlockTxs[stxID]; !exists {
|
||||
saddrID := string(addrID)
|
||||
rut := spentTxs[saddrID]
|
||||
rut = append(rut, outpoint{btxID, int32(input.Vout)})
|
||||
spentTxs[saddrID] = rut
|
||||
}
|
||||
err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unspentTxs[stxID] = unspentAddrs
|
||||
}
|
||||
}
|
||||
if err := d.writeAddressRecords(wb, block, op, addresses, spentTxs); err != nil {
|
||||
return err
|
||||
}
|
||||
// save unspent txs from current block
|
||||
for tx, val := range unspentTxs {
|
||||
if len(val) == 0 {
|
||||
wb.DeleteCF(d.cfh[cfUnspentTxs], []byte(tx))
|
||||
} else {
|
||||
wb.PutCF(d.cfh[cfUnspentTxs], []byte(tx), val)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
|
||||
addresses := make(map[string][]outpoint)
|
||||
for _, tx := range block.Txs {
|
||||
@ -914,75 +718,19 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.
|
||||
}
|
||||
}
|
||||
}
|
||||
return d.writeAddressRecords(wb, block, op, addresses, nil)
|
||||
}
|
||||
|
||||
func (d *RocksDB) unpackBlockAddresses(buf []byte) ([][]byte, [][]outpoint, error) {
|
||||
addresses := make([][]byte, 0)
|
||||
outpointsArray := make([][]outpoint, 0)
|
||||
// the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints
|
||||
for i := 0; i < len(buf); {
|
||||
l, lv := unpackVarint(buf[i:])
|
||||
j := i + int(l) + lv
|
||||
if j > len(buf) {
|
||||
glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf))
|
||||
return nil, nil, errors.New("Inconsistent data in blockAddresses")
|
||||
}
|
||||
addrID := append([]byte(nil), buf[i+lv:j]...)
|
||||
outpoints, ol, err := d.unpackNOutpoints(buf[j:])
|
||||
if err != nil {
|
||||
glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf))
|
||||
return nil, nil, errors.New("Inconsistent data in blockAddresses")
|
||||
}
|
||||
addresses = append(addresses, addrID)
|
||||
outpointsArray = append(outpointsArray, outpoints)
|
||||
i = j + ol
|
||||
}
|
||||
return addresses, outpointsArray, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) {
|
||||
txidUnpackedLen := d.chainParser.PackedTxidLen()
|
||||
n, p := unpackVarint32(buf)
|
||||
outpoints := make([]outpoint, n)
|
||||
for i := int32(0); i < n; i++ {
|
||||
if p+txidUnpackedLen >= len(buf) {
|
||||
return nil, 0, errors.New("Inconsistent data in unpackNOutpoints")
|
||||
}
|
||||
btxID := append([]byte(nil), buf[p:p+txidUnpackedLen]...)
|
||||
p += txidUnpackedLen
|
||||
vout, voutLen := unpackVarint32(buf[p:])
|
||||
p += voutLen
|
||||
outpoints[i] = outpoint{
|
||||
btxID: btxID,
|
||||
index: vout,
|
||||
for addrID, outpoints := range addresses {
|
||||
key := packAddressKey([]byte(addrID), block.Height)
|
||||
switch op {
|
||||
case opInsert:
|
||||
val := d.packOutpoints(outpoints)
|
||||
wb.PutCF(d.cfh[cfAddresses], key, val)
|
||||
case opDelete:
|
||||
wb.DeleteCF(d.cfh[cfAddresses], key)
|
||||
}
|
||||
}
|
||||
return outpoints, p, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) {
|
||||
btxid, err := d.chainParser.PackTxid(txid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bv := make([]byte, vlq.MaxLen32)
|
||||
l := packVarint32(vout, bv)
|
||||
buf := make([]byte, 0, l+len(btxid))
|
||||
buf = append(buf, btxid...)
|
||||
buf = append(buf, bv[:l]...)
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) unpackOutpoint(buf []byte) (string, int32, int) {
|
||||
txidUnpackedLen := d.chainParser.PackedTxidLen()
|
||||
txid, _ := d.chainParser.UnpackTxid(buf[:txidUnpackedLen])
|
||||
vout, o := unpackVarint32(buf[txidUnpackedLen:])
|
||||
return txid, vout, txidUnpackedLen + o
|
||||
}
|
||||
|
||||
//////////////////////////////////
|
||||
|
||||
// Block index
|
||||
|
||||
// GetBestBlock returns the block hash of the block with highest height in the db
|
||||
@ -1032,18 +780,7 @@ func (d *RocksDB) writeHeight(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, [][]outpoint, error) {
|
||||
b, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer b.Free()
|
||||
// block is missing in DB
|
||||
if b.Data() == nil {
|
||||
return nil, nil, errors.New("Block addresses missing")
|
||||
}
|
||||
return d.unpackBlockAddresses(b.Data())
|
||||
}
|
||||
// Disconnect blocks
|
||||
|
||||
func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) {
|
||||
glog.Infof("db: doing full scan of addresses column")
|
||||
@ -1090,93 +827,107 @@ func (d *RocksDB) allAddressesScan(lower uint32, higher uint32) ([][]byte, [][]b
|
||||
return addrKeys, addrValues, nil
|
||||
}
|
||||
|
||||
// DisconnectBlockRange removes all data belonging to blocks in range lower-higher
|
||||
// it finds the data in blockaddresses column if available,
|
||||
// otherwise by doing quite slow full scan of addresses column
|
||||
func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error {
|
||||
glog.Infof("db: disconnecting blocks %d-%d", lower, higher)
|
||||
addrKeys := [][]byte{}
|
||||
addrOutpoints := [][]byte{}
|
||||
addrUnspentOutpoints := [][]outpoint{}
|
||||
keep := d.chainParser.KeepBlockAddresses()
|
||||
var err error
|
||||
if keep > 0 {
|
||||
for height := lower; height <= higher; height++ {
|
||||
addresses, unspentOutpoints, err := d.getBlockAddresses(packUint(height))
|
||||
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, txa *txAddresses, txAddressesToUpdate map[string]*txAddresses, txsToDelete map[string]struct{}, balances map[string]*addrBalance) error {
|
||||
findBalance := func(addrID []byte) (*addrBalance, error) {
|
||||
var err error
|
||||
s := string(addrID)
|
||||
b, found := balances[s]
|
||||
if !found {
|
||||
b, err = d.getAddressBalance(addrID)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
for i, addrID := range addresses {
|
||||
addrKey := packAddressKey(addrID, height)
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
addrKeys = append(addrKeys, addrKey)
|
||||
av := append([]byte(nil), val.Data()...)
|
||||
val.Free()
|
||||
addrOutpoints = append(addrOutpoints, av)
|
||||
addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i])
|
||||
return nil, err
|
||||
}
|
||||
balances[s] = b
|
||||
}
|
||||
} else {
|
||||
addrKeys, addrOutpoints, err = d.allAddressesScan(lower, higher)
|
||||
return b, nil
|
||||
}
|
||||
for _, t := range txa.inputs {
|
||||
b, err := findBalance(t.addrID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != nil {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DisconnectBlockRangeUTXO removes all data belonging to blocks in range lower-higher
|
||||
// if they are in the range kept in the cfBlockTxs column
|
||||
func (d *RocksDB) DisconnectBlockRangeUTXO(lower uint32, higher uint32) error {
|
||||
glog.Infof("db: disconnecting blocks %d-%d", lower, higher)
|
||||
blocksTxids := make([][][]byte, higher-lower+1)
|
||||
for height := lower; height <= higher; height++ {
|
||||
blockTxids, err := d.getBlockTxids(height)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(blockTxids) == 0 {
|
||||
return errors.Errorf("Cannot disconnect blocks with height %v and lower. It is necessary to rebuild index.", height)
|
||||
}
|
||||
blocksTxids[height-lower] = blockTxids
|
||||
}
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
txAddressesToUpdate := make(map[string]*txAddresses)
|
||||
txsToDelete := make(map[string]struct{})
|
||||
balances := make(map[string]*addrBalance)
|
||||
for height := higher; height >= lower; height-- {
|
||||
blockTxids := blocksTxids[height-lower]
|
||||
for _, txid := range blockTxids {
|
||||
txa, err := d.getTxAddresses(txid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s := string(txid)
|
||||
txsToDelete[s] = struct{}{}
|
||||
if err := d.disconnectTxAddresses(wb, height, s, txa, txAddressesToUpdate, txsToDelete, balances); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
key := packUint(height)
|
||||
wb.DeleteCF(d.cfh[cfBlockTxids], key)
|
||||
wb.DeleteCF(d.cfh[cfHeight], key)
|
||||
}
|
||||
d.storeTxAddresses(wb, txAddressesToUpdate)
|
||||
d.storeBalances(wb, balances)
|
||||
for s := range txsToDelete {
|
||||
b := []byte(s)
|
||||
wb.DeleteCF(d.cfh[cfTransactions], b)
|
||||
wb.DeleteCF(d.cfh[cfTxAddresses], b)
|
||||
}
|
||||
err := d.db.Write(d.wo, wb)
|
||||
if err == nil {
|
||||
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// DisconnectBlockRangeNonUTXO performs full range scan to remove a range of blocks
|
||||
// it is very slow operation
|
||||
func (d *RocksDB) DisconnectBlockRangeNonUTXO(lower uint32, higher uint32) error {
|
||||
glog.Infof("db: disconnecting blocks %d-%d", lower, higher)
|
||||
addrKeys, _, err := d.allAddressesScan(lower, higher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.Infof("rocksdb: about to disconnect %d addresses ", len(addrKeys))
|
||||
wb := gorocksdb.NewWriteBatch()
|
||||
defer wb.Destroy()
|
||||
unspentTxs := make(map[string][]byte)
|
||||
for addrIndex, addrKey := range addrKeys {
|
||||
for _, addrKey := range addrKeys {
|
||||
if glog.V(2) {
|
||||
glog.Info("address ", hex.EncodeToString(addrKey))
|
||||
}
|
||||
// delete address:height from the index
|
||||
wb.DeleteCF(d.cfh[cfAddresses], addrKey)
|
||||
addrID, _, err := unpackAddressKey(addrKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// recreate unspentTxs, which were spent by this block (that is being disconnected)
|
||||
for _, o := range addrUnspentOutpoints[addrIndex] {
|
||||
stxID := string(o.btxID)
|
||||
txAddrs, exists := unspentTxs[stxID]
|
||||
if !exists {
|
||||
txAddrs, err = d.getUnspentTx(o.btxID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
txAddrs = appendPackedAddrID(txAddrs, addrID, uint32(o.index), 1)
|
||||
unspentTxs[stxID] = txAddrs
|
||||
}
|
||||
// delete unspentTxs from this block
|
||||
outpoints, err := d.unpackOutpoints(addrOutpoints[addrIndex])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, o := range outpoints {
|
||||
wb.DeleteCF(d.cfh[cfUnspentTxs], o.btxID)
|
||||
d.internalDeleteTx(wb, o.btxID)
|
||||
}
|
||||
}
|
||||
for key, val := range unspentTxs {
|
||||
wb.PutCF(d.cfh[cfUnspentTxs], []byte(key), val)
|
||||
}
|
||||
for height := lower; height <= higher; height++ {
|
||||
if glog.V(2) {
|
||||
glog.Info("height ", height)
|
||||
}
|
||||
key := packUint(height)
|
||||
if keep > 0 {
|
||||
wb.DeleteCF(d.cfh[cfBlockAddresses], key)
|
||||
}
|
||||
wb.DeleteCF(d.cfh[cfHeight], key)
|
||||
wb.DeleteCF(d.cfh[cfHeight], packUint(height))
|
||||
}
|
||||
err = d.db.Write(d.wo, wb)
|
||||
if err == nil {
|
||||
|
||||
@ -632,9 +632,16 @@ func TestRocksDB_Index_UTXO(t *testing.T) {
|
||||
}
|
||||
verifyAfterUTXOBlock2(t, d)
|
||||
|
||||
// try to disconnect both blocks, however only the last one is kept, it is not possible
|
||||
err = d.DisconnectBlockRangeUTXO(225493, 225494)
|
||||
if err == nil || err.Error() != "Cannot disconnect blocks with height 225493 and lower. It is necessary to rebuild index." {
|
||||
t.Fatal(err)
|
||||
}
|
||||
verifyAfterUTXOBlock2(t, d)
|
||||
|
||||
// disconnect the 2nd block, verify that the db contains only data from the 1st block with restored unspentTxs
|
||||
// and that the cached tx is removed
|
||||
err = d.DisconnectBlockRange(225494, 225494)
|
||||
err = d.DisconnectBlockRangeUTXO(225494, 225494)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -648,153 +655,6 @@ func TestRocksDB_Index_UTXO(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func Test_findAndRemoveUnspentAddr(t *testing.T) {
|
||||
type args struct {
|
||||
unspentAddrs string
|
||||
vout uint32
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want string
|
||||
want2 string
|
||||
}{
|
||||
{
|
||||
name: "3",
|
||||
args: args{
|
||||
unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114",
|
||||
vout: 3,
|
||||
},
|
||||
want: "64635167006868",
|
||||
want2: "029c0010517a0115887452870212709393588893935687040e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114",
|
||||
},
|
||||
{
|
||||
name: "10",
|
||||
args: args{
|
||||
unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114",
|
||||
vout: 10,
|
||||
},
|
||||
want: "61",
|
||||
want2: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112",
|
||||
},
|
||||
{
|
||||
name: "not there",
|
||||
args: args{
|
||||
unspentAddrs: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114",
|
||||
vout: 11,
|
||||
},
|
||||
want: "",
|
||||
want2: "029c0010517a0115887452870212709393588893935687040e64635167006868060e76519351880087080a7b7b0115870a3276a9144150837fb91d9461d6b95059842ab85262c2923f88ac0c08636751680e04578710029112026114",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
b, err := hex.DecodeString(tt.args.unspentAddrs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
got, got2 := findAndRemoveUnspentAddr(b, tt.args.vout)
|
||||
h := hex.EncodeToString(got)
|
||||
if !reflect.DeepEqual(h, tt.want) {
|
||||
t.Errorf("findAndRemoveUnspentAddr() got = %v, want %v", h, tt.want)
|
||||
}
|
||||
h2 := hex.EncodeToString(got2)
|
||||
if !reflect.DeepEqual(h2, tt.want2) {
|
||||
t.Errorf("findAndRemoveUnspentAddr() got2 = %v, want %v", h2, tt.want2)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type hexoutpoint struct {
|
||||
txID string
|
||||
vout int32
|
||||
}
|
||||
|
||||
func Test_unpackBlockAddresses(t *testing.T) {
|
||||
d := setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}})
|
||||
defer closeAndDestroyRocksDB(t, d)
|
||||
type args struct {
|
||||
buf string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []string
|
||||
want2 [][]hexoutpoint
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "1",
|
||||
args: args{"029c0010517a011588745287047c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d250000b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400612709393588893935687000e64635167006868000e7651935188008702effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7502"},
|
||||
want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087"},
|
||||
want2: [][]hexoutpoint{
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{txidB2T1, 0},
|
||||
hexoutpoint{txidB1T1, 3},
|
||||
},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{txidB1T2, 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "1",
|
||||
args: args{"3276A914B434EB0C1A3B7A02E8A29CC616E791EF1E0BF51F88AC003276A9143F8BA3FDA3BA7B69F5818086E12223C6DD25E3C888AC003276A914A08EAE93007F22668AB5E4A9C83C8CD1C325E3E088AC02EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75003276A9148BDF0AA3C567AA5975C2E61321B8BEBBE7293DF688AC0200B2C06055E5E90E9C82BD4181FDE310104391A7FA4F289B1704E5D90CAA3840022EA9144A21DB08FB6882CB152E1FF06780A430740F77048702EFFD9EF509383D536B1C8AF5BF434C8EFBF521A4F2BEFD4022BBD68694B4AC75023276A914CCAAAF374E1B06CB83118453D102587B4273D09588AC003276A9148D802C045445DF49613F6A70DDD2E48526F3701F88AC00"},
|
||||
want: []string{"76a914b434eb0c1a3b7a02e8a29cc616e791ef1e0bf51f88ac", "76a9143f8ba3fda3ba7b69f5818086e12223c6dd25e3c888ac", "76a914a08eae93007f22668ab5e4a9c83c8cd1c325e3e088ac", "76a9148bdf0aa3c567aa5975c2e61321b8bebbe7293df688ac", "a9144a21db08fb6882cb152e1ff06780a430740f770487", "76a914ccaaaf374e1b06cb83118453d102587b4273d09588ac", "76a9148d802c045445df49613f6a70ddd2e48526f3701f88ac"},
|
||||
want2: [][]hexoutpoint{
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{txidB1T2, 0},
|
||||
},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{txidB1T1, 1},
|
||||
},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{txidB1T2, 1},
|
||||
},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
b, err := hex.DecodeString(tt.args.buf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
got, got2, err := d.unpackBlockAddresses(b)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("unpackBlockAddresses() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
h := make([]string, len(got))
|
||||
for i, g := range got {
|
||||
h[i] = hex.EncodeToString(g)
|
||||
}
|
||||
if !reflect.DeepEqual(h, tt.want) {
|
||||
t.Errorf("unpackBlockAddresses() = %v, want %v", h, tt.want)
|
||||
}
|
||||
h2 := make([][]hexoutpoint, len(got2))
|
||||
for i, g := range got2 {
|
||||
ho := make([]hexoutpoint, len(g))
|
||||
for j, o := range g {
|
||||
ho[j] = hexoutpoint{hex.EncodeToString(o.btxID), o.index}
|
||||
}
|
||||
h2[i] = ho
|
||||
}
|
||||
if !reflect.DeepEqual(h2, tt.want2) {
|
||||
t.Errorf("unpackBlockAddresses() = %v, want %v", h2, tt.want2)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_packBigint_unpackBigint(t *testing.T) {
|
||||
bigbig1, _ := big.NewInt(0).SetString("123456789123456789012345", 10)
|
||||
bigbig2, _ := big.NewInt(0).SetString("12345678912345678901234512389012345123456789123456789012345123456789123456789012345", 10)
|
||||
|
||||
14
db/sync.go
14
db/sync.go
@ -346,25 +346,23 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) {
|
||||
}
|
||||
|
||||
// DisconnectBlocks removes all data belonging to blocks in range lower-higher,
|
||||
// using block data from blockchain, if they are available,
|
||||
// otherwise doing full scan
|
||||
func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []string) error {
|
||||
glog.Infof("sync: disconnecting blocks %d-%d", lower, higher)
|
||||
// if the chain uses Block to Addresses mapping, always use DisconnectBlockRange
|
||||
if w.chain.GetChainParser().KeepBlockAddresses() > 0 {
|
||||
return w.db.DisconnectBlockRange(lower, higher)
|
||||
// if the chain is UTXO, always use DisconnectBlockRange
|
||||
if w.chain.GetChainParser().IsUTXOChain() {
|
||||
return w.db.DisconnectBlockRangeUTXO(lower, higher)
|
||||
}
|
||||
blocks := make([]*bchain.Block, len(hashes))
|
||||
var err error
|
||||
// get all blocks first to see if we can avoid full scan
|
||||
// try to get all blocks first to see if we can avoid full scan
|
||||
for i, hash := range hashes {
|
||||
blocks[i], err = w.chain.GetBlock(hash, 0)
|
||||
if err != nil {
|
||||
// cannot get a block, we must do full range scan
|
||||
return w.db.DisconnectBlockRange(lower, higher)
|
||||
return w.db.DisconnectBlockRangeNonUTXO(lower, higher)
|
||||
}
|
||||
}
|
||||
// then disconnect one after another
|
||||
// got all blocks to be disconnected, disconnect them one after another
|
||||
for i, block := range blocks {
|
||||
glog.Info("Disconnecting block ", (int(higher) - i), " ", block.Hash)
|
||||
if err = w.db.DisconnectBlock(block); err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user