diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 5d3e1e90..ba67d0a0 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -7,6 +7,7 @@ import ( "blockbook/bchain/coins/eth" "blockbook/bchain/coins/zec" "blockbook/common" + "context" "encoding/json" "fmt" "io/ioutil" @@ -74,8 +75,8 @@ func (c *blockChainWithMetrics) Initialize() error { return c.b.Initialize() } -func (c *blockChainWithMetrics) Shutdown() error { - return c.b.Shutdown() +func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { + return c.b.Shutdown(ctx) } func (c *blockChainWithMetrics) IsTestnet() bool { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index eb3eee6a..f928ed2d 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -3,6 +3,7 @@ package btc import ( "blockbook/bchain" "bytes" + "context" "encoding/hex" "encoding/json" "io" @@ -135,9 +136,9 @@ func (b *BitcoinRPC) Initialize() error { return nil } -func (b *BitcoinRPC) Shutdown() error { +func (b *BitcoinRPC) Shutdown(ctx context.Context) error { if b.mq != nil { - if err := b.mq.Shutdown(); err != nil { + if err := b.mq.Shutdown(ctx); err != nil { glog.Error("MQ.Shutdown error: ", err) return err } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index b4a2a238..dfe3b23b 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -217,7 +217,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error } // Shutdown cleans up rpc interface to ethereum -func (b *EthereumRPC) Shutdown() error { +func (b *EthereumRPC) Shutdown(ctx context.Context) error { if b.newBlockSubscription != nil { b.newBlockSubscription.Unsubscribe() } diff --git a/bchain/mq.go b/bchain/mq.go index 91f93674..3ba0174b 100644 --- a/bchain/mq.go +++ b/bchain/mq.go @@ -1,6 +1,7 @@ package bchain import ( + "context" "encoding/binary" "time" @@ -13,7 +14,7 @@ type MQ struct { context *zmq.Context socket *zmq.Socket isRunning bool - finished chan bool + finished chan error binding string } @@ -57,7 +58,7 @@ func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) { return nil, err } glog.Info("MQ listening to ", binding) - mq := &MQ{context, socket, true, make(chan bool), binding} + mq := &MQ{context, socket, true, make(chan error), binding} go mq.run(callback) return mq, nil } @@ -69,7 +70,7 @@ func (mq *MQ) run(callback func(NotificationType)) { } mq.isRunning = false glog.Info("MQ loop terminated") - mq.finished <- true + mq.finished <- nil }() mq.isRunning = true for { @@ -107,27 +108,42 @@ func (mq *MQ) run(callback func(NotificationType)) { } // Shutdown stops listening to the ZeroMQ and closes the connection -func (mq *MQ) Shutdown() error { +func (mq *MQ) Shutdown(ctx context.Context) error { glog.Info("MQ server shutdown") if mq.isRunning { - // if errors in the closing sequence, let it close ungracefully - if err := mq.socket.SetUnsubscribe("hashtx"); err != nil { + go func() { + // if errors in the closing sequence, let it close ungracefully + if err := mq.socket.SetUnsubscribe("hashtx"); err != nil { + mq.finished <- err + return + } + if err := mq.socket.SetUnsubscribe("hashblock"); err != nil { + mq.finished <- err + return + } + if err := mq.socket.Unbind(mq.binding); err != nil { + mq.finished <- err + return + } + if err := mq.socket.Close(); err != nil { + mq.finished <- err + return + } + if err := mq.context.Term(); err != nil { + mq.finished <- err + return + } + }() + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-mq.finished: + } + if err != nil { return err } - if err := mq.socket.SetUnsubscribe("hashblock"); err != nil { - return err - } - if err := mq.socket.Unbind(mq.binding); err != nil { - return err - } - if err := mq.socket.Close(); err != nil { - return err - } - if err := mq.context.Term(); err != nil { - return err - } - <-mq.finished - glog.Info("MQ server shutdown finished") } + glog.Info("MQ server shutdown finished") return nil } diff --git a/bchain/types.go b/bchain/types.go index 0edd8ef7..3edc214c 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -1,6 +1,7 @@ package bchain import ( + "context" "encoding/json" "errors" "fmt" @@ -115,7 +116,7 @@ func (e *RPCError) Error() string { type BlockChain interface { // life-cycle methods Initialize() error - Shutdown() error + Shutdown(ctx context.Context) error // chain info IsTestnet() bool GetNetworkName() string diff --git a/blockbook.go b/blockbook.go index 073e828e..224acd29 100644 --- a/blockbook.go +++ b/blockbook.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "sync/atomic" "syscall" "time" @@ -29,7 +30,7 @@ const resyncIndexPeriodMs = 935093 // debounce too close requests for resync const debounceResyncIndexMs = 1009 -// resync mempool at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ) +// resync mempool at least each resyncMempoolPeriodMs (could be more often if invoked by message from ZeroMQ) const resyncMempoolPeriodMs = 60017 // debounce too close requests for resync mempool (ZeroMQ sends message for each tx, when new block there are many transactions) @@ -85,6 +86,7 @@ var ( callbacksOnNewBlockHash []func(hash string) callbacksOnNewTxAddr []func(txid string, addr string) chanOsSignal chan os.Signal + inShutdown int32 ) func init() { @@ -293,7 +295,7 @@ func main() { } if httpServer != nil || socketIoServer != nil || chain != nil { - waitForSignalAndShutdown(httpServer, socketIoServer, chain, 5*time.Second) + waitForSignalAndShutdown(httpServer, socketIoServer, chain, 10*time.Second) } if *synchronize { @@ -335,8 +337,10 @@ Loop: timer.Reset(0) } case <-timer.C: - // do the action and start the loop again - f() + // do the action, if not in shutdown, then start the loop again + if atomic.LoadInt32(&inShutdown) == 0 { + f() + } timer.Reset(tickTime) firstDebounce = time.Time{} } @@ -394,6 +398,9 @@ func onNewTxAddr(txid string, addr string) { } func pushSynchronizationHandler(nt bchain.NotificationType) { + if atomic.LoadInt32(&inShutdown) != 0 { + return + } glog.V(1).Infof("MQ: notification ", nt) if nt == bchain.NotificationNewBlock { chanSyncIndex <- struct{}{} @@ -406,6 +413,7 @@ func pushSynchronizationHandler(nt bchain.NotificationType) { func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) { sig := <-chanOsSignal + atomic.StoreInt32(&inShutdown, 1) glog.Infof("Shutdown: %v", sig) ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -424,7 +432,7 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI } if chain != nil { - if err := chain.Shutdown(); err != nil { + if err := chain.Shutdown(ctx); err != nil { glog.Error("BlockChain.Shutdown error: ", err) } }