diff --git a/bchain/mq.go b/bchain/mq.go index d53d9994..299fcc19 100644 --- a/bchain/mq.go +++ b/bchain/mq.go @@ -62,13 +62,19 @@ func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) { } func (mq *MQ) run(callback func(NotificationType)) { + defer func() { + if r := recover(); r != nil { + glog.Error("MQ loop recovered from ", r) + } + mq.isRunning = false + close(mq.finished) + glog.Info("MQ loop terminated") + }() mq.isRunning = true for { msg, err := mq.socket.RecvMessageBytes(0) if err != nil { if zmq.AsErrno(err) == zmq.Errno(zmq.ETERM) || err.Error() == "Socket is closed" { - close(mq.finished) - glog.Info("MQ loop terminated") break } glog.Error("MQ RecvMessageBytes error ", err, ", ", zmq.AsErrno(err)) @@ -96,7 +102,6 @@ func (mq *MQ) run(callback func(NotificationType)) { callback(nt) } } - mq.isRunning = false } // Shutdown stops listening to the ZeroMQ and closes the connection