diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index fae51546..8aba3598 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -41,20 +41,22 @@ type Configuration struct { // EthereumRPC is an interface to JSON-RPC eth service. type EthereumRPC struct { *bchain.BaseChain - client *ethclient.Client - rpc *rpc.Client - timeout time.Duration - Parser *EthereumParser - Mempool *bchain.MempoolEthereumType - bestHeaderMu sync.Mutex - bestHeader *ethtypes.Header - bestHeaderTime time.Time - chanNewBlock chan *ethtypes.Header - newBlockSubscription *rpc.ClientSubscription - chanNewTx chan ethcommon.Hash - newTxSubscription *rpc.ClientSubscription - ChainConfig *Configuration - isETC bool + client *ethclient.Client + rpc *rpc.Client + timeout time.Duration + Parser *EthereumParser + Mempool *bchain.MempoolEthereumType + bestHeaderLock sync.Mutex + bestHeader *ethtypes.Header + bestHeaderTime time.Time + chanNewBlock chan *ethtypes.Header + newBlockSubscription *rpc.ClientSubscription + chanNewTx chan ethcommon.Hash + newTxSubscription *rpc.ClientSubscription + pendingTransactions map[string]struct{} + pendingTransactionsLock sync.Mutex + ChainConfig *Configuration + isETC bool } // NewEthereumRPC returns new EthRPC instance. @@ -76,10 +78,11 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification ec := ethclient.NewClient(rc) s := &EthereumRPC{ - BaseChain: &bchain.BaseChain{}, - client: ec, - rpc: rc, - ChainConfig: &c, + BaseChain: &bchain.BaseChain{}, + client: ec, + rpc: rc, + ChainConfig: &c, + pendingTransactions: make(map[string]struct{}), } // always create parser @@ -100,10 +103,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification } glog.V(2).Info("rpc: new block header ", h.Number) // update best header to the new header - s.bestHeaderMu.Lock() + s.bestHeaderLock.Lock() s.bestHeader = h s.bestHeaderTime = time.Now() - s.bestHeaderMu.Unlock() + s.bestHeaderLock.Unlock() // notify blockbook pushHandler(bchain.NotificationNewBlock) } @@ -118,9 +121,13 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification if !ok { break } + hex := t.Hex() if glog.V(2) { - glog.Info("rpc: new tx ", t.Hex()) + glog.Info("rpc: new tx ", hex) } + s.pendingTransactionsLock.Lock() + s.pendingTransactions[hex] = struct{}{} + s.pendingTransactionsLock.Unlock() pushHandler(bchain.NotificationNewTx) } }() @@ -296,8 +303,8 @@ func (b *EthereumRPC) GetChainInfo() (*bchain.ChainInfo, error) { } func (b *EthereumRPC) getBestHeader() (*ethtypes.Header, error) { - b.bestHeaderMu.Lock() - defer b.bestHeaderMu.Unlock() + b.bestHeaderLock.Lock() + defer b.bestHeaderLock.Unlock() // ETC does not have newBlocks subscription, bestHeader must be updated very often (each 1 second) if b.isETC { if b.bestHeaderTime.Add(1 * time.Second).Before(time.Now()) { @@ -467,12 +474,16 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error return nil, err } btxs := make([]bchain.Tx, len(body.Transactions)) - for i, tx := range body.Transactions { - btx, err := b.Parser.ethTxToTx(&tx, &rpcReceipt{Logs: logs[tx.Hash]}, bbh.Time, uint32(bbh.Confirmations)) + for i := range body.Transactions { + tx := &body.Transactions[i] + btx, err := b.Parser.ethTxToTx(tx, &rpcReceipt{Logs: logs[tx.Hash]}, bbh.Time, uint32(bbh.Confirmations)) if err != nil { return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash) } btxs[i] = *btx + b.pendingTransactionsLock.Lock() + delete(b.pendingTransactions, tx.Hash) + b.pendingTransactionsLock.Unlock() } bbk := bchain.Block{ BlockHeader: *bbh, @@ -507,7 +518,14 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) { // GetTransactionForMempool returns a transaction by the transaction ID. // It could be optimized for mempool, i.e. without block time and confirmations func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) { - return b.GetTransaction(txid) + tx, err := b.GetTransaction(txid) + // it there is an error getting the tx or the tx is confirmed, remove it from pending transactions + if err != nil || (tx != nil && tx.Confirmations > 0) { + b.pendingTransactionsLock.Lock() + delete(b.pendingTransactions, txid) + b.pendingTransactionsLock.Unlock() + } + return tx, err } // GetTransaction returns a transaction by the transaction ID. @@ -613,14 +631,25 @@ func (b *EthereumRPC) GetMempool() ([]string, error) { if err != nil { return nil, err } - if len(raw) == 0 { - return nil, bchain.ErrBlockNotFound - } var body rpcBlockTxids - if err := json.Unmarshal(raw, &body); err != nil { - return nil, err + if len(raw) > 0 { + if err := json.Unmarshal(raw, &body); err != nil { + return nil, err + } } - return body.Transactions, nil + b.pendingTransactionsLock.Lock() + // join transactions returned by getBlockRaw with pendingTransactions from subscription + for _, txid := range body.Transactions { + b.pendingTransactions[txid] = struct{}{} + } + txids := make([]string, len(b.pendingTransactions)) + i := 0 + for txid := range b.pendingTransactions { + txids[i] = txid + i++ + } + b.pendingTransactionsLock.Unlock() + return txids, nil } // EstimateFee returns fee estimation