From c409a350c9fde5bbeb908388d109075ca8182ae7 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 3 Jun 2019 17:48:09 +0200 Subject: [PATCH] Try to reconnect ethereum RPC --- bchain/coins/eth/ethrpc.go | 60 +++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 82fa83c2..3103bf43 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -72,11 +72,8 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification if c.BlockAddressesToKeep < 100 { c.BlockAddressesToKeep = 100 } - rc, err := rpc.Dial(c.RPCURL) - if err != nil { - return nil, err - } - ec := ethclient.NewClient(rc) + + rc, ec, err := openRPC(c.RPCURL) s := &EthereumRPC{ BaseChain: &bchain.BaseChain{}, @@ -133,6 +130,15 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification return s, nil } +func openRPC(url string) (*rpc.Client, *ethclient.Client, error) { + rc, err := rpc.Dial(url) + if err != nil { + return nil, nil, err + } + ec := ethclient.NewClient(rc) + return rc, ec, nil +} + // Initialize initializes ethereum rpc interface func (b *EthereumRPC) Initialize() error { ctx, cancel := context.WithTimeout(context.Background(), b.timeout) @@ -187,6 +193,16 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu b.Mempool.OnNewTxAddr = onNewTxAddr + if err = b.subscribeEvents(); err != nil { + return err + } + + b.mempoolInitialized = true + + return nil +} + +func (b *EthereumRPC) subscribeEvents() error { if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { @@ -224,8 +240,6 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu return err } - b.mempoolInitialized = true - return nil } @@ -246,7 +260,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error } glog.Error("Subscription error ", e) timer := time.NewTimer(time.Second * 2) - // try in 1 second interval to resubscribe + // try in 2 second interval to resubscribe for { select { case e = <-s.Err(): @@ -260,7 +274,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error s = ns continue Loop } - glog.Error("Resubscribe error ", e) + glog.Error("Resubscribe error ", err) timer.Reset(time.Second * 2) } } @@ -269,8 +283,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error return nil } -// Shutdown cleans up rpc interface to ethereum -func (b *EthereumRPC) Shutdown(ctx context.Context) error { +func (b *EthereumRPC) closeRPC() { if b.newBlockSubscription != nil { b.newBlockSubscription.Unsubscribe() } @@ -280,6 +293,23 @@ func (b *EthereumRPC) Shutdown(ctx context.Context) error { if b.rpc != nil { b.rpc.Close() } +} + +func (b *EthereumRPC) reconnectRPC() error { + glog.Info("Reconnecting RPC") + b.closeRPC() + rc, ec, err := openRPC(b.ChainConfig.RPCURL) + if err != nil { + return err + } + b.rpc = rc + b.client = ec + return b.subscribeEvents() +} + +// Shutdown cleans up rpc interface to ethereum +func (b *EthereumRPC) Shutdown(ctx context.Context) error { + b.closeRPC() close(b.chanNewBlock) glog.Info("rpc: shutdown") return nil @@ -339,6 +369,14 @@ func (b *EthereumRPC) getBestHeader() (*ethtypes.Header, error) { b.bestHeader = nil } } + // if the best header was not updated for 15 minutes, there could be a subscription problem, reconnect RPC + if b.bestHeaderTime.Add(15 * time.Minute).Before(time.Now()) { + err := b.reconnectRPC() + if err != nil { + return nil, err + } + b.bestHeader = nil + } if b.bestHeader == nil { var err error ctx, cancel := context.WithTimeout(context.Background(), b.timeout)