From 26c726c7719c619fb21b0dd3928bb7a37ee4554b Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Thu, 22 Feb 2018 13:32:06 +0100 Subject: [PATCH] Notify socket.io subscribers about new block --- bchain/mempool.go | 5 ++++- blockbook.go | 31 +++++++++++++++++++------------ server/socketio.go | 8 +++++++- server/static/test.html | 1 + 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/bchain/mempool.go b/bchain/mempool.go index 9aa4cd79..6dd93f6f 100644 --- a/bchain/mempool.go +++ b/bchain/mempool.go @@ -71,7 +71,7 @@ func (m *Mempool) updateMappings(newTxToInputOutput map[string]inputOutput, newS // Resync gets mempool transactions and maps output scripts to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *Mempool) Resync() error { +func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() @@ -95,6 +95,9 @@ func (m *Mempool) Resync() error { if outputScript != "" { io.outputScripts = append(io.outputScripts, scriptIndex{outputScript, output.N}) } + if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { + onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + } } io.inputs = make([]outpoint, 0, len(tx.Vin)) for _, input := range tx.Vin { diff --git a/blockbook.go b/blockbook.go index 9635ab0e..75e81515 100644 --- a/blockbook.go +++ b/blockbook.go @@ -64,15 +64,15 @@ var ( ) var ( - chanSyncIndex = make(chan struct{}) - chanSyncMempool = make(chan struct{}) - chanSyncIndexDone = make(chan struct{}) - chanSyncMempoolDone = make(chan struct{}) - chain *bchain.BitcoinRPC - mempool *bchain.Mempool - index *db.RocksDB - callbackOnNewIndexHash []func(hash string) - callbackOnNewTxAddr []func(txid string, addr string) + chanSyncIndex = make(chan struct{}) + chanSyncMempool = make(chan struct{}) + chanSyncIndexDone = make(chan struct{}) + chanSyncMempoolDone = make(chan struct{}) + chain *bchain.BitcoinRPC + mempool *bchain.Mempool + index *db.RocksDB + callbacksOnNewBlockHash []func(hash string) + callbacksOnNewTxAddr []func(txid string, addr string) ) func main() { @@ -171,7 +171,8 @@ func main() { } } }() - callbackOnNewIndexHash = append(callbackOnNewIndexHash, socketIoServer.OnNewBlockHash) + callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, socketIoServer.OnNewBlockHash) + callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, socketIoServer.OnNewTxAddr) } if *synchronize { @@ -270,7 +271,7 @@ func syncIndexLoop() { } func onNewBlockHash(hash string) { - for _, c := range callbackOnNewIndexHash { + for _, c := range callbacksOnNewBlockHash { c(hash) } } @@ -280,13 +281,19 @@ func syncMempoolLoop() { glog.Info("syncMempoolLoop starting") // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { - if err := mempool.Resync(); err != nil { + if err := mempool.Resync(onNewTxAddr); err != nil { glog.Error("syncMempoolLoop", err) } }) glog.Info("syncMempoolLoop stopped") } +func onNewTxAddr(txid string, addr string) { + for _, c := range callbacksOnNewTxAddr { + c(txid, addr) + } +} + func mqHandler(m *bchain.MQMessage) { body := hex.EncodeToString(m.Body) glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) diff --git a/server/socketio.go b/server/socketio.go index a71f8961..647e234f 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -655,7 +655,7 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac return nil } for _, a := range addrs { - c.Join(sc + "-" + a) + c.Join("bitcoind/addresstxid-" + a) } } else { sc = r[1 : len(r)-1] @@ -673,3 +673,9 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) { glog.Info("broadcasting new block hash ", hash) s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash) } + +// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block +func (s *SocketIoServer) OnNewTxAddr(txid string, addr string) { + glog.Info("broadcasting new txid ", txid, " for addr ", addr) + s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", map[string]string{"address": addr, "txid": txid}) +} diff --git a/server/static/test.html b/server/static/test.html index 574de0c1..d4118fdd 100644 --- a/server/static/test.html +++ b/server/static/test.html @@ -201,6 +201,7 @@ socket.on("bitcoind/addresstxid", function (result) { console.log('on bitcoind/addresstxid'); console.log(result); + document.getElementById('subscribeAddressTxidResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; }); }