diff --git a/bchain/coins/btc/bitcoinparser.go b/bchain/coins/btc/bitcoinparser.go index 692d1170..45f6b17a 100644 --- a/bchain/coins/btc/bitcoinparser.go +++ b/bchain/coins/btc/bitcoinparser.go @@ -33,22 +33,14 @@ func GetChainParams(chain string) *chaincfg.Params { return &chaincfg.MainNetParams } -func (p *BitcoinBlockParser) GetUIDFromVout(output *bchain.Vout) string { - return output.ScriptPubKey.Hex +func (p *BitcoinBlockParser) GetAddrIDFromVout(output *bchain.Vout) ([]byte, error) { + return hex.DecodeString(output.ScriptPubKey.Hex) } -func (p *BitcoinBlockParser) GetUIDFromAddress(address string) ([]byte, error) { +func (p *BitcoinBlockParser) GetAddrIDFromAddress(address string) ([]byte, error) { return p.AddressToOutputScript(address) } -func (p *BitcoinBlockParser) PackUID(str string) ([]byte, error) { - return hex.DecodeString(str) -} - -func (p *BitcoinBlockParser) UnpackUID(buf []byte) string { - return hex.EncodeToString(buf) -} - // AddressToOutputScript converts bitcoin address to ScriptPubKey func (p *BitcoinBlockParser) AddressToOutputScript(address string) ([]byte, error) { da, err := btcutil.DecodeAddress(address, p.Params) @@ -177,3 +169,7 @@ func (p *BitcoinBlockParser) UnpackTx(buf []byte) (*bchain.Tx, uint32, error) { tx.Blocktime = bt return tx, height, nil } + +func (p *BitcoinBlockParser) IsUTXOChain() bool { + return true +} diff --git a/bchain/coins/zec/zcashparser.go b/bchain/coins/zec/zcashparser.go index fab2b5ba..834817db 100644 --- a/bchain/coins/zec/zcashparser.go +++ b/bchain/coins/zec/zcashparser.go @@ -26,21 +26,13 @@ func GetChainParams(chain string) *chaincfg.Params { return &chaincfg.MainNetParams } -func (p *ZCashBlockParser) GetUIDFromVout(output *bchain.Vout) string { +func (p *ZCashBlockParser) GetAddrIDFromVout(output *bchain.Vout) ([]byte, error) { if len(output.ScriptPubKey.Addresses) != 1 { - return "" + return nil, nil } - return output.ScriptPubKey.Addresses[0] + return []byte(output.ScriptPubKey.Addresses[0]), nil } -func (p *ZCashBlockParser) GetUIDFromAddress(address string) ([]byte, error) { - return p.PackUID(address) -} - -func (p *ZCashBlockParser) PackUID(str string) ([]byte, error) { - return []byte(str), nil -} - -func (p *ZCashBlockParser) UnpackUID(buf []byte) string { - return string(buf) +func (p *ZCashBlockParser) GetAddrIDFromAddress(address string) ([]byte, error) { + return []byte(address), nil } diff --git a/bchain/mempool.go b/bchain/mempool.go index 9b88c51c..00c26766 100644 --- a/bchain/mempool.go +++ b/bchain/mempool.go @@ -28,7 +28,7 @@ type Mempool struct { chain BlockChain mux sync.Mutex txToInputOutput map[string]inputOutput - scriptToTx map[string][]outpoint // TODO rename all occurences + addrIDToTx map[string][]outpoint inputs map[outpoint]string } @@ -42,12 +42,11 @@ func (m *Mempool) GetTransactions(address string) ([]string, error) { m.mux.Lock() defer m.mux.Unlock() parser := m.chain.GetChainParser() - buf, err := parser.GetUIDFromAddress(address) + addrID, err := parser.GetAddrIDFromAddress(address) if err != nil { return nil, err } - outid := parser.UnpackUID(buf) - outpoints := m.scriptToTx[outid] + outpoints := m.addrIDToTx[string(addrID)] txs := make([]string, 0, len(outpoints)+len(outpoints)/2) for _, o := range outpoints { txs = append(txs, o.txid) @@ -69,7 +68,7 @@ func (m *Mempool) updateMappings(newTxToInputOutput map[string]inputOutput, newS m.mux.Lock() defer m.mux.Unlock() m.txToInputOutput = newTxToInputOutput - m.scriptToTx = newScriptToTx + m.addrIDToTx = newScriptToTx m.inputs = newInputs } @@ -85,7 +84,7 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error { } parser := m.chain.GetChainParser() newTxToInputOutput := make(map[string]inputOutput, len(m.txToInputOutput)+1) - newScriptToTx := make(map[string][]outpoint, len(m.scriptToTx)+1) + newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+1) newInputs := make(map[outpoint]string, len(m.inputs)+1) for _, txid := range txs { io, exists := m.txToInputOutput[txid] @@ -97,9 +96,13 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error { } io.outputs = make([]scriptIndex, 0, len(tx.Vout)) for _, output := range tx.Vout { - outid := parser.GetUIDFromVout(&output) - if outid != "" { - io.outputs = append(io.outputs, scriptIndex{outid, output.N}) + addrID, err := parser.GetAddrIDFromVout(&output) + if err != nil { + glog.Error("error in addrID in ", txid, " ", output.N, ": ", err) + continue + } + if len(addrID) > 0 { + io.outputs = append(io.outputs, scriptIndex{string(addrID), output.N}) } if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) @@ -115,13 +118,13 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error { } newTxToInputOutput[txid] = io for _, si := range io.outputs { - newScriptToTx[si.script] = append(newScriptToTx[si.script], outpoint{txid, si.n}) + newAddrIDToTx[si.script] = append(newAddrIDToTx[si.script], outpoint{txid, si.n}) } for _, i := range io.inputs { newInputs[i] = txid } } - m.updateMappings(newTxToInputOutput, newScriptToTx, newInputs) + m.updateMappings(newTxToInputOutput, newAddrIDToTx, newInputs) glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") return nil } diff --git a/bchain/types.go b/bchain/types.go index acd880aa..d6af63fb 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -113,14 +113,20 @@ type BlockChain interface { } type BlockChainParser interface { - GetUIDFromVout(output *Vout) string - GetUIDFromAddress(address string) ([]byte, error) - PackUID(script string) ([]byte, error) - UnpackUID(buf []byte) string + // self description + // UTXO chains need "inputs" column in db, that map transactions to transactions that spend them + // non UTXO chains have mapping of address to input and output transactions directly in "outputs" column in db + IsUTXOChain() bool + // address id conversions + GetAddrIDFromVout(output *Vout) ([]byte, error) + GetAddrIDFromAddress(address string) ([]byte, error) + // address to output script conversions AddressToOutputScript(address string) ([]byte, error) OutputScriptToAddresses(script []byte) ([]string, error) + // transactions ParseTx(b []byte) (*Tx, error) - ParseBlock(b []byte) (*Block, error) PackTx(tx *Tx, height uint32, blockTime int64) ([]byte, error) UnpackTx(buf []byte) (*Tx, uint32, error) + // blocks + ParseBlock(b []byte) (*Block, error) } diff --git a/db/rocksdb.go b/db/rocksdb.go index 50ce4438..759f3f6b 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -142,13 +142,13 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f if glog.V(1) { glog.Infof("rocksdb: address get %s %d-%d ", address, lower, higher) } - outid, err := d.chainParser.GetUIDFromAddress(address) + addrID, err := d.chainParser.GetAddrIDFromAddress(address) - kstart, err := packOutputKey(outid, lower) + kstart, err := packOutputKey(addrID, lower) if err != nil { return err } - kstop, err := packOutputKey(outid, higher) + kstop, err := packOutputKey(addrID, higher) if err != nil { return err } @@ -156,6 +156,8 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs]) defer it.Close() + isUTXO := d.chainParser.IsUTXOChain() + for it.Seek(kstart); it.Valid(); it.Next() { key := it.Key().Data() val := it.Value().Data() @@ -170,20 +172,31 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f glog.Infof("rocksdb: output %s: %s", hex.EncodeToString(key), hex.EncodeToString(val)) } for _, o := range outpoints { - if err := fn(o.txid, o.vout, true); err != nil { + var vout uint32 + var isOutput bool + if o.vout < 0 { + vout = uint32(^o.vout) + isOutput = false + } else { + vout = uint32(o.vout) + isOutput = true + } + if err := fn(o.txid, vout, isOutput); err != nil { return err } - stxid, so, err := d.GetSpentOutput(o.txid, o.vout) - if err != nil { - return err - } - if stxid != "" { - if glog.V(2) { - glog.Infof("rocksdb: input %s/%d: %s/%d", o.txid, o.vout, stxid, so) - } - if err := fn(stxid, so, false); err != nil { + if isUTXO { + stxid, so, err := d.GetSpentOutput(o.txid, o.vout) + if err != nil { return err } + if stxid != "" { + if glog.V(2) { + glog.Infof("rocksdb: input %s/%d: %s/%d", o.txid, o.vout, stxid, so) + } + if err := fn(stxid, uint32(so), false); err != nil { + return err + } + } } } } @@ -216,14 +229,18 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { } } + isUTXO := d.chainParser.IsUTXOChain() + if err := d.writeHeight(wb, block, op); err != nil { return err } - if err := d.writeOutputs(wb, block, op); err != nil { + if err := d.writeOutputs(wb, block, op, isUTXO); err != nil { return err } - if err := d.writeInputs(wb, block, op); err != nil { - return err + if isUTXO { + if err := d.writeInputs(wb, block, op); err != nil { + return err + } } return d.db.Write(d.wo, wb) @@ -233,55 +250,79 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { type outpoint struct { txid string - vout uint32 + vout int32 } -func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { +func (d *RocksDB) addAddrIDToRecords(op int, wb *gorocksdb.WriteBatch, records map[string][]outpoint, addrID []byte, txid string, vout int32, bh uint32) error { + if len(addrID) > 0 { + if len(addrID) > 1024 { + glog.Infof("block %d, skipping addrID of length %d", bh, len(addrID)) + } else { + strAddrID := string(addrID) + records[strAddrID] = append(records[strAddrID], outpoint{ + txid: txid, + vout: vout, + }) + if op == opDelete { + // remove transactions from cache + b, err := packTxid(txid) + if err != nil { + return err + } + wb.DeleteCF(d.cfh[cfTransactions], b) + } + } + } + return nil +} + +func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, isUTXO bool) error { records := make(map[string][]outpoint) for _, tx := range block.Txs { for _, output := range tx.Vout { - outid := d.chainParser.GetUIDFromVout(&output) - if outid != "" { - if len(outid) > 1024 { - glog.Infof("block %d, skipping outid of length %d", block.Height, len(outid)/2) - } else { - records[outid] = append(records[outid], outpoint{ - txid: tx.Txid, - vout: output.N, - }) - if op == opDelete { - // remove transactions from cache - b, err := packTxid(tx.Txid) - if err != nil { - return err - } - wb.DeleteCF(d.cfh[cfTransactions], b) + addrID, err := d.chainParser.GetAddrIDFromVout(&output) + if err != nil { + glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID) + continue + } + err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(output.N), block.Height) + if err != nil { + return err + } + } + if !isUTXO { + // store inputs in output column in format txid ^index + for _, input := range tx.Vin { + for i, a := range input.Addresses { + addrID, err := d.chainParser.GetAddrIDFromAddress(a) + if err != nil { + glog.Warningf("rocksdb: addrID: %v - %d %s", err, block.Height, addrID) + continue + } + err = d.addAddrIDToRecords(op, wb, records, addrID, tx.Txid, int32(^i), block.Height) + if err != nil { + return err } } } } } - for outid, outpoints := range records { - bOutid, err := d.chainParser.PackUID(outid) + for addrID, outpoints := range records { + key, err := packOutputKey([]byte(addrID), block.Height) if err != nil { - glog.Warningf("rocksdb: packUID: %v - %d %s", err, block.Height, outid) - continue - } - key, err := packOutputKey(bOutid, block.Height) - if err != nil { - glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, outid) - continue - } - val, err := packOutputValue(outpoints) - if err != nil { - glog.Warningf("rocksdb: packOutputValue: %v", err) + glog.Warningf("rocksdb: packOutputKey: %v - %d %s", err, block.Height, addrID) continue } switch op { case opInsert: + val, err := packOutputValue(outpoints) + if err != nil { + glog.Warningf("rocksdb: packOutputValue: %v", err) + continue + } wb.PutCF(d.cfh[cfOutputs], key, val) case opDelete: wb.DeleteCF(d.cfh[cfOutputs], key) @@ -306,7 +347,7 @@ func packOutputValue(outpoints []outpoint) ([]byte, error) { if err != nil { return nil, err } - bvout := packVaruint(o.vout) + bvout := packVarint(o.vout) buf = append(buf, btxid...) buf = append(buf, bvout...) } @@ -321,7 +362,7 @@ func unpackOutputValue(buf []byte) ([]outpoint, error) { return nil, err } i += txIdUnpackedLen - vout, voutLen := unpackVaruint(buf[i:]) + vout, voutLen := unpackVarint(buf[i:]) i += voutLen outpoints = append(outpoints, outpoint{ txid: txid, @@ -343,11 +384,11 @@ func (d *RocksDB) writeInputs( if input.Coinbase != "" { continue } - key, err := packOutpoint(input.Txid, input.Vout) + key, err := packOutpoint(input.Txid, int32(input.Vout)) if err != nil { return err } - val, err := packOutpoint(tx.Txid, uint32(i)) + val, err := packOutpoint(tx.Txid, int32(i)) if err != nil { return err } @@ -362,21 +403,21 @@ func (d *RocksDB) writeInputs( return nil } -func packOutpoint(txid string, vout uint32) ([]byte, error) { +func packOutpoint(txid string, vout int32) ([]byte, error) { btxid, err := packTxid(txid) if err != nil { return nil, err } - bvout := packVaruint(vout) + bvout := packVarint(vout) buf := make([]byte, 0, len(btxid)+len(bvout)) buf = append(buf, btxid...) buf = append(buf, bvout...) return buf, nil } -func unpackOutpoint(buf []byte) (string, uint32, int) { +func unpackOutpoint(buf []byte) (string, int32, int) { txid, _ := unpackTxid(buf[:txIdUnpackedLen]) - vout, o := unpackVaruint(buf[txIdUnpackedLen:]) + vout, o := unpackVarint(buf[txIdUnpackedLen:]) return txid, vout, txIdUnpackedLen + o } @@ -409,7 +450,7 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) { } // GetSpentOutput returns output which is spent by input tx -func (d *RocksDB) GetSpentOutput(txid string, i uint32) (string, uint32, error) { +func (d *RocksDB) GetSpentOutput(txid string, i int32) (string, int32, error) { b, err := packOutpoint(txid, i) if err != nil { return "", 0, err @@ -424,7 +465,7 @@ func (d *RocksDB) GetSpentOutput(txid string, i uint32) (string, uint32, error) return "", 0, err } var otxid string - var oi uint32 + var oi int32 for _, i := range p { otxid, oi = i.txid, i.vout break @@ -627,15 +668,15 @@ func unpackFloat64(buf []byte) float64 { return math.Float64frombits(binary.BigEndian.Uint64(buf)) } -func packVaruint(i uint32) []byte { +func packVarint(i int32) []byte { buf := make([]byte, vlq.MaxLen32) - ofs := vlq.PutUint(buf, uint64(i)) + ofs := vlq.PutInt(buf, int64(i)) return buf[:ofs] } -func unpackVaruint(buf []byte) (uint32, int) { +func unpackVarint(buf []byte) (int32, int) { i, ofs := vlq.Uint(buf) - return uint32(i), ofs + return int32(i), ofs } func packVarint64(i int64) []byte {