From 531da092277f08b050fc943ea2134c966ef2970d Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Fri, 23 Feb 2018 11:56:44 +0100 Subject: [PATCH] Improve ZeroMQ startup/shutdown --- bchain/mq.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/bchain/mq.go b/bchain/mq.go index bd6f03bb..f9a21c4a 100644 --- a/bchain/mq.go +++ b/bchain/mq.go @@ -33,13 +33,22 @@ func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) { if err != nil { return nil, err } - socket.SetSubscribe("hashblock") - socket.SetSubscribe("hashtx") + err = socket.SetSubscribe("hashblock") + if err != nil { + return nil, err + } + err = socket.SetSubscribe("hashtx") + if err != nil { + return nil, err + } // for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq // on each notification we do sync or syncmempool respectively // socket.SetSubscribe("rawblock") // socket.SetSubscribe("rawtx") - socket.Connect(binding) + err = socket.Connect(binding) + if err != nil { + return nil, err + } glog.Info("MQ listening to ", binding) mq := &MQ{context, socket, true, make(chan bool)} go mq.run(callback) @@ -78,6 +87,8 @@ func (mq *MQ) run(callback func(*MQMessage)) { func (mq *MQ) Shutdown() error { glog.Info("MQ server shutdown") if mq.isRunning { + mq.socket.SetUnsubscribe("hashtx") + mq.socket.SetUnsubscribe("hashblock") // if errors in socket.Close or context.Term, let it close ungracefully if err := mq.socket.Close(); err != nil { return err