Handle notifications of new mempool transactions in eth

This commit is contained in:
Martin Boehm 2018-04-03 18:22:36 +02:00
parent d1a047c667
commit bbd51e3624

View File

@ -42,6 +42,8 @@ type EthRPC struct {
bestHeader *ethtypes.Header
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
}
type configuration struct {
@ -92,6 +94,22 @@ func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)
}
}()
// new mempool transaction notifications handling
// the subscription is done in Initialize
s.chanNewTx = make(chan ethcommon.Hash)
go func() {
for {
t, ok := <-s.chanNewTx
if !ok {
break
}
if glog.V(2) {
glog.Info("rpc: new tx ", t.Hex())
}
pushHandler(bchain.NotificationNewTx)
}
}()
return s, nil
}
@ -127,6 +145,13 @@ func (b *EthRPC) Initialize() error {
}
b.newBlockSubscription = sub
// Subscribe to new mempool transactions
sub, err = b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions")
if err != nil {
return errors.Annotatef(err, "EthSubscribe newPendingTransactions")
}
b.newTxSubscription = sub
b.Mempool = bchain.NewNonUTXOMempool(b)
return nil
@ -137,6 +162,9 @@ func (b *EthRPC) Shutdown() error {
if b.newBlockSubscription != nil {
b.newBlockSubscription.Unsubscribe()
}
if b.newTxSubscription != nil {
b.newTxSubscription.Unsubscribe()
}
if b.rpc != nil {
b.rpc.Close()
}