From f2dc4a56d8c476efd7bf163e34bac6a8de635d21 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 1 Apr 2019 17:00:34 +0200 Subject: [PATCH] Store time of mempool transaction --- bchain/coins/blockchain.go | 8 +-- bchain/coins/btc/bitcoinrpc.go | 3 +- bchain/coins/eth/ethrpc.go | 3 +- bchain/mempool_bitcoin_type.go | 44 ++++++------ bchain/mempool_ethereum_type.go | 123 +++++++++++++++++--------------- bchain/types.go | 4 +- blockbook.go | 10 +-- tests/dbtestdata/fakechain.go | 2 +- tests/integration.go | 2 +- tests/rpc/rpc.go | 2 +- 10 files changed, 109 insertions(+), 92 deletions(-) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 624b5d75..6f1913fc 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -152,8 +152,8 @@ func (c *blockChainWithMetrics) CreateMempool() (bchain.Mempool, error) { return c.b.CreateMempool() } -func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { - return c.b.InitializeMempool(addrDescForOutpoint) +func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { + return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr) } func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { @@ -293,9 +293,9 @@ func (c *mempoolWithMetrics) observeRPCLatency(method string, start time.Time, e c.m.RPCLatency.With(common.Labels{"method": method, "error": e}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds } -func (c *mempoolWithMetrics) Resync(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { +func (c *mempoolWithMetrics) Resync() (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) - count, err = c.mempool.Resync(onNewTxAddr) + count, err = c.mempool.Resync() if err == nil { c.m.MempoolSize.Set(float64(count)) } diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index a6c4cdf0..b3e17b75 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -139,11 +139,12 @@ func (b *BitcoinRPC) CreateMempool() (bchain.Mempool, error) { } // InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool -func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { +func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { if b.Mempool == nil { return errors.New("Mempool not created") } b.Mempool.AddrDescForOutpoint = addrDescForOutpoint + b.Mempool.OnNewTxAddr = onNewTxAddr if b.mq == nil { mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) if err != nil { diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 18643370..415174a4 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -172,10 +172,11 @@ func (b *EthereumRPC) CreateMempool() (bchain.Mempool, error) { } // InitializeMempool creates subscriptions to newHeads and newPendingTransactions -func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { +func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { if b.Mempool == nil { return errors.New("Mempool not created") } + b.Mempool.OnNewTxAddr = onNewTxAddr if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index ae13459e..887ecee1 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -12,6 +12,11 @@ type addrIndex struct { n int32 } +type txEntry struct { + addrIndexes []addrIndex + time uint32 +} + type txidio struct { txid string io []addrIndex @@ -21,11 +26,11 @@ type txidio struct { type MempoolBitcoinType struct { chain BlockChain mux sync.Mutex - txToInputOutput map[string][]addrIndex + txEntries map[string]txEntry addrDescToTx map[string][]Outpoint chanTxid chan string chanAddrIndex chan txidio - onNewTxAddr OnNewTxAddrFunc + OnNewTxAddr OnNewTxAddrFunc AddrDescForOutpoint AddrDescForOutpointFunc } @@ -79,10 +84,10 @@ func (m *MempoolBitcoinType) GetAddrDescTransactions(addrDesc AddressDescriptor) return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil } -func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) { +func (m *MempoolBitcoinType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) { m.mux.Lock() defer m.mux.Unlock() - m.txToInputOutput = newTxToInputOutput + m.txEntries = newTxEntries m.addrDescToTx = newAddrDescToTx } @@ -128,8 +133,8 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch if len(addrDesc) > 0 { io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } - if m.onNewTxAddr != nil { - m.onNewTxAddr(tx, addrDesc) + if m.OnNewTxAddr != nil { + m.OnNewTxAddr(tx, addrDesc) } } dispatched := 0 @@ -166,37 +171,37 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { +func (m *MempoolBitcoinType) Resync() (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") - m.onNewTxAddr = onNewTxAddr txs, err := m.chain.GetMempoolTransactions() if err != nil { return 0, err } glog.V(2).Info("mempool: resync ", len(txs), " txs") // allocate slightly larger capacity of the maps - newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) + newTxEntries := make(map[string]txEntry, len(m.txEntries)+5) newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) dispatched := 0 - onNewData := func(txid string, io []addrIndex) { - if len(io) > 0 { - newTxToInputOutput[txid] = io - for _, si := range io { + txTime := uint32(time.Now().Unix()) + onNewData := func(txid string, entry txEntry) { + if len(entry.addrIndexes) > 0 { + newTxEntries[txid] = entry + for _, si := range entry.addrIndexes { newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) } } } // get transaction in parallel using goroutines created in NewUTXOMempool for _, txid := range txs { - io, exists := m.txToInputOutput[txid] + io, exists := m.txEntries[txid] if !exists { loop: for { select { // store as many processed transactions as possible case tio := <-m.chanAddrIndex: - onNewData(tio.txid, tio.io) + onNewData(tio.txid, txEntry{tio.io, txTime}) dispatched-- // send transaction to be processed case m.chanTxid <- txid: @@ -210,10 +215,9 @@ func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { } for i := 0; i < dispatched; i++ { tio := <-m.chanAddrIndex - onNewData(tio.txid, tio.io) + onNewData(tio.txid, txEntry{tio.io, txTime}) } - m.updateMappings(newTxToInputOutput, newAddrDescToTx) - m.onNewTxAddr = nil - glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return len(m.txToInputOutput), nil + m.updateMappings(newTxEntries, newAddrDescToTx) + glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool") + return len(m.txEntries), nil } diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index 1f05c9c5..c6033de6 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -9,10 +9,11 @@ import ( // MempoolEthereumType is mempool handle of EthereumType chains type MempoolEthereumType struct { - chain BlockChain - mux sync.Mutex - txToInputOutput map[string][]addrIndex - addrDescToTx map[string][]Outpoint + chain BlockChain + mux sync.Mutex + txEntries map[string]txEntry + addrDescToTx map[string][]Outpoint + OnNewTxAddr OnNewTxAddrFunc } // NewMempoolEthereumType creates new mempool handler. @@ -37,10 +38,10 @@ func (m *MempoolEthereumType) GetAddrDescTransactions(addrDesc AddressDescriptor return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil } -func (m *MempoolEthereumType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) { +func (m *MempoolEthereumType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) { m.mux.Lock() defer m.mux.Unlock() - m.txToInputOutput = newTxToInputOutput + m.txEntries = newTxEntries m.addrDescToTx = newAddrDescToTx } @@ -56,73 +57,83 @@ func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) [ return io } +func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) { + tx, err := m.chain.GetTransactionForMempool(txid) + if err != nil { + if err != ErrTxNotFound { + glog.Warning("cannot get transaction ", txid, ": ", err) + } + return txEntry{}, false + } + parser := m.chain.GetChainParser() + addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) + for _, output := range tx.Vout { + addrDesc, err := parser.GetAddrDescFromVout(&output) + if err != nil { + if err != ErrAddressMissing { + glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err) + } + continue + } + if len(addrDesc) > 0 { + addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)}) + } + } + for _, input := range tx.Vin { + for i, a := range input.Addresses { + addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser) + } + } + t, err := parser.EthereumTypeGetErc20FromTx(tx) + if err != nil { + glog.Error("GetErc20FromTx for tx ", txid, ", ", err) + } else { + for i := range t { + addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser) + addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser) + } + } + if m.OnNewTxAddr != nil { + sent := make(map[string]struct{}) + for _, si := range addrIndexes { + if _, found := sent[si.addrDesc]; !found { + m.OnNewTxAddr(tx, AddressDescriptor(si.addrDesc)) + sent[si.addrDesc] = struct{}{} + } + } + } + return txEntry{addrIndexes: addrIndexes, time: txTime}, true +} + // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { +func (m *MempoolEthereumType) Resync() (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempoolTransactions() if err != nil { return 0, err } - parser := m.chain.GetChainParser() // allocate slightly larger capacity of the maps - newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) + newTxEntries := make(map[string]txEntry, len(m.txEntries)+5) newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) + txTime := uint32(time.Now().Unix()) + var ok bool for _, txid := range txs { - io, exists := m.txToInputOutput[txid] + entry, exists := m.txEntries[txid] if !exists { - tx, err := m.chain.GetTransactionForMempool(txid) - if err != nil { - if err != ErrTxNotFound { - glog.Warning("cannot get transaction ", txid, ": ", err) - } + entry, ok = m.createTxEntry(txid, txTime) + if !ok { continue } - io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) - for _, output := range tx.Vout { - addrDesc, err := parser.GetAddrDescFromVout(&output) - if err != nil { - if err != ErrAddressMissing { - glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err) - } - continue - } - if len(addrDesc) > 0 { - io = append(io, addrIndex{string(addrDesc), int32(output.N)}) - } - } - for _, input := range tx.Vin { - for i, a := range input.Addresses { - appendAddress(io, ^int32(i), a, parser) - } - } - t, err := parser.EthereumTypeGetErc20FromTx(tx) - if err != nil { - glog.Error("GetErc20FromTx for tx ", txid, ", ", err) - } else { - for i := range t { - io = appendAddress(io, ^int32(i+1), t[i].From, parser) - io = appendAddress(io, int32(i+1), t[i].To, parser) - } - } - if onNewTxAddr != nil { - sent := make(map[string]struct{}) - for _, si := range io { - if _, found := sent[si.addrDesc]; !found { - onNewTxAddr(tx, AddressDescriptor(si.addrDesc)) - sent[si.addrDesc] = struct{}{} - } - } - } } - newTxToInputOutput[txid] = io - for _, si := range io { + newTxEntries[txid] = entry + for _, si := range entry.addrIndexes { newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) } } - m.updateMappings(newTxToInputOutput, newAddrDescToTx) - glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return len(m.txToInputOutput), nil + m.updateMappings(newTxEntries, newAddrDescToTx) + glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool") + return len(m.txEntries), nil } diff --git a/bchain/types.go b/bchain/types.go index 0415f4b8..368965fa 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -202,7 +202,7 @@ type BlockChain interface { // create mempool but do not initialize it CreateMempool() (Mempool, error) // initialize mempool, create ZeroMQ (or other) subscription - InitializeMempool(AddrDescForOutpointFunc) error + InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error // shutdown mempool, ZeroMQ and block chain connections Shutdown(ctx context.Context) error // chain info @@ -278,7 +278,7 @@ type BlockChainParser interface { // Mempool defines common interface to mempool type Mempool interface { - Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) + Resync() (int, error) GetTransactions(address string) ([]Outpoint, error) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) } diff --git a/blockbook.go b/blockbook.go index e694bcc2..4630b263 100644 --- a/blockbook.go +++ b/blockbook.go @@ -231,13 +231,13 @@ func main() { if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType { addrDescForOutpoint = index.AddrDescForOutpoint } - err = chain.InitializeMempool(addrDescForOutpoint) + err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr) if err != nil { glog.Error("initializeMempool ", err) return } var mempoolCount int - if mempoolCount, err = mempool.Resync(nil); err != nil { + if mempoolCount, err = mempool.Resync(); err != nil { glog.Error("resyncMempool ", err) return } @@ -250,6 +250,8 @@ func main() { if publicServer != nil { // start full public interface + callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) + callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) publicServer.ConnectFullPublicInterface() } @@ -342,8 +344,6 @@ func startPublicServer() (*server.PublicServer, error) { } } }() - callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) - callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) return publicServer, err } @@ -474,7 +474,7 @@ func syncMempoolLoop() { // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { internalState.StartedMempoolSync() - if count, err := mempool.Resync(onNewTxAddr); err != nil { + if count, err := mempool.Resync(); err != nil { glog.Error("syncMempoolLoop ", errors.ErrorStack(err)) } else { internalState.FinishedMempoolSync(count) diff --git a/tests/dbtestdata/fakechain.go b/tests/dbtestdata/fakechain.go index 270403bb..d64968c7 100644 --- a/tests/dbtestdata/fakechain.go +++ b/tests/dbtestdata/fakechain.go @@ -25,7 +25,7 @@ func (c *fakeBlockChain) Initialize() error { return nil } -func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { +func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { return nil } diff --git a/tests/integration.go b/tests/integration.go index d1e70130..6d40b730 100644 --- a/tests/integration.go +++ b/tests/integration.go @@ -174,7 +174,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc return nil, nil, fmt.Errorf("Mempool creation failed: %s", err) } - err = cli.InitializeMempool(nil) + err = cli.InitializeMempool(nil, nil) if err != nil { return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err) } diff --git a/tests/rpc/rpc.go b/tests/rpc/rpc.go index c4122d3f..d33e6fb7 100644 --- a/tests/rpc/rpc.go +++ b/tests/rpc/rpc.go @@ -200,7 +200,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) { for i := 0; i < 3; i++ { txs := getMempool(t, h) - n, err := h.Mempool.Resync(nil) + n, err := h.Mempool.Resync() if err != nil { t.Fatal(err) }