diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 29ba2a77..117879df 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -30,16 +30,18 @@ const ( // EthRPC is an interface to JSON-RPC eth service. type EthRPC struct { - client *ethclient.Client - rpc *rpc.Client - timeout time.Duration - rpcURL string - Parser *EthParser - Testnet bool - Network string - Mempool *bchain.Mempool - bestHeaderMu sync.Mutex - bestHeader *ethtypes.Header + client *ethclient.Client + rpc *rpc.Client + timeout time.Duration + rpcURL string + Parser *EthParser + Testnet bool + Network string + Mempool *bchain.UTXOMempool + bestHeaderMu sync.Mutex + bestHeader *ethtypes.Header + chanNewBlock chan *ethtypes.Header + newBlockSubscription *rpc.ClientSubscription } type configuration struct { @@ -71,9 +73,29 @@ func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType) s.Parser = &EthParser{} s.timeout = time.Duration(c.RPCTimeout) * time.Second + // new blocks notifications handling + // the subscription is done in Initialize + s.chanNewBlock = make(chan *ethtypes.Header) + go func() { + for { + h, ok := <-s.chanNewBlock + if !ok { + break + } + glog.V(2).Info("rpc: new block header ", h.Number) + // update best header to the new header + s.bestHeaderMu.Lock() + s.bestHeader = h + s.bestHeaderMu.Unlock() + // notify blockbook + pushHandler(bchain.NotificationNewBlock) + } + }() + return s, nil } +// Initialize initializes ethereum rpc interface func (b *EthRPC) Initialize() error { ctx, cancel := context.WithTimeout(context.Background(), b.timeout) defer cancel() @@ -98,11 +120,26 @@ func (b *EthRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) + // Subscribe to new blocks + sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads") + if err != nil { + return errors.Annotatef(err, "EthSubscribe newHeads") + } + b.newBlockSubscription = sub // b.Mempool = bchain.NewMempool(s, metrics) return nil } +// Shutdown cleans up rpc interface to ethereum func (b *EthRPC) Shutdown() error { + if b.newBlockSubscription != nil { + b.newBlockSubscription.Unsubscribe() + } + if b.rpc != nil { + b.rpc.Close() + } + close(b.chanNewBlock) + glog.Info("rpc: shutdown") return nil } @@ -409,23 +446,24 @@ func (b *EthRPC) EstimateSmartFee(blocks int, conservative bool) (float64, error // SendRawTransaction sends raw transaction. func (b *EthRPC) SendRawTransaction(tx string) (string, error) { - panic("not implemented") + return "", errors.New("SendRawTransaction: not implemented") } func (b *EthRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error { - panic("not implemented") + return nil + return errors.New("ResyncMempool: not implemented") } func (b *EthRPC) GetMempoolTransactions(address string) ([]string, error) { - panic("not implemented") + return nil, errors.New("ResyncMempool: not implemented") } func (b *EthRPC) GetMempoolSpentOutput(outputTxid string, vout uint32) string { - panic("not implemented") + return "" } func (b *EthRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) { - panic("not implemented") + return nil, errors.New("ResyncMempool: not implemented") } func (b *EthRPC) GetChainParser() bchain.BlockChainParser {