diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index aba0128d..202fa446 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -139,25 +139,83 @@ func (b *EthereumRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) - // Subscribe to new blocks - sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads") - if err != nil { - return errors.Annotatef(err, "EthSubscribe newHeads") + // subscriptions + if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + // invalidate the previous subscription - it is either the first one or there was an error + b.newBlockSubscription = nil + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads") + if err != nil { + return nil, errors.Annotatef(err, "EthSubscribe newHeads") + } + b.newBlockSubscription = sub + glog.Info("Subscribed to newHeads") + return sub, nil + }); err != nil { + return err } - b.newBlockSubscription = sub - - // Subscribe to new mempool transactions - sub, err = b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions") - if err != nil { - return errors.Annotatef(err, "EthSubscribe newPendingTransactions") + if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + // invalidate the previous subscription - it is either the first one or there was an error + b.newTxSubscription = nil + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + sub, err := b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions") + if err != nil { + return nil, errors.Annotatef(err, "EthSubscribe newPendingTransactions") + } + b.newTxSubscription = sub + glog.Info("Subscribed to newPendingTransactions") + return sub, nil + }); err != nil { + return err } - b.newTxSubscription = sub + // create mempool b.Mempool = bchain.NewNonUTXOMempool(b) return nil } +// subscribeNewBlocks subscribes to new blocks notification +func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error { + s, err := f() + if err != nil { + return err + } + go func() { + Loop: + for { + // wait for error in subscription + e := <-s.Err() + // nil error means sub.Unsubscribe called, exit goroutine + if e == nil { + return + } + glog.Error("Subscription error ", e) + timer := time.NewTimer(time.Second) + // try in 1 second interval to resubscribe + for { + select { + case e = <-s.Err(): + if e == nil { + return + } + case <-timer.C: + ns, err := f() + if err == nil { + // subscription successful, restart wait for next error + s = ns + continue Loop + } + timer.Reset(time.Second) + } + } + } + }() + return nil +} + // Shutdown cleans up rpc interface to ethereum func (b *EthereumRPC) Shutdown() error { if b.newBlockSubscription != nil {