From d1a047c667ce79de150fabd8a0a3e6850d446dc3 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 3 Apr 2018 15:51:38 +0200 Subject: [PATCH] Add non UTXO mempool implementation --- bchain/coins/btc/bitcoinrpc.go | 2 +- bchain/coins/zec/zcashrpc.go | 2 +- bchain/mempool_nonutxo.go | 105 +++++++++++++++++++++++++++++++++ bchain/mempool_utxo.go | 13 ++-- 4 files changed, 114 insertions(+), 8 deletions(-) create mode 100644 bchain/mempool_nonutxo.go diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index ecc25283..ce1e7f3a 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -73,7 +73,7 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT } func (b *BitcoinRPC) Initialize() error { - b.Mempool = bchain.NewMempool(b) + b.Mempool = bchain.NewUTXOMempool(b) chainName, err := b.GetBlockChainInfo() if err != nil { diff --git a/bchain/coins/zec/zcashrpc.go b/bchain/coins/zec/zcashrpc.go index d0243644..ec20eb1e 100644 --- a/bchain/coins/zec/zcashrpc.go +++ b/bchain/coins/zec/zcashrpc.go @@ -25,7 +25,7 @@ func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp } func (z *ZCashRPC) Initialize() error { - z.Mempool = bchain.NewMempool(z) + z.Mempool = bchain.NewUTXOMempool(z) z.Parser = &ZCashBlockParser{} z.Testnet = false z.Network = "livenet" diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go new file mode 100644 index 00000000..d30149f6 --- /dev/null +++ b/bchain/mempool_nonutxo.go @@ -0,0 +1,105 @@ +package bchain + +import ( + "sync" + "time" + + "github.com/golang/glog" +) + +// NonUTXOMempool is mempool handle of non UTXO chains +type NonUTXOMempool struct { + chain BlockChain + mux sync.Mutex + txToInputOutput map[string][]addrIndex + addrIDToTx map[string][]outpoint +} + +// NewNonUTXOMempool creates new mempool handler. +func NewNonUTXOMempool(chain BlockChain) *NonUTXOMempool { + return &NonUTXOMempool{chain: chain} +} + +// GetTransactions returns slice of mempool transactions for given address +func (m *NonUTXOMempool) GetTransactions(address string) ([]string, error) { + m.mux.Lock() + defer m.mux.Unlock() + parser := m.chain.GetChainParser() + addrID, err := parser.GetAddrIDFromAddress(address) + if err != nil { + return nil, err + } + outpoints := m.addrIDToTx[string(addrID)] + txs := make([]string, 0, len(outpoints)) + for _, o := range outpoints { + txs = append(txs, o.txid) + } + return txs, nil +} + +func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrIDToTx map[string][]outpoint) { + m.mux.Lock() + defer m.mux.Unlock() + m.txToInputOutput = newTxToInputOutput + m.addrIDToTx = newAddrIDToTx +} + +// Resync gets mempool transactions and maps outputs to transactions. +// Resync is not reentrant, it should be called from a single thread. +// Read operations (GetTransactions) are safe. +func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { + start := time.Now() + glog.V(1).Info("Mempool: resync") + txs, err := m.chain.GetMempool() + if err != nil { + return err + } + parser := m.chain.GetChainParser() + newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+1) + newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+1) + for _, txid := range txs { + io, exists := m.txToInputOutput[txid] + if !exists { + tx, err := m.chain.GetTransaction(txid) + if err != nil { + glog.Error("cannot get transaction ", txid, ": ", err) + continue + } + io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) + for _, output := range tx.Vout { + addrID, err := parser.GetAddrIDFromVout(&output) + if err != nil { + if err != ErrAddressMissing { + glog.Error("error in output addrID in ", txid, " ", output.N, ": ", err) + } + continue + } + if len(addrID) > 0 { + io = append(io, addrIndex{string(addrID), int32(output.N)}) + } + if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { + onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + } + } + for _, input := range tx.Vin { + for i, a := range input.Addresses { + if len(a) > 0 { + addrID, err := parser.GetAddrIDFromAddress(a) + if err != nil { + glog.Error("error in input addrID in ", txid, " ", a, ": ", err) + continue + } + io = append(io, addrIndex{string(addrID), int32(^i)}) + } + } + } + } + newTxToInputOutput[txid] = io + for _, si := range io { + newAddrIDToTx[si.addrID] = append(newAddrIDToTx[si.addrID], outpoint{txid, si.n}) + } + } + m.updateMappings(newTxToInputOutput, newAddrIDToTx) + glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") + return nil +} diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index e40f2560..fbd73724 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -7,14 +7,15 @@ import ( "github.com/golang/glog" ) +// addrIndex and outpoint are used also in nonutxo mempool type addrIndex struct { addrID string - n uint32 + n int32 } type outpoint struct { txid string - vout uint32 + vout int32 } type inputOutput struct { @@ -32,7 +33,7 @@ type UTXOMempool struct { } // NewMempool creates new mempool handler. -func NewMempool(chain BlockChain) *UTXOMempool { +func NewUTXOMempool(chain BlockChain) *UTXOMempool { return &UTXOMempool{chain: chain} } @@ -59,7 +60,7 @@ func (m *UTXOMempool) GetTransactions(address string) ([]string, error) { // GetSpentOutput returns transaction which spends given outpoint func (m *UTXOMempool) GetSpentOutput(outputTxid string, vout uint32) string { - o := outpoint{txid: outputTxid, vout: vout} + o := outpoint{txid: outputTxid, vout: int32(vout)} return m.inputs[o] } @@ -101,7 +102,7 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { continue } if len(addrID) > 0 { - io.outputs = append(io.outputs, addrIndex{string(addrID), output.N}) + io.outputs = append(io.outputs, addrIndex{string(addrID), int32(output.N)}) } if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) @@ -112,7 +113,7 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { if input.Coinbase != "" { continue } - io.inputs = append(io.inputs, outpoint{input.Txid, input.Vout}) + io.inputs = append(io.inputs, outpoint{input.Txid, int32(input.Vout)}) } } newTxToInputOutput[txid] = io