Move OnNeBlock and OnNewTxAddr to goroutine

This commit is contained in:
Martin Boehm 2021-04-18 21:39:48 +02:00
parent 538ff0cdcb
commit c8a8bcdc1b
2 changed files with 25 additions and 15 deletions

View File

@ -716,12 +716,16 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac
return nil
}
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block
func (s *SocketIoServer) OnNewBlockHash(hash string) {
func (s *SocketIoServer) onNewBlockHashAsync(hash string) {
c := s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash)
glog.Info("broadcasting new block hash ", hash, " to ", c, " channels")
}
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block
func (s *SocketIoServer) OnNewBlockHash(hash string) {
go s.onNewBlockHashAsync(hash)
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *SocketIoServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor) {
addr, searchable, err := s.chainParser.GetAddressesFromAddrDesc(desc)

View File

@ -808,8 +808,7 @@ func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interfa
return &subscriptionResponse{false}, nil
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients
func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
data := struct {
@ -828,6 +827,11 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients
func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
go s.onNewBlockAsync(hash, height)
}
func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
@ -915,21 +919,23 @@ func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string
return subscribed
}
func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[string]struct{}) {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
}
}
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
subscribed := s.getNewTxSubscriptions(tx)
if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
}
go s.onNewTxAsync(tx, subscribed)
}
}