Change the way UTXO addresses are indexed - WIP

Columns before:
outputs: saddress+block height -> outpoints
inputs: txid+vout -> spending txid+vout

Columns after change:
addresses: address+block height -> input or output outpoints
unspenttxs: txid -> addresses+indexes
This commit is contained in:
Martin Boehm 2018-04-17 23:50:01 +02:00
parent 26de94495c
commit 296eee828f

View File

@ -5,12 +5,9 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"math"
"os"
"path/filepath"
"github.com/juju/errors"
"github.com/bsm/go-vlq"
"github.com/golang/glog"
@ -40,12 +37,12 @@ type RocksDB struct {
const (
cfDefault = iota
cfHeight
cfOutputs
cfInputs
cfAddresses
cfUnspentTxs
cfTransactions
)
var cfNames = []string{"default", "height", "outputs", "inputs", "transactions"}
var cfNames = []string{"default", "height", "addresses", "unspenttxs", "transactions"}
func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
c := gorocksdb.NewLRUCache(8 << 30) // 8GB
@ -156,11 +153,9 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f
return err
}
it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs])
it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses])
defer it.Close()
isUTXO := d.chainParser.IsUTXOChain()
for it.Seek(kstart); it.Valid(); it.Next() {
key := it.Key().Data()
val := it.Value().Data()
@ -187,20 +182,6 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f
if err := fn(o.txid, vout, isOutput); err != nil {
return err
}
if isUTXO {
stxid, so, err := d.GetSpentOutput(o.txid, o.vout)
if err != nil {
return err
}
if stxid != "" {
if glog.V(2) {
glog.Infof("rocksdb: input %s/%d: %s/%d", o.txid, o.vout, stxid, so)
}
if err := fn(stxid, uint32(so), false); err != nil {
return err
}
}
}
}
}
return nil
@ -237,11 +218,12 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
if err := d.writeHeight(wb, block, op); err != nil {
return err
}
if err := d.writeOutputs(wb, block, op, isUTXO); err != nil {
return err
}
if isUTXO {
if err := d.writeInputs(wb, block, op); err != nil {
if err := d.writeAddressesUTXO(wb, block, op); err != nil {
return err
}
} else {
if err := d.writeAddressesNonUTXO(wb, block, op); err != nil {
return err
}
}
@ -249,40 +231,149 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
return d.db.Write(d.wo, wb)
}
// Output Index
// Addresses index
type outpoint struct {
txid string
vout int32
}
func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, txid string, vout int32, bh uint32) error {
func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint) error {
for addrID, outpoints := range records {
key, err := packOutputKey([]byte(addrID), block.Height)
if err != nil {
glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID)
continue
}
switch op {
case opInsert:
val, err := d.packOutputValue(outpoints)
if err != nil {
glog.Warningf("rocksdb: packOutputValue: %v", err)
continue
}
wb.PutCF(d.cfh[cfAddresses], key, val)
case opDelete:
wb.DeleteCF(d.cfh[cfAddresses], key)
}
}
return nil
}
func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, btxid []byte, vout int32, bh uint32) error {
if len(addrID) > 0 {
if len(addrID) > 1024 {
glog.Infof("block %d, skipping addrID of length %d", bh, len(addrID))
} else {
strAddrID := string(addrID)
records[strAddrID] = append(records[strAddrID], outpoint{
txid: txid,
txid: string(btxid),
vout: vout,
})
if op == opDelete {
// remove transactions from cache
b, err := d.chainParser.PackTxid(txid)
if err != nil {
return err
}
wb.DeleteCF(d.cfh[cfTransactions], b)
wb.DeleteCF(d.cfh[cfTransactions], btxid)
}
}
}
return nil
}
func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, isUTXO bool) error {
records := make(map[string][]outpoint)
func (d *RocksDB) getUnspentTx(btxID []byte) ([]byte, error) {
// find it in db, in the column cfUnspentTxs
val, err := d.db.GetCF(d.ro, d.cfh[cfUnspentTxs], btxID)
if err != nil {
return nil, err
}
defer val.Free()
data := append([]byte(nil), val.Data()...)
return data, nil
}
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
var err error
addresses := make(map[string][]outpoint)
unspentTxs := make(map[string][]byte)
btxIDs := make([][]byte, len(block.Txs))
// first process all outputs, build mapping of addresses to outpoints and mapppings of unspent txs to addresses
for txi, tx := range block.Txs {
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
btxIDs[txi] = btxID
// preallocate estimated size of addresses (32 bytes is 1 byte length of addrID, 25 bytes addrID, 1-2 bytes vout and reserve)
txAddrs := make([]byte, 0, len(tx.Vout)*32)
for i, output := range tx.Vout {
addrID, err := d.chainParser.GetAddrIDFromVout(&output)
if err != nil {
// do not log ErrAddressMissing, transactions can be without to address (for example eth contracts)
if err != bchain.ErrAddressMissing {
glog.Warningf("rocksdb: addrID: %v - height %d, tx %v, output %v", err, block.Height, tx.Txid, output)
}
continue
}
err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), block.Height)
if err != nil {
return err
}
// resize the addr buffer if necessary by a new estimate
if cap(txAddrs)-len(txAddrs) < 2*vlq.MaxLen32+len(addrID) {
txAddrs = append(txAddrs, make([]byte, vlq.MaxLen32+len(addrID)+(len(tx.Vout)-i)*32)...)[:len(txAddrs)]
}
// addrID is packed as number of bytes of the addrID + bytes of addrID + vout
lv := packVarint(int32(len(addrID)), txAddrs[len(txAddrs):])
txAddrs = txAddrs[:len(txAddrs)+lv]
txAddrs = append(txAddrs, addrID...)
lv = packVarint(int32(output.N), txAddrs[len(txAddrs):])
txAddrs = txAddrs[:len(txAddrs)+lv]
}
unspentTxs[tx.Txid] = txAddrs
// locate unspent txs/addresses and store them in format txid ^index
}
for txi, tx := range block.Txs {
btxID := btxIDs[txi]
// try to find the tx in current block
unspentAddrs, thisBlock := unspentTxs[string(btxID)]
if thisBlock {
} else {
unspentAddrs, err = d.getUnspentTx(btxID)
if err != nil {
return err
}
if unspentAddrs == nil {
glog.Warningf("rocksdb: height %d, tx %v in inputs but missing in unspentTxs", block.Height, tx.Txid)
continue
}
}
// for _, input := range tx.Vin {
// input.Vout
// }
}
if err := d.writeAddressRecords(wb, block, op, addresses); err != nil {
return err
}
// save unspent txs from current block
for tx, val := range unspentTxs {
switch op {
case opInsert:
wb.PutCF(d.cfh[cfUnspentTxs], []byte(tx), val)
case opDelete:
wb.DeleteCF(d.cfh[cfUnspentTxs], []byte(tx))
}
}
return nil
}
func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
addresses := make(map[string][]outpoint)
for _, tx := range block.Txs {
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
for _, output := range tx.Vout {
addrID, err := d.chainParser.GetAddrIDFromVout(&output)
if err != nil {
@ -292,50 +383,27 @@ func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op
}
continue
}
err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(output.N), block.Height)
err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(output.N), block.Height)
if err != nil {
return err
}
}
if !isUTXO {
// store inputs in output column in format txid ^index
for _, input := range tx.Vin {
for i, a := range input.Addresses {
addrID, err := d.chainParser.GetAddrIDFromAddress(a)
if err != nil {
glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID)
continue
}
err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(^i), block.Height)
if err != nil {
return err
}
// store inputs in format txid ^index
for _, input := range tx.Vin {
for i, a := range input.Addresses {
addrID, err := d.chainParser.GetAddrIDFromAddress(a)
if err != nil {
glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID)
continue
}
err = d.addAddrIDToRecords(op, wb, addresses, addrID, btxID, int32(^i), block.Height)
if err != nil {
return err
}
}
}
}
for addrID, outpoints := range records {
key, err := packOutputKey([]byte(addrID), block.Height)
if err != nil {
glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID)
continue
}
switch op {
case opInsert:
val, err := d.packOutputValue(outpoints)
if err != nil {
glog.Warningf("rocksdb: packOutputValue: %v", err)
continue
}
wb.PutCF(d.cfh[cfOutputs], key, val)
case opDelete:
wb.DeleteCF(d.cfh[cfOutputs], key)
}
}
return nil
return d.writeAddressRecords(wb, block, op, addresses)
}
func packOutputKey(outputScript []byte, height uint32) ([]byte, error) {
@ -353,9 +421,10 @@ func (d *RocksDB) packOutputValue(outpoints []outpoint) ([]byte, error) {
if err != nil {
return nil, err
}
bvout := packVarint(o.vout)
bvout := make([]byte, vlq.MaxLen32)
l := packVarint(o.vout, bvout)
buf = append(buf, btxid...)
buf = append(buf, bvout...)
buf = append(buf, bvout[:l]...)
}
return buf, nil
}
@ -379,46 +448,16 @@ func (d *RocksDB) unpackOutputValue(buf []byte) ([]outpoint, error) {
return outpoints, nil
}
// Input index
func (d *RocksDB) writeInputs(
wb *gorocksdb.WriteBatch,
block *bchain.Block,
op int,
) error {
for _, tx := range block.Txs {
for i, input := range tx.Vin {
if input.Coinbase != "" {
continue
}
key, err := d.packOutpoint(input.Txid, int32(input.Vout))
if err != nil {
return err
}
val, err := d.packOutpoint(tx.Txid, int32(i))
if err != nil {
return err
}
switch op {
case opInsert:
wb.PutCF(d.cfh[cfInputs], key, val)
case opDelete:
wb.DeleteCF(d.cfh[cfInputs], key)
}
}
}
return nil
}
func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) {
btxid, err := d.chainParser.PackTxid(txid)
if err != nil {
return nil, err
}
bvout := packVarint(vout)
buf := make([]byte, 0, len(btxid)+len(bvout))
bv := make([]byte, vlq.MaxLen32)
l := packVarint(vout, bv)
buf := make([]byte, 0, l+len(btxid))
buf = append(buf, btxid...)
buf = append(buf, bvout...)
buf = append(buf, bv[:l]...)
return buf, nil
}
@ -457,30 +496,6 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
return d.chainParser.UnpackBlockHash(val.Data())
}
// GetSpentOutput returns output which is spent by input tx
func (d *RocksDB) GetSpentOutput(txid string, i int32) (string, int32, error) {
b, err := d.packOutpoint(txid, i)
if err != nil {
return "", 0, err
}
val, err := d.db.GetCF(d.ro, d.cfh[cfInputs], b)
if err != nil {
return "", 0, err
}
defer val.Free()
p, err := d.unpackOutputValue(val.Data())
if err != nil {
return "", 0, err
}
var otxid string
var oi int32
for _, i := range p {
otxid, oi = i.txid, i.vout
break
}
return otxid, oi, nil
}
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *bchain.Block,
@ -512,7 +527,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error {
var seekKey []byte
for {
var key []byte
it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs])
it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses])
if totalOutputs == 0 {
it.SeekToFirst()
} else {
@ -552,7 +567,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error {
if glog.V(2) {
glog.Info("output ", hex.EncodeToString(outputKeys[i]))
}
wb.DeleteCF(d.cfh[cfOutputs], outputKeys[i])
wb.DeleteCF(d.cfh[cfAddresses], outputKeys[i])
outpoints, err := d.unpackOutputValue(outputValues[i])
if err != nil {
return err
@ -566,7 +581,7 @@ func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error {
if glog.V(2) {
glog.Info("input ", hex.EncodeToString(boutpoint))
}
wb.DeleteCF(d.cfh[cfInputs], boutpoint)
wb.DeleteCF(d.cfh[cfUnspentTxs], boutpoint)
// delete from txCache
b, err := d.chainParser.PackTxid(o.txid)
if err != nil {
@ -651,8 +666,6 @@ func (d *RocksDB) DeleteTx(txid string) error {
// Helpers
var ErrInvalidAddress = errors.New("invalid address")
func packUint(i uint32) []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, i)
@ -663,34 +676,11 @@ func unpackUint(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
func packFloat64(f float64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, math.Float64bits(f))
return buf
}
func unpackFloat64(buf []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(buf))
}
func packVarint(i int32) []byte {
buf := make([]byte, vlq.MaxLen32)
ofs := vlq.PutInt(buf, int64(i))
return buf[:ofs]
func packVarint(i int32, buf []byte) int {
return vlq.PutInt(buf, int64(i))
}
func unpackVarint(buf []byte) (int32, int) {
i, ofs := vlq.Uint(buf)
return int32(i), ofs
}
func packVarint64(i int64) []byte {
buf := make([]byte, vlq.MaxLen64)
ofs := vlq.PutInt(buf, i)
return buf[:ofs]
}
func unpackVarint64(buf []byte) (int64, int) {
i, ofs := vlq.Int(buf)
return i, ofs
}