From 3f973bf47d308c4ac563543ceba97a90c342ef5d Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Thu, 4 Apr 2019 23:35:38 +0200 Subject: [PATCH] Implement new ethereum mempool sync with tx timeout --- bchain/basemempool.go | 24 +++---- bchain/coins/eth/ethrpc.go | 94 ++++++++++++++------------- bchain/mempool_ethereum_type.go | 108 ++++++++++++++++++++++++-------- 3 files changed, 140 insertions(+), 86 deletions(-) diff --git a/bchain/basemempool.go b/bchain/basemempool.go index 01e1b90b..807f1ed6 100644 --- a/bchain/basemempool.go +++ b/bchain/basemempool.go @@ -67,33 +67,33 @@ func (a MempoolTxidEntries) Less(i, j int) bool { func (m *BaseMempool) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) { m.mux.Lock() - defer m.mux.Unlock() m.txEntries = newTxEntries m.addrDescToTx = newAddrDescToTx + m.mux.Unlock() } -func getAllEntries(txEntries map[string]txEntry) MempoolTxidEntries { - a := make(MempoolTxidEntries, len(txEntries)) +// GetAllEntries returns all mempool entries sorted by fist seen time in descending order +func (m *BaseMempool) GetAllEntries() MempoolTxidEntries { + entries := make(MempoolTxidEntries, len(m.txEntries)) i := 0 - for txid, entry := range txEntries { - a[i] = MempoolTxidEntry{ + m.mux.Lock() + for txid, entry := range m.txEntries { + entries[i] = MempoolTxidEntry{ Txid: txid, Time: entry.time, } i++ } - sort.Sort(a) - return a -} - -// GetAllEntries returns all mempool entries sorted by fist seen time in descending order -func (m *BaseMempool) GetAllEntries() MempoolTxidEntries { - return getAllEntries(m.txEntries) + m.mux.Unlock() + sort.Sort(entries) + return entries } // GetTransactionTime returns first seen time of a transaction func (m *BaseMempool) GetTransactionTime(txid string) uint32 { + m.mux.Lock() e, found := m.txEntries[txid] + m.mux.Unlock() if !found { return 0 } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 5b0d3546..9656e6c3 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -41,22 +41,21 @@ type Configuration struct { // EthereumRPC is an interface to JSON-RPC eth service. type EthereumRPC struct { *bchain.BaseChain - client *ethclient.Client - rpc *rpc.Client - timeout time.Duration - Parser *EthereumParser - Mempool *bchain.MempoolEthereumType - bestHeaderLock sync.Mutex - bestHeader *ethtypes.Header - bestHeaderTime time.Time - chanNewBlock chan *ethtypes.Header - newBlockSubscription *rpc.ClientSubscription - chanNewTx chan ethcommon.Hash - newTxSubscription *rpc.ClientSubscription - pendingTransactions map[string]struct{} - pendingTransactionsLock sync.Mutex - ChainConfig *Configuration - isETC bool + client *ethclient.Client + rpc *rpc.Client + timeout time.Duration + Parser *EthereumParser + Mempool *bchain.MempoolEthereumType + mempoolInitialized bool + bestHeaderLock sync.Mutex + bestHeader *ethtypes.Header + bestHeaderTime time.Time + chanNewBlock chan *ethtypes.Header + newBlockSubscription *rpc.ClientSubscription + chanNewTx chan ethcommon.Hash + newTxSubscription *rpc.ClientSubscription + ChainConfig *Configuration + isETC bool } // NewEthereumRPC returns new EthRPC instance. @@ -78,11 +77,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification ec := ethclient.NewClient(rc) s := &EthereumRPC{ - BaseChain: &bchain.BaseChain{}, - client: ec, - rpc: rc, - ChainConfig: &c, - pendingTransactions: make(map[string]struct{}), + BaseChain: &bchain.BaseChain{}, + client: ec, + rpc: rc, + ChainConfig: &c, } // always create parser @@ -125,9 +123,7 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification if glog.V(2) { glog.Info("rpc: new tx ", hex) } - s.pendingTransactionsLock.Lock() - s.pendingTransactions[hex] = struct{}{} - s.pendingTransactionsLock.Unlock() + s.Mempool.AddTransactionToMempool(hex) pushHandler(bchain.NotificationNewTx) } }() @@ -176,7 +172,18 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu if b.Mempool == nil { return errors.New("Mempool not created") } + + // get initial mempool transactions + txs, err := b.GetMempoolTransactions() + if err != nil { + return err + } + for _, txid := range txs { + b.Mempool.AddTransactionToMempool(txid) + } + b.Mempool.OnNewTxAddr = onNewTxAddr + if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { @@ -213,6 +220,9 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu }); err != nil { return err } + + b.mempoolInitialized = true + return nil } @@ -495,9 +505,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash) } btxs[i] = *btx - b.pendingTransactionsLock.Lock() - delete(b.pendingTransactions, tx.Hash) - b.pendingTransactionsLock.Unlock() + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(tx.Hash) + } } bbk := bchain.Block{ BlockHeader: *bbh, @@ -535,14 +545,7 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) { // GetTransactionForMempool returns a transaction by the transaction ID. // It could be optimized for mempool, i.e. without block time and confirmations func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) { - tx, err := b.GetTransaction(txid) - // if there is an error getting the tx or the tx is confirmed, remove it from pending transactions - if err == bchain.ErrTxNotFound || (tx != nil && tx.Confirmations > 0) { - b.pendingTransactionsLock.Lock() - delete(b.pendingTransactions, txid) - b.pendingTransactionsLock.Unlock() - } - return tx, err + return b.GetTransaction(txid) } // GetTransaction returns a transaction by the transaction ID. @@ -555,6 +558,9 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, err } else if tx == nil { + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(txid) + } return nil, bchain.ErrTxNotFound } var btx *bchain.Tx @@ -621,6 +627,10 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } + // remove tx from mempool if it is there + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(txid) + } } return btx, nil } @@ -654,19 +664,7 @@ func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) { return nil, err } } - b.pendingTransactionsLock.Lock() - // join transactions returned by getBlockRaw with pendingTransactions from subscription - for _, txid := range body.Transactions { - b.pendingTransactions[txid] = struct{}{} - } - txids := make([]string, len(b.pendingTransactions)) - i := 0 - for txid := range b.pendingTransactions { - txids[i] = txid - i++ - } - b.pendingTransactionsLock.Unlock() - return txids, nil + return body.Transactions, nil } // EstimateFee returns fee estimation diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index 315f166a..52f176c5 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -6,14 +6,25 @@ import ( "github.com/golang/glog" ) +const mempoolTimeoutTime = 24 * time.Hour +const mempoolTimeoutRunPeriod = 10 * time.Minute + // MempoolEthereumType is mempool handle of EthereumType chains type MempoolEthereumType struct { BaseMempool + nextTimeoutRun time.Time } // NewMempoolEthereumType creates new mempool handler. func NewMempoolEthereumType(chain BlockChain) *MempoolEthereumType { - return &MempoolEthereumType{BaseMempool: BaseMempool{chain: chain}} + return &MempoolEthereumType{ + BaseMempool: BaseMempool{ + chain: chain, + txEntries: make(map[string]txEntry), + addrDescToTx: make(map[string][]Outpoint), + }, + nextTimeoutRun: time.Now().Add(mempoolTimeoutTime), + } } func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex { @@ -76,35 +87,80 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry return txEntry{addrIndexes: addrIndexes, time: txTime}, true } -// Resync gets mempool transactions and maps outputs to transactions. -// Resync is not reentrant, it should be called from a single thread. -// Read operations (GetTransactions) are safe. +// Resync ethereum type removes timed out transactions and returns number of transactions in mempool. +// Transactions are added/removed by AddTransactionToMempool/RemoveTransactionFromMempool methods func (m *MempoolEthereumType) Resync() (int, error) { - start := time.Now() - glog.V(1).Info("Mempool: resync") - txs, err := m.chain.GetMempoolTransactions() - if err != nil { - return 0, err - } - // allocate slightly larger capacity of the maps - newTxEntries := make(map[string]txEntry, len(m.txEntries)+5) - newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) - txTime := uint32(time.Now().Unix()) - var ok bool - for _, txid := range txs { - entry, exists := m.txEntries[txid] - if !exists { - entry, ok = m.createTxEntry(txid, txTime) - if !ok { - continue + m.mux.Lock() + entries := len(m.txEntries) + now := time.Now() + if m.nextTimeoutRun.Before(now) { + threshold := now.Add(-mempoolTimeoutTime) + for txid, entry := range m.txEntries { + if time.Unix(int64(entry.time), 0).Before(threshold) { + m.removeEntryFromMempool(txid, entry) } } - newTxEntries[txid] = entry + removed := entries - len(m.txEntries) + entries = len(m.txEntries) + glog.Info("Mempool: cleanup, removed ", removed, " transactions from mempool") + m.nextTimeoutRun = now.Add(mempoolTimeoutRunPeriod) + } + m.mux.Unlock() + glog.Info("Mempool: resync ", entries, " transactions in mempool") + return entries, nil +} + +// AddTransactionToMempool adds transactions to mempool +func (m *MempoolEthereumType) AddTransactionToMempool(txid string) { + m.mux.Lock() + _, exists := m.txEntries[txid] + m.mux.Unlock() + if glog.V(1) { + glog.Info("AddTransactionToMempool ", txid, ", existed ", exists) + } + if !exists { + entry, ok := m.createTxEntry(txid, uint32(time.Now().Unix())) + if !ok { + return + } + m.mux.Lock() + m.txEntries[txid] = entry for _, si := range entry.addrIndexes { - newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + } + m.mux.Unlock() + } +} + +func (m *MempoolEthereumType) removeEntryFromMempool(txid string, entry txEntry) { + delete(m.txEntries, txid) + for _, si := range entry.addrIndexes { + outpoints, found := m.addrDescToTx[si.addrDesc] + if found { + newOutpoints := make([]Outpoint, 0, len(outpoints)-1) + for _, o := range outpoints { + if o.Txid != txid { + newOutpoints = append(newOutpoints, o) + } + } + if len(newOutpoints) > 0 { + m.addrDescToTx[si.addrDesc] = newOutpoints + } else { + delete(m.addrDescToTx, si.addrDesc) + } } } - m.updateMappings(newTxEntries, newAddrDescToTx) - glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool") - return len(m.txEntries), nil +} + +// RemoveTransactionFromMempool removes transaction from mempool +func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) { + m.mux.Lock() + entry, exists := m.txEntries[txid] + if glog.V(1) { + glog.Info("RemoveTransactionFromMempool ", txid, ", existed ", exists) + } + if exists { + m.removeEntryFromMempool(txid, entry) + } + m.mux.Unlock() }