From 4427af769bbb7fb9c5620a09652162ff6b10c2c5 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 22 Jan 2018 16:46:54 +0100 Subject: [PATCH] ZeroMQ handler in the background with graceful shutdown --- bitcoin/mq.go | 87 ++++++++++++++++++++++++++++++++++++++++----------- blockbook.go | 45 +++++++++++++++++--------- 2 files changed, 99 insertions(+), 33 deletions(-) diff --git a/bitcoin/mq.go b/bitcoin/mq.go index a7f01b05..62d46fda 100644 --- a/bitcoin/mq.go +++ b/bitcoin/mq.go @@ -2,38 +2,89 @@ package bitcoin import ( "encoding/binary" - "encoding/hex" "log" zmq "github.com/pebbe/zmq4" ) -func ZeroMQ(binding string) { +// MQ is message queue listener handle +type MQ struct { + context *zmq.Context + socket *zmq.Socket + isRunning bool + finished chan bool +} + +// MQMessage contains data received from Bitcoind message queue +type MQMessage struct { + Topic string + Sequence uint32 + Body []byte +} + +// New creates new Bitcoind ZeroMQ listener +// callback function receives messages +func New(binding string, callback func(*MQMessage)) (*MQ, error) { context, err := zmq.NewContext() if err != nil { - log.Fatal(err) + return nil, err } socket, err := context.NewSocket(zmq.SUB) if err != nil { - log.Fatal(err) + return nil, err } socket.SetSubscribe("hashblock") socket.SetSubscribe("hashtx") socket.SetSubscribe("rawblock") socket.SetSubscribe("rawtx") socket.Connect(binding) - defer socket.Close() - for i := 0; i < 101; i++ { - msg, err := socket.RecvMessageBytes(0) - if err != nil { - log.Fatal(err) - } - topic := string(msg[0]) - body := hex.EncodeToString(msg[1]) - sequence := uint32(0) - if len(msg[len(msg)-1]) == 4 { - sequence = binary.LittleEndian.Uint32(msg[len(msg)-1]) - } - log.Printf("%s-%d (%v) %s", topic, sequence, msg[len(msg)-1], body) - } + log.Printf("MQ listening to %s", binding) + mq := &MQ{context, socket, true, make(chan bool)} + go mq.run(callback) + return mq, nil +} + +func (mq *MQ) run(callback func(*MQMessage)) { + mq.isRunning = true + for { + msg, err := mq.socket.RecvMessageBytes(0) + if err != nil { + if zmq.AsErrno(err) == zmq.Errno(zmq.ETERM) { + close(mq.finished) + log.Print("MQ loop terminated") + break + } + log.Printf("MQ RecvMessageBytes error %v", err) + } + if msg != nil && len(msg) >= 3 { + sequence := uint32(0) + if len(msg[len(msg)-1]) == 4 { + sequence = binary.LittleEndian.Uint32(msg[len(msg)-1]) + } + m := &MQMessage{ + Topic: string(msg[0]), + Sequence: sequence, + Body: msg[1], + } + callback(m) + } + } + mq.isRunning = false +} + +// Shutdown stops listening to the ZeroMQ and closes the connection +func (mq *MQ) Shutdown() error { + log.Printf("MQ server shutdown") + if mq.isRunning { + // if errors in socket.Close or context.Term, let it close ungracefully + if err := mq.socket.Close(); err != nil { + return err + } + if err := mq.context.Term(); err != nil { + return err + } + _, _ = <-mq.finished + log.Printf("MQ server shutdown finished") + } + return nil } diff --git a/blockbook.go b/blockbook.go index ebb6f0ed..da0df33e 100644 --- a/blockbook.go +++ b/blockbook.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/hex" "flag" "log" "os" @@ -62,9 +63,8 @@ var ( func main() { flag.Parse() - if *zeroMQBinding != "nil" { - bitcoin.ZeroMQ(*zeroMQBinding) - return + if *prof { + defer profile.Start().Stop() } if *repair { @@ -74,10 +74,6 @@ func main() { return } - if *prof { - defer profile.Start().Stop() - } - rpc := bitcoin.NewBitcoinRPC( *rpcURL, *rpcUser, @@ -101,16 +97,24 @@ func main() { if *httpServerBinding != "nil" { httpServer, err = server.New(*httpServerBinding, db) if err != nil { - log.Fatalf("https: %s", err) + log.Fatalf("https: %v", err) } go func() { err = httpServer.Run() if err != nil { - log.Fatalf("https: %s", err) + log.Fatalf("https: %v", err) } }() } + var mq *bitcoin.MQ + if *zeroMQBinding != "nil" { + mq, err = bitcoin.New(*zeroMQBinding, mqHandler) + if err != nil { + log.Fatalf("mq: %v", err) + } + } + if *resync { if err := resyncIndex(rpc, db); err != nil { log.Fatal(err) @@ -144,27 +148,38 @@ func main() { } if httpServer != nil { - waitForSignalAndShutdown(httpServer, 5*time.Second) + waitForSignalAndShutdown(httpServer, mq, 5*time.Second) } } -func waitForSignalAndShutdown(s *server.HttpServer, timeout time.Duration) { - stop := make(chan os.Signal, 1) +func mqHandler(m *bitcoin.MQMessage) { + body := hex.EncodeToString(m.Body) + log.Printf("MQ: %s-%d %s", m.Topic, m.Sequence, body) +} - signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) +func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time.Duration) { + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) sig := <-stop ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - log.Printf("Shutdown %v", sig) + log.Printf("Shutdown with reason: %v", sig) + + if mq != nil { + if err := mq.Shutdown(); err != nil { + log.Printf("MQ.Shutdown error: %v", err) + } + } if s != nil { if err := s.Shutdown(ctx); err != nil { - log.Printf("Error: %v", err) + log.Printf("HttpServer.Shutdown error: %v", err) } } + } func printResult(txid string) error {