diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 08e1bae6..9bb6961b 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -15,7 +15,7 @@ import ( "github.com/juju/errors" ) -type blockChainFactory func(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) +type blockChainFactory func(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) var blockChainFactories = make(map[string]blockChainFactory) @@ -28,7 +28,7 @@ func init() { } // NewBlockChain creates bchain.BlockChain of type defined by parameter coin -func NewBlockChain(coin string, configfile string, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) { +func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, error) { bcf, ok := blockChainFactories[coin] if !ok { return nil, errors.New(fmt.Sprint("Unsupported coin ", coin, ". Must be one of ", reflect.ValueOf(blockChainFactories).MapKeys())) diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index 754592ed..ecc25283 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -41,7 +41,7 @@ type configuration struct { } // NewBitcoinRPC returns new BitcoinRPC instance. -func NewBitcoinRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) { +func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) { var err error var c configuration err = json.Unmarshal(config, &c) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 552c7618..29ba2a77 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -48,7 +48,7 @@ type configuration struct { } // NewEthRPC returns new EthRPC instance. -func NewEthRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) { +func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) { var err error var c configuration err = json.Unmarshal(config, &c) diff --git a/bchain/coins/zec/zcashrpc.go b/bchain/coins/zec/zcashrpc.go index b17fb62a..d0243644 100644 --- a/bchain/coins/zec/zcashrpc.go +++ b/bchain/coins/zec/zcashrpc.go @@ -13,7 +13,7 @@ type ZCashRPC struct { *btc.BitcoinRPC } -func NewZCashRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) { +func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) { b, err := btc.NewBitcoinRPC(config, pushHandler) if err != nil { return nil, err diff --git a/bchain/mq.go b/bchain/mq.go index cc60a0e1..d53d9994 100644 --- a/bchain/mq.go +++ b/bchain/mq.go @@ -16,16 +16,21 @@ type MQ struct { finished chan bool } -// MQMessage contains data received from Bitcoind message queue -type MQMessage struct { - Topic string - Sequence uint32 - Body []byte -} +// NotificationType is type of notification +type NotificationType int + +const ( + // NotificationUnknown is unknown + NotificationUnknown NotificationType = iota + // NotificationNewBlock message is sent when there is a new block to be imported + NotificationNewBlock NotificationType = iota + // NotificationNewTx message is sent when there is a new mempool transaction + NotificationNewTx NotificationType = iota +) // NewMQ creates new Bitcoind ZeroMQ listener // callback function receives messages -func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) { +func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) { context, err := zmq.NewContext() if err != nil { return nil, err @@ -56,7 +61,7 @@ func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) { return mq, nil } -func (mq *MQ) run(callback func(*MQMessage)) { +func (mq *MQ) run(callback func(NotificationType)) { mq.isRunning = true for { msg, err := mq.socket.RecvMessageBytes(0) @@ -70,16 +75,25 @@ func (mq *MQ) run(callback func(*MQMessage)) { time.Sleep(100 * time.Millisecond) } if msg != nil && len(msg) >= 3 { - sequence := uint32(0) - if len(msg[len(msg)-1]) == 4 { - sequence = binary.LittleEndian.Uint32(msg[len(msg)-1]) + var nt NotificationType + switch string(msg[0]) { + case "hashblock": + nt = NotificationNewBlock + break + case "hashtx": + nt = NotificationNewTx + break + default: + nt = NotificationUnknown } - m := &MQMessage{ - Topic: string(msg[0]), - Sequence: sequence, - Body: msg[1], + if glog.V(2) { + sequence := uint32(0) + if len(msg[len(msg)-1]) == 4 { + sequence = binary.LittleEndian.Uint32(msg[len(msg)-1]) + } + glog.Infof("MQ: %v %s-%d", nt, string(msg[0]), sequence) } - callback(m) + callback(nt) } } mq.isRunning = false diff --git a/blockbook.go b/blockbook.go index a0266a29..8b6a6573 100644 --- a/blockbook.go +++ b/blockbook.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/hex" "flag" "log" "os" @@ -312,16 +311,14 @@ func onNewTxAddr(txid string, addr string) { } } -func pushSynchronizationHandler(m *bchain.MQMessage) { - // TODO - is coin specific, item for abstraction - body := hex.EncodeToString(m.Body) - glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) - if m.Topic == "hashblock" { +func pushSynchronizationHandler(nt bchain.NotificationType) { + glog.V(1).Infof("MQ: notification ", nt) + if nt == bchain.NotificationNewBlock { chanSyncIndex <- struct{}{} - } else if m.Topic == "hashtx" { + } else if nt == bchain.NotificationNewTx { chanSyncMempool <- struct{}{} } else { - glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body) + glog.Error("MQ: unknown notification sent") } }