From 7185060f629597db177487946a4e3299452e193b Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 31 Jan 2018 17:51:48 +0100 Subject: [PATCH] Synchronize mempool --- bchain/mempool.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++ bchain/mq.go | 2 +- blockbook.go | 58 ++++++++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 15 deletions(-) create mode 100644 bchain/mempool.go diff --git a/bchain/mempool.go b/bchain/mempool.go new file mode 100644 index 00000000..de288406 --- /dev/null +++ b/bchain/mempool.go @@ -0,0 +1,73 @@ +package bchain + +import ( + "encoding/hex" + "sync" + + "github.com/golang/glog" +) + +// Mempool is mempool handle. +type Mempool struct { + chain *BitcoinRPC + mux sync.Mutex + scriptToTx map[string][]string + txToScript map[string][]string +} + +// NewMempool creates new mempool handler. +func NewMempool(chain *BitcoinRPC) *Mempool { + return &Mempool{chain: chain} +} + +// GetTransactions returns slice of mempool transactions for given output script. +func (m *Mempool) GetTransactions(outputScript []byte) ([]string, error) { + m.mux.Lock() + defer m.mux.Unlock() + scriptHex := hex.EncodeToString(outputScript) + return m.scriptToTx[scriptHex], nil +} + +func (m *Mempool) updateMaps(newScriptToTx map[string][]string, newTxToScript map[string][]string) { + m.mux.Lock() + defer m.mux.Unlock() + m.scriptToTx = newScriptToTx + m.txToScript = newTxToScript +} + +// Resync gets mempool transactions and maps output scripts to transactions. +// Resync is not reentrant, it should be called from a single thread. +// Read operations (GetTransactions) are safe. +func (m *Mempool) Resync() error { + glog.Info("Mempool: resync") + txs, err := m.chain.GetMempool() + if err != nil { + return err + } + newScriptToTx := make(map[string][]string) + newTxToScript := make(map[string][]string) + for _, txid := range txs { + scripts := m.txToScript[txid] + if scripts == nil { + tx, err := m.chain.GetTransaction(txid) + if err != nil { + glog.Error("cannot get transaction ", txid, ": ", err) + continue + } + scripts = make([]string, 0, len(tx.Vout)) + for _, output := range tx.Vout { + outputScript := output.ScriptPubKey.Hex + if outputScript != "" { + scripts = append(scripts, outputScript) + } + } + } + newTxToScript[txid] = scripts + for _, script := range scripts { + newScriptToTx[script] = append(newScriptToTx[script], txid) + } + } + m.updateMaps(newScriptToTx, newTxToScript) + glog.Info("Mempool: resync finished, ", len(m.txToScript), " transactions in mempool") + return nil +} diff --git a/bchain/mq.go b/bchain/mq.go index 0e41dba7..bd6f03bb 100644 --- a/bchain/mq.go +++ b/bchain/mq.go @@ -22,7 +22,7 @@ type MQMessage struct { Body []byte } -// New creates new Bitcoind ZeroMQ listener +// NewMQ creates new Bitcoind ZeroMQ listener // callback function receives messages func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) { context, err := zmq.NewContext() diff --git a/blockbook.go b/blockbook.go index 678e0c0c..eaa7d447 100644 --- a/blockbook.go +++ b/blockbook.go @@ -47,9 +47,13 @@ var ( ) var ( - syncChannel = make(chan struct{}) - chain *bchain.BitcoinRPC - index *db.RocksDB + chanSyncIndex = make(chan struct{}) + chanSyncMempool = make(chan struct{}) + chanSyncIndexDone = make(chan struct{}) + chanSyncMempoolDone = make(chan struct{}) + chain *bchain.BitcoinRPC + mempool *bchain.Mempool + index *db.RocksDB ) func main() { @@ -83,6 +87,8 @@ func main() { } } + mempool = bchain.NewMempool(chain) + var err error index, err = db.NewRocksDB(*dbPath) if err != nil { @@ -106,6 +112,9 @@ func main() { return } + go syncIndexLoop() + go syncMempoolLoop() + if *synchronize { if err := resyncIndex(); err != nil { glog.Fatal("resyncIndex ", err) @@ -135,12 +144,10 @@ func main() { if !*synchronize { glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter") } else { - go syncLoop() mq, err = bchain.NewMQ(*zeroMQBinding, mqHandler) if err != nil { glog.Fatal("mq: ", err) } - } } @@ -176,21 +183,41 @@ func main() { waitForSignalAndShutdown(httpServer, mq, 5*time.Second) } + close(chanSyncIndex) + close(chanSyncMempool) + <-chanSyncIndexDone + <-chanSyncMempoolDone } -func syncLoop() { - for range syncChannel { - resyncIndex() +func syncIndexLoop() { + defer close(chanSyncIndexDone) + glog.Info("syncIndexLoop starting") + for range chanSyncIndex { + if err := resyncIndex(); err != nil { + glog.Error(err) + } } + glog.Info("syncIndexLoop stopped") +} + +func syncMempoolLoop() { + defer close(chanSyncMempoolDone) + glog.Info("syncMempoolLoop starting") + for range chanSyncMempool { + if err := mempool.Resync(); err != nil { + glog.Error(err) + } + } + glog.Info("syncMempoolLoop stopped") } func mqHandler(m *bchain.MQMessage) { body := hex.EncodeToString(m.Body) - glog.Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) + glog.V(2).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) if m.Topic == "hashblock" { - syncChannel <- struct{}{} + chanSyncIndex <- struct{}{} } else if m.Topic == "hashtx" { - + chanSyncMempool <- struct{}{} } else { glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body) } @@ -218,8 +245,6 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bchain.MQ, timeout time. glog.Error("HttpServer.Shutdown error: ", err) } } - - close(syncChannel) } func printResult(txid string) error { @@ -329,7 +354,12 @@ func resyncIndex() error { } } - return connectBlocks(hash) + err = connectBlocks(hash) + if err != nil { + return err + } + chanSyncMempool <- struct{}{} + return nil } func connectBlocks(