Subscribe and handle eth notifications about new block

This commit is contained in:
Martin Boehm 2018-03-28 10:25:08 +02:00
parent 19d071a184
commit 2f4b48b3d6

View File

@ -30,16 +30,18 @@ const (
// EthRPC is an interface to JSON-RPC eth service. // EthRPC is an interface to JSON-RPC eth service.
type EthRPC struct { type EthRPC struct {
client *ethclient.Client client *ethclient.Client
rpc *rpc.Client rpc *rpc.Client
timeout time.Duration timeout time.Duration
rpcURL string rpcURL string
Parser *EthParser Parser *EthParser
Testnet bool Testnet bool
Network string Network string
Mempool *bchain.Mempool Mempool *bchain.UTXOMempool
bestHeaderMu sync.Mutex bestHeaderMu sync.Mutex
bestHeader *ethtypes.Header bestHeader *ethtypes.Header
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
} }
type configuration struct { type configuration struct {
@ -71,9 +73,29 @@ func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)
s.Parser = &EthParser{} s.Parser = &EthParser{}
s.timeout = time.Duration(c.RPCTimeout) * time.Second s.timeout = time.Duration(c.RPCTimeout) * time.Second
// new blocks notifications handling
// the subscription is done in Initialize
s.chanNewBlock = make(chan *ethtypes.Header)
go func() {
for {
h, ok := <-s.chanNewBlock
if !ok {
break
}
glog.V(2).Info("rpc: new block header ", h.Number)
// update best header to the new header
s.bestHeaderMu.Lock()
s.bestHeader = h
s.bestHeaderMu.Unlock()
// notify blockbook
pushHandler(bchain.NotificationNewBlock)
}
}()
return s, nil return s, nil
} }
// Initialize initializes ethereum rpc interface
func (b *EthRPC) Initialize() error { func (b *EthRPC) Initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout) ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel() defer cancel()
@ -98,11 +120,26 @@ func (b *EthRPC) Initialize() error {
} }
glog.Info("rpc: block chain ", b.Network) glog.Info("rpc: block chain ", b.Network)
// Subscribe to new blocks
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads")
if err != nil {
return errors.Annotatef(err, "EthSubscribe newHeads")
}
b.newBlockSubscription = sub
// b.Mempool = bchain.NewMempool(s, metrics) // b.Mempool = bchain.NewMempool(s, metrics)
return nil return nil
} }
// Shutdown cleans up rpc interface to ethereum
func (b *EthRPC) Shutdown() error { func (b *EthRPC) Shutdown() error {
if b.newBlockSubscription != nil {
b.newBlockSubscription.Unsubscribe()
}
if b.rpc != nil {
b.rpc.Close()
}
close(b.chanNewBlock)
glog.Info("rpc: shutdown")
return nil return nil
} }
@ -409,23 +446,24 @@ func (b *EthRPC) EstimateSmartFee(blocks int, conservative bool) (float64, error
// SendRawTransaction sends raw transaction. // SendRawTransaction sends raw transaction.
func (b *EthRPC) SendRawTransaction(tx string) (string, error) { func (b *EthRPC) SendRawTransaction(tx string) (string, error) {
panic("not implemented") return "", errors.New("SendRawTransaction: not implemented")
} }
func (b *EthRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error { func (b *EthRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error {
panic("not implemented") return nil
return errors.New("ResyncMempool: not implemented")
} }
func (b *EthRPC) GetMempoolTransactions(address string) ([]string, error) { func (b *EthRPC) GetMempoolTransactions(address string) ([]string, error) {
panic("not implemented") return nil, errors.New("ResyncMempool: not implemented")
} }
func (b *EthRPC) GetMempoolSpentOutput(outputTxid string, vout uint32) string { func (b *EthRPC) GetMempoolSpentOutput(outputTxid string, vout uint32) string {
panic("not implemented") return ""
} }
func (b *EthRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) { func (b *EthRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) {
panic("not implemented") return nil, errors.New("ResyncMempool: not implemented")
} }
func (b *EthRPC) GetChainParser() bchain.BlockChainParser { func (b *EthRPC) GetChainParser() bchain.BlockChainParser {