From 296eee828f48ff9e38530c2d068e9b6ac7bd676a Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 17 Apr 2018 23:50:01 +0200 Subject: [PATCH] Change the way UTXO addresses are indexed - WIP Columns before: outputs: saddress+block height -> outpoints inputs: txid+vout -> spending txid+vout Columns after change: addresses: address+block height -> input or output outpoints unspenttxs: txid -> addresses+indexes --- db/rocksdb.go | 316 ++++++++++++++++++++++++-------------------------- 1 file changed, 153 insertions(+), 163 deletions(-) diff --git a/db/rocksdb.go b/db/rocksdb.go index 959fd498..1dda824d 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -5,12 +5,9 @@ import ( "bytes" "encoding/binary" "encoding/hex" - "math" "os" "path/filepath" - "github.com/juju/errors" - "github.com/bsm/go-vlq" "github.com/golang/glog" @@ -40,12 +37,12 @@ type RocksDB struct { const ( cfDefault = iota cfHeight - cfOutputs - cfInputs + cfAddresses + cfUnspentTxs cfTransactions ) -var cfNames = []string{"default", "height", "outputs", "inputs", "transactions"} +var cfNames = []string{"default", "height", "addresses", "unspenttxs", "transactions"} func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) { c := gorocksdb.NewLRUCache(8 << 30) // 8GB @@ -156,11 +153,9 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f return err } - it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs]) + it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses]) defer it.Close() - isUTXO := d.chainParser.IsUTXOChain() - for it.Seek(kstart); it.Valid(); it.Next() { key := it.Key().Data() val := it.Value().Data() @@ -187,20 +182,6 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f if err := fn(o.txid, vout, isOutput); err != nil { return err } - if isUTXO { - stxid, so, err := d.GetSpentOutput(o.txid, o.vout) - if err != nil { - return err - } - if stxid != "" { - if glog.V(2) { - glog.Infof("rocksdb: input %s/%d: %s/%d", o.txid, 
o.vout, stxid, so) - } - if err := fn(stxid, uint32(so), false); err != nil { - return err - } - } - } } } return nil @@ -237,11 +218,12 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { if err := d.writeHeight(wb, block, op); err != nil { return err } - if err := d.writeOutputs(wb, block, op, isUTXO); err != nil { - return err - } if isUTXO { - if err := d.writeInputs(wb, block, op); err != nil { + if err := d.writeAddressesUTXO(wb, block, op); err != nil { + return err + } + } else { + if err := d.writeAddressesNonUTXO(wb, block, op); err != nil { return err } } @@ -249,40 +231,149 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { return d.db.Write(d.wo, wb) } -// Output Index +// Addresses index type outpoint struct { txid string vout int32 } -func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, txid string, vout int32, bh uint32) error { +func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint) error { + for addrID, outpoints := range records { + key, err := packOutputKey([]byte(addrID), block.Height) + if err != nil { + glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID) + continue + } + switch op { + case opInsert: + val, err := d.packOutputValue(outpoints) + if err != nil { + glog.Warningf("rocksdb: packOutputValue: %v", err) + continue + } + wb.PutCF(d.cfh[cfAddresses], key, val) + case opDelete: + wb.DeleteCF(d.cfh[cfAddresses], key) + } + } + return nil +} + +func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, btxid []byte, vout int32, bh uint32) error { if len(addrID) > 0 { if len(addrID) > 1024 { glog.Infof("block %d, skipping addrID of length %d", bh, len(addrID)) } else { strAddrID := string(addrID) records[strAddrID] = append(records[strAddrID], outpoint{ - txid: txid, + txid: string(btxid), 
vout: vout, }) if op == opDelete { // remove transactions from cache - b, err := d.chainParser.PackTxid(txid) - if err != nil { - return err - } - wb.DeleteCF(d.cfh[cfTransactions], b) + wb.DeleteCF(d.cfh[cfTransactions], btxid) } } } return nil } -func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, isUTXO bool) error { - records := make(map[string][]outpoint) +func (d *RocksDB) getUnspentTx(btxID []byte) ([]byte, error) { + // find it in db, in the column cfUnspentTxs + val, err := d.db.GetCF(d.ro, d.cfh[cfUnspentTxs], btxID) + if err != nil { + return nil, err + } + defer val.Free() + data := append([]byte(nil), val.Data()...) + return data, nil +} +func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { + var err error + addresses := make(map[string][]outpoint) + unspentTxs := make(map[string][]byte) + btxIDs := make([][]byte, len(block.Txs)) + // first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses + for txi, tx := range block.Txs { + btxID, err := d.chainParser.PackTxid(tx.Txid) + if err != nil { + return err + } + btxIDs[txi] = btxID + // preallocate estimated size of addresses (32 bytes is 1 byte length of addrID, 25 bytes addrID, 1-2 bytes vout and reserve) + txAddrs := make([]byte, 0, len(tx.Vout)*32) + for i, output := range tx.Vout { + addrID, err := d.chainParser.GetAddrIDFromVout(&output) + if err != nil { + // do not log ErrAddressMissing, transactions may lack a "to" address (for example eth contracts) + if err != bchain.ErrAddressMissing { + glog.Warningf("rocksdb: addrID: %v - height %d, tx %v, output %v", err, block.Height, tx.Txid, output) + } + continue + } + err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), block.Height) + if err != nil { + return err + } + // resize the addr buffer if necessary by a new estimate + if cap(txAddrs)-len(txAddrs) < 2*vlq.MaxLen32+len(addrID) { + txAddrs
= append(txAddrs, make([]byte, vlq.MaxLen32+len(addrID)+(len(tx.Vout)-i)*32)...)[:len(txAddrs)] + } + // addrID is packed as number of bytes of the addrID + bytes of addrID + vout + lv := packVarint(int32(len(addrID)), txAddrs[len(txAddrs):]) + txAddrs = txAddrs[:len(txAddrs)+lv] + txAddrs = append(txAddrs, addrID...) + lv = packVarint(int32(output.N), txAddrs[len(txAddrs):]) + txAddrs = txAddrs[:len(txAddrs)+lv] + } + unspentTxs[tx.Txid] = txAddrs + // locate unspent txs/addresses and store them in format txid ^index + } + for txi, tx := range block.Txs { + btxID := btxIDs[txi] + // try to find the tx in current block + unspentAddrs, thisBlock := unspentTxs[string(btxID)] + if thisBlock { + + } else { + unspentAddrs, err = d.getUnspentTx(btxID) + if err != nil { + return err + } + if unspentAddrs == nil { + glog.Warningf("rocksdb: height %d, tx %v in inputs but missing in unspentTxs", block.Height, tx.Txid) + continue + } + } + + // for _, input := range tx.Vin { + // input.Vout + // } + } + if err := d.writeAddressRecords(wb, block, op, addresses); err != nil { + return err + } + // save unspent txs from current block + for tx, val := range unspentTxs { + switch op { + case opInsert: + wb.PutCF(d.cfh[cfUnspentTxs], []byte(tx), val) + case opDelete: + wb.DeleteCF(d.cfh[cfUnspentTxs], []byte(tx)) + } + } + return nil +} + +func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { + addresses := make(map[string][]outpoint) for _, tx := range block.Txs { + btxID, err := d.chainParser.PackTxid(tx.Txid) + if err != nil { + return err + } for _, output := range tx.Vout { addrID, err := d.chainParser.GetAddrIDFromVout(&output) if err != nil { @@ -292,50 +383,27 @@ func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op } continue } - err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(output.N), block.Height) + err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), 
block.Height) if err != nil { return err } } - if !isUTXO { - // store inputs in output column in format txid ^index - for _, input := range tx.Vin { - for i, a := range input.Addresses { - addrID, err := d.chainParser.GetAddrIDFromAddress(a) - if err != nil { - glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID) - continue - } - err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(^i), block.Height) - if err != nil { - return err - } + // store inputs in format txid ^index + for _, input := range tx.Vin { + for i, a := range input.Addresses { + addrID, err := d.chainParser.GetAddrIDFromAddress(a) + if err != nil { + glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID) + continue + } + err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(^i), block.Height) + if err != nil { + return err } } } } - - for addrID, outpoints := range records { - key, err := packOutputKey([]byte(addrID), block.Height) - if err != nil { - glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID) - continue - } - - switch op { - case opInsert: - val, err := d.packOutputValue(outpoints) - if err != nil { - glog.Warningf("rocksdb: packOutputValue: %v", err) - continue - } - wb.PutCF(d.cfh[cfOutputs], key, val) - case opDelete: - wb.DeleteCF(d.cfh[cfOutputs], key) - } - } - - return nil + return d.writeAddressRecords(wb, block, op, addresses) } func packOutputKey(outputScript []byte, height uint32) ([]byte, error) { @@ -353,9 +421,10 @@ func (d *RocksDB) packOutputValue(outpoints []outpoint) ([]byte, error) { if err != nil { return nil, err } - bvout := packVarint(o.vout) + bvout := make([]byte, vlq.MaxLen32) + l := packVarint(o.vout, bvout) buf = append(buf, btxid...) - buf = append(buf, bvout...) + buf = append(buf, bvout[:l]...) 
} return buf, nil } @@ -379,46 +448,16 @@ func (d *RocksDB) unpackOutputValue(buf []byte) ([]outpoint, error) { return outpoints, nil } -// Input index - -func (d *RocksDB) writeInputs( - wb *gorocksdb.WriteBatch, - block *bchain.Block, - op int, -) error { - for _, tx := range block.Txs { - for i, input := range tx.Vin { - if input.Coinbase != "" { - continue - } - key, err := d.packOutpoint(input.Txid, int32(input.Vout)) - if err != nil { - return err - } - val, err := d.packOutpoint(tx.Txid, int32(i)) - if err != nil { - return err - } - switch op { - case opInsert: - wb.PutCF(d.cfh[cfInputs], key, val) - case opDelete: - wb.DeleteCF(d.cfh[cfInputs], key) - } - } - } - return nil -} - func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) { btxid, err := d.chainParser.PackTxid(txid) if err != nil { return nil, err } - bvout := packVarint(vout) - buf := make([]byte, 0, len(btxid)+len(bvout)) + bv := make([]byte, vlq.MaxLen32) + l := packVarint(vout, bv) + buf := make([]byte, 0, l+len(btxid)) buf = append(buf, btxid...) - buf = append(buf, bvout...) + buf = append(buf, bv[:l]...) 
return buf, nil } @@ -457,30 +496,6 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) { return d.chainParser.UnpackBlockHash(val.Data()) } -// GetSpentOutput returns output which is spent by input tx -func (d *RocksDB) GetSpentOutput(txid string, i int32) (string, int32, error) { - b, err := d.packOutpoint(txid, i) - if err != nil { - return "", 0, err - } - val, err := d.db.GetCF(d.ro, d.cfh[cfInputs], b) - if err != nil { - return "", 0, err - } - defer val.Free() - p, err := d.unpackOutputValue(val.Data()) - if err != nil { - return "", 0, err - } - var otxid string - var oi int32 - for _, i := range p { - otxid, oi = i.txid, i.vout - break - } - return otxid, oi, nil -} - func (d *RocksDB) writeHeight( wb *gorocksdb.WriteBatch, block *bchain.Block, @@ -512,7 +527,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { var seekKey []byte for { var key []byte - it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs]) + it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses]) if totalOutputs == 0 { it.SeekToFirst() } else { @@ -552,7 +567,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { if glog.V(2) { glog.Info("output ", hex.EncodeToString(outputKeys[i])) } - wb.DeleteCF(d.cfh[cfOutputs], outputKeys[i]) + wb.DeleteCF(d.cfh[cfAddresses], outputKeys[i]) outpoints, err := d.unpackOutputValue(outputValues[i]) if err != nil { return err @@ -566,7 +581,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { if glog.V(2) { glog.Info("input ", hex.EncodeToString(boutpoint)) } - wb.DeleteCF(d.cfh[cfInputs], boutpoint) + wb.DeleteCF(d.cfh[cfUnspentTxs], boutpoint) // delete from txCache b, err := d.chainParser.PackTxid(o.txid) if err != nil { @@ -651,8 +666,6 @@ func (d *RocksDB) DeleteTx(txid string) error { // Helpers -var ErrInvalidAddress = errors.New("invalid address") - func packUint(i uint32) []byte { buf := make([]byte, 4) binary.BigEndian.PutUint32(buf, i) @@ -663,34 
+676,11 @@ func unpackUint(buf []byte) uint32 { return binary.BigEndian.Uint32(buf) } -func packFloat64(f float64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, math.Float64bits(f)) - return buf -} - -func unpackFloat64(buf []byte) float64 { - return math.Float64frombits(binary.BigEndian.Uint64(buf)) -} - -func packVarint(i int32) []byte { - buf := make([]byte, vlq.MaxLen32) - ofs := vlq.PutInt(buf, int64(i)) - return buf[:ofs] +func packVarint(i int32, buf []byte) int { + return vlq.PutInt(buf, int64(i)) } func unpackVarint(buf []byte) (int32, int) { i, ofs := vlq.Uint(buf) return int32(i), ofs } - -func packVarint64(i int64) []byte { - buf := make([]byte, vlq.MaxLen64) - ofs := vlq.PutInt(buf, i) - return buf[:ofs] -} - -func unpackVarint64(buf []byte) (int64, int) { - i, ofs := vlq.Int(buf) - return i, ofs -}