From c8a8bcdc1bb9469daa23ef3356e180088317d8bb Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 18 Apr 2021 21:39:48 +0200 Subject: [PATCH] Move OnNeBlock and OnNewTxAddr to goroutine --- server/socketio.go | 8 ++++++-- server/websocket.go | 32 +++++++++++++++++++------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/server/socketio.go b/server/socketio.go index d88772b1..2e844af9 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -716,12 +716,16 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac return nil } -// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block -func (s *SocketIoServer) OnNewBlockHash(hash string) { +func (s *SocketIoServer) onNewBlockHashAsync(hash string) { c := s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash) glog.Info("broadcasting new block hash ", hash, " to ", c, " channels") } +// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block +func (s *SocketIoServer) OnNewBlockHash(hash string) { + go s.onNewBlockHashAsync(hash) +} + // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block func (s *SocketIoServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor) { addr, searchable, err := s.chainParser.GetAddressesFromAddrDesc(desc) diff --git a/server/websocket.go b/server/websocket.go index b0aec341..880d8e71 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -808,8 +808,7 @@ func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interfa return &subscriptionResponse{false}, nil } -// OnNewBlock is a callback that broadcasts info about new block to subscribed clients -func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { +func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() data := struct { @@ -828,6 +827,11 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } +// OnNewBlock is a callback that broadcasts info about new block to subscribed clients +func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { + go s.onNewBlockAsync(hash, height) +} + func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) { s.newTransactionSubscriptionsLock.Lock() defer s.newTransactionSubscriptionsLock.Unlock() @@ -915,21 +919,23 @@ func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string return subscribed } +func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[string]struct{}) { + atx, err := s.api.GetTransactionFromMempoolTx(tx) + if err != nil { + glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid) + return + } + s.sendOnNewTx(atx) + for stringAddressDescriptor := range subscribed { + s.sendOnNewTxAddr(stringAddressDescriptor, atx) + } +} + // OnNewTx is a callback that broadcasts info about a tx affecting subscribed address func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { subscribed := s.getNewTxSubscriptions(tx) if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 { - atx, err := s.api.GetTransactionFromMempoolTx(tx) - if err != nil { - glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid) - return - } - - s.sendOnNewTx(atx) - - for stringAddressDescriptor := range subscribed { - s.sendOnNewTxAddr(stringAddressDescriptor, atx) - } + go s.onNewTxAsync(tx, subscribed) } }