Implement new ethereum mempool sync with tx timeout

This commit is contained in:
Martin Boehm 2019-04-04 23:35:38 +02:00
parent 4435dbbfb4
commit 3f973bf47d
3 changed files with 140 additions and 86 deletions

View File

@ -67,33 +67,33 @@ func (a MempoolTxidEntries) Less(i, j int) bool {
func (m *BaseMempool) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txEntries = newTxEntries
m.addrDescToTx = newAddrDescToTx
m.mux.Unlock()
}
func getAllEntries(txEntries map[string]txEntry) MempoolTxidEntries {
a := make(MempoolTxidEntries, len(txEntries))
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *BaseMempool) GetAllEntries() MempoolTxidEntries {
entries := make(MempoolTxidEntries, len(m.txEntries))
i := 0
for txid, entry := range txEntries {
a[i] = MempoolTxidEntry{
m.mux.Lock()
for txid, entry := range m.txEntries {
entries[i] = MempoolTxidEntry{
Txid: txid,
Time: entry.time,
}
i++
}
sort.Sort(a)
return a
}
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *BaseMempool) GetAllEntries() MempoolTxidEntries {
return getAllEntries(m.txEntries)
m.mux.Unlock()
sort.Sort(entries)
return entries
}
// GetTransactionTime returns first seen time of a transaction
func (m *BaseMempool) GetTransactionTime(txid string) uint32 {
m.mux.Lock()
e, found := m.txEntries[txid]
m.mux.Unlock()
if !found {
return 0
}

View File

@ -41,22 +41,21 @@ type Configuration struct {
// EthereumRPC is an interface to JSON-RPC eth service.
type EthereumRPC struct {
*bchain.BaseChain
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
bestHeaderLock sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
pendingTransactions map[string]struct{}
pendingTransactionsLock sync.Mutex
ChainConfig *Configuration
isETC bool
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
mempoolInitialized bool
bestHeaderLock sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
ChainConfig *Configuration
isETC bool
}
// NewEthereumRPC returns new EthRPC instance.
@ -78,11 +77,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
ec := ethclient.NewClient(rc)
s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
pendingTransactions: make(map[string]struct{}),
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
}
// always create parser
@ -125,9 +123,7 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if glog.V(2) {
glog.Info("rpc: new tx ", hex)
}
s.pendingTransactionsLock.Lock()
s.pendingTransactions[hex] = struct{}{}
s.pendingTransactionsLock.Unlock()
s.Mempool.AddTransactionToMempool(hex)
pushHandler(bchain.NotificationNewTx)
}
}()
@ -176,7 +172,18 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
if b.Mempool == nil {
return errors.New("Mempool not created")
}
// get initial mempool transactions
txs, err := b.GetMempoolTransactions()
if err != nil {
return err
}
for _, txid := range txs {
b.Mempool.AddTransactionToMempool(txid)
}
b.Mempool.OnNewTxAddr = onNewTxAddr
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {
@ -213,6 +220,9 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
}); err != nil {
return err
}
b.mempoolInitialized = true
return nil
}
@ -495,9 +505,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash)
}
btxs[i] = *btx
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, tx.Hash)
b.pendingTransactionsLock.Unlock()
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(tx.Hash)
}
}
bbk := bchain.Block{
BlockHeader: *bbh,
@ -535,14 +545,7 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) {
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
tx, err := b.GetTransaction(txid)
// if there is an error getting the tx or the tx is confirmed, remove it from pending transactions
if err == bchain.ErrTxNotFound || (tx != nil && tx.Confirmations > 0) {
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, txid)
b.pendingTransactionsLock.Unlock()
}
return tx, err
return b.GetTransaction(txid)
}
// GetTransaction returns a transaction by the transaction ID.
@ -555,6 +558,9 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
if err != nil {
return nil, err
} else if tx == nil {
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(txid)
}
return nil, bchain.ErrTxNotFound
}
var btx *bchain.Tx
@ -621,6 +627,10 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
// remove tx from mempool if it is there
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(txid)
}
}
return btx, nil
}
@ -654,19 +664,7 @@ func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) {
return nil, err
}
}
b.pendingTransactionsLock.Lock()
// join transactions returned by getBlockRaw with pendingTransactions from subscription
for _, txid := range body.Transactions {
b.pendingTransactions[txid] = struct{}{}
}
txids := make([]string, len(b.pendingTransactions))
i := 0
for txid := range b.pendingTransactions {
txids[i] = txid
i++
}
b.pendingTransactionsLock.Unlock()
return txids, nil
return body.Transactions, nil
}
// EstimateFee returns fee estimation

View File

@ -6,14 +6,25 @@ import (
"github.com/golang/glog"
)
const mempoolTimeoutTime = 24 * time.Hour
const mempoolTimeoutRunPeriod = 10 * time.Minute
// MempoolEthereumType is mempool handle of EthereumType chains
type MempoolEthereumType struct {
BaseMempool
nextTimeoutRun time.Time
}
// NewMempoolEthereumType creates new mempool handler.
func NewMempoolEthereumType(chain BlockChain) *MempoolEthereumType {
return &MempoolEthereumType{BaseMempool: BaseMempool{chain: chain}}
return &MempoolEthereumType{
BaseMempool: BaseMempool{
chain: chain,
txEntries: make(map[string]txEntry),
addrDescToTx: make(map[string][]Outpoint),
},
nextTimeoutRun: time.Now().Add(mempoolTimeoutTime),
}
}
func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex {
@ -76,35 +87,80 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry
return txEntry{addrIndexes: addrIndexes, time: txTime}, true
}
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
// Resync ethereum type removes timed out transactions and returns number of transactions in mempool.
// Transactions are added/removed by AddTransactionToMempool/RemoveTransactionFromMempool methods
func (m *MempoolEthereumType) Resync() (int, error) {
start := time.Now()
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempoolTransactions()
if err != nil {
return 0, err
}
// allocate slightly larger capacity of the maps
newTxEntries := make(map[string]txEntry, len(m.txEntries)+5)
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
txTime := uint32(time.Now().Unix())
var ok bool
for _, txid := range txs {
entry, exists := m.txEntries[txid]
if !exists {
entry, ok = m.createTxEntry(txid, txTime)
if !ok {
continue
m.mux.Lock()
entries := len(m.txEntries)
now := time.Now()
if m.nextTimeoutRun.Before(now) {
threshold := now.Add(-mempoolTimeoutTime)
for txid, entry := range m.txEntries {
if time.Unix(int64(entry.time), 0).Before(threshold) {
m.removeEntryFromMempool(txid, entry)
}
}
newTxEntries[txid] = entry
removed := entries - len(m.txEntries)
entries = len(m.txEntries)
glog.Info("Mempool: cleanup, removed ", removed, " transactions from mempool")
m.nextTimeoutRun = now.Add(mempoolTimeoutRunPeriod)
}
m.mux.Unlock()
glog.Info("Mempool: resync ", entries, " transactions in mempool")
return entries, nil
}
// AddTransactionToMempool adds transactions to mempool
func (m *MempoolEthereumType) AddTransactionToMempool(txid string) {
m.mux.Lock()
_, exists := m.txEntries[txid]
m.mux.Unlock()
if glog.V(1) {
glog.Info("AddTransactionToMempool ", txid, ", existed ", exists)
}
if !exists {
entry, ok := m.createTxEntry(txid, uint32(time.Now().Unix()))
if !ok {
return
}
m.mux.Lock()
m.txEntries[txid] = entry
for _, si := range entry.addrIndexes {
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n})
}
m.mux.Unlock()
}
}
func (m *MempoolEthereumType) removeEntryFromMempool(txid string, entry txEntry) {
delete(m.txEntries, txid)
for _, si := range entry.addrIndexes {
outpoints, found := m.addrDescToTx[si.addrDesc]
if found {
newOutpoints := make([]Outpoint, 0, len(outpoints)-1)
for _, o := range outpoints {
if o.Txid != txid {
newOutpoints = append(newOutpoints, o)
}
}
if len(newOutpoints) > 0 {
m.addrDescToTx[si.addrDesc] = newOutpoints
} else {
delete(m.addrDescToTx, si.addrDesc)
}
}
}
m.updateMappings(newTxEntries, newAddrDescToTx)
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
return len(m.txEntries), nil
}
// RemoveTransactionFromMempool removes transaction from mempool
func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) {
m.mux.Lock()
entry, exists := m.txEntries[txid]
if glog.V(1) {
glog.Info("RemoveTransactionFromMempool ", txid, ", existed ", exists)
}
if exists {
m.removeEntryFromMempool(txid, entry)
}
m.mux.Unlock()
}