Retry connection to blockchain rpc on startup

After restart, the blockchain daemons are not immediately ready
to serve requests. Blockbook must wait for them.
This commit is contained in:
Martin Boehm 2018-05-17 11:52:16 +02:00
parent 9aaa242d09
commit bc6f8a9e5d
4 changed files with 77 additions and 30 deletions

View File

@ -30,10 +30,10 @@ func NewBCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
return s, nil return s, nil
} }
// Initialize initializes BCashRPC instance.
func (b *BCashRPC) Initialize() error { func (b *BCashRPC) Initialize() error {
b.Mempool = bchain.NewUTXOMempool(b)
chainName, err := b.GetBlockChainInfo() chainName, err := b.GetChainInfoAndInitializeMempool()
if err != nil { if err != nil {
return err return err
} }

View File

@ -19,17 +19,19 @@ import (
// BitcoinRPC is an interface to JSON-RPC bitcoind service. // BitcoinRPC is an interface to JSON-RPC bitcoind service.
type BitcoinRPC struct { type BitcoinRPC struct {
client http.Client client http.Client
rpcURL string rpcURL string
user string user string
password string password string
Parser bchain.BlockChainParser Parser bchain.BlockChainParser
Testnet bool Testnet bool
Network string Network string
Mempool *bchain.UTXOMempool Mempool *bchain.UTXOMempool
ParseBlocks bool ParseBlocks bool
mq *bchain.MQ zeroMQBinding string
Subversion string pushHandler func(bchain.NotificationType)
mq *bchain.MQ
Subversion string
} }
type configuration struct { type configuration struct {
@ -57,28 +59,45 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT
} }
s := &BitcoinRPC{ s := &BitcoinRPC{
client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport}, client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport},
rpcURL: c.RPCURL, rpcURL: c.RPCURL,
user: c.RPCUser, user: c.RPCUser,
password: c.RPCPass, password: c.RPCPass,
ParseBlocks: c.Parse, ParseBlocks: c.Parse,
Subversion: c.Subversion, Subversion: c.Subversion,
zeroMQBinding: c.ZeroMQBinding,
pushHandler: pushHandler,
} }
mq, err := bchain.NewMQ(c.ZeroMQBinding, pushHandler)
if err != nil {
glog.Error("mq: ", err)
return nil, err
}
s.mq = mq
return s, nil return s, nil
} }
func (b *BitcoinRPC) Initialize() error { // GetChainInfoAndInitializeMempool is called by Initialize and reused by other coins
// it contacts the blockchain rpc interface for the first time
// and if successful it connects to ZeroMQ and creates mempool handler
func (b *BitcoinRPC) GetChainInfoAndInitializeMempool() (string, error) {
// try to connect to block chain and get some info
chainName, err := b.GetBlockChainInfo()
if err != nil {
return "", err
}
mq, err := bchain.NewMQ(b.zeroMQBinding, b.pushHandler)
if err != nil {
glog.Error("mq: ", err)
return "", err
}
b.mq = mq
b.Mempool = bchain.NewUTXOMempool(b) b.Mempool = bchain.NewUTXOMempool(b)
chainName, err := b.GetBlockChainInfo() return chainName, nil
}
// Initialize initializes BitcoinRPC instance.
func (b *BitcoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool()
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,8 +24,13 @@ func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
return z, nil return z, nil
} }
// Initialize initializes ZCashRPC instance.
func (z *ZCashRPC) Initialize() error { func (z *ZCashRPC) Initialize() error {
z.Mempool = bchain.NewUTXOMempool(z) _, err := z.GetChainInfoAndInitializeMempool()
if err != nil {
return err
}
z.Parser = &ZCashParser{} z.Parser = &ZCashParser{}
z.Testnet = false z.Testnet = false
z.Network = "livenet" z.Network = "livenet"

View File

@ -86,6 +86,29 @@ func init() {
glog.CopyStandardLogTo("INFO") glog.CopyStandardLogTo("INFO")
} }
func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, error) {
var chain bchain.BlockChain
var err error
timer := time.NewTimer(time.Second)
for i := 0; ; i++ {
if chain, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil {
if i < seconds {
glog.Error("rpc: ", err, " Retrying...")
select {
case <-chanOsSignal:
return nil, errors.New("Interrupted")
case <-timer.C:
timer.Reset(time.Second)
continue
}
} else {
return nil, err
}
}
return chain, nil
}
}
func main() { func main() {
flag.Parse() flag.Parse()
@ -116,7 +139,7 @@ func main() {
glog.Fatal("Missing blockchaincfg configuration parameter") glog.Fatal("Missing blockchaincfg configuration parameter")
} }
if chain, err = coins.NewBlockChain(*coin, *blockchain, pushSynchronizationHandler, metrics); err != nil { if chain, err = getBlockChainWithRetry(*coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil {
glog.Fatal("rpc: ", err) glog.Fatal("rpc: ", err)
} }