diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 67207589..ef0d5f79 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -42,6 +42,8 @@ type EthRPC struct { bestHeader *ethtypes.Header chanNewBlock chan *ethtypes.Header newBlockSubscription *rpc.ClientSubscription + chanNewTx chan ethcommon.Hash + newTxSubscription *rpc.ClientSubscription } type configuration struct { @@ -92,6 +94,22 @@ func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType) } }() + // new mempool transaction notifications handling + // the subscription is done in Initialize + s.chanNewTx = make(chan ethcommon.Hash) + go func() { + for { + t, ok := <-s.chanNewTx + if !ok { + break + } + if glog.V(2) { + glog.Info("rpc: new tx ", t.Hex()) + } + pushHandler(bchain.NotificationNewTx) + } + }() + return s, nil } @@ -127,6 +145,13 @@ func (b *EthRPC) Initialize() error { } b.newBlockSubscription = sub + // Subscribe to new mempool transactions + sub, err = b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions") + if err != nil { + return errors.Annotatef(err, "EthSubscribe newPendingTransactions") + } + b.newTxSubscription = sub + b.Mempool = bchain.NewNonUTXOMempool(b) return nil @@ -137,6 +162,9 @@ func (b *EthRPC) Shutdown() error { if b.newBlockSubscription != nil { b.newBlockSubscription.Unsubscribe() } + if b.newTxSubscription != nil { + b.newTxSubscription.Unsubscribe() + } if b.rpc != nil { b.rpc.Close() }