From 13f7b48ae6833f20ce895e1408c4f32d912ffc97 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 10 Dec 2018 17:22:37 +0100 Subject: [PATCH] Add websocket interface --- api/worker.go | 14 +++-- bchain/mempool_bitcoin_type.go | 2 +- bchain/mempool_ethereum_type.go | 4 +- bchain/types.go | 2 +- blockbook.go | 4 +- server/public.go | 5 +- server/websocket.go | 102 ++++++++++++++++++++++++++++++++ static/test-socketio.html | 4 +- static/test-websocket.html | 12 ++-- 9 files changed, 128 insertions(+), 21 deletions(-) diff --git a/api/worker.go b/api/worker.go index 78932c92..b3f5ef81 100644 --- a/api/worker.go +++ b/api/worker.go @@ -101,20 +101,25 @@ func (w *Worker) GetSpendingTxid(txid string, n int) (string, error) { // GetTransaction reads transaction data from txid func (w *Worker) GetTransaction(txid string, spendingTxs bool, specificJSON bool) (*Tx, error) { - start := time.Now() bchainTx, height, err := w.txCache.GetTransaction(txid) if err != nil { return nil, NewAPIError(fmt.Sprintf("Tx not found, %v", err), true) } + return w.GetTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON) +} + +// GetTransactionFromBchainTx reads transaction data from txid +func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32, spendingTxs bool, specificJSON bool) (*Tx, error) { + var err error var ta *db.TxAddresses var erc20t []Erc20Transfer var ethSpecific *eth.EthereumTxData var blockhash string if bchainTx.Confirmations > 0 { if w.chainType == bchain.ChainBitcoinType { - ta, err = w.db.GetTxAddresses(txid) + ta, err = w.db.GetTxAddresses(bchainTx.Txid) if err != nil { - return nil, errors.Annotatef(err, "GetTxAddresses %v", txid) + return nil, errors.Annotatef(err, "GetTxAddresses %v", bchainTx.Txid) } } blockhash, err = w.db.GetBlockHash(height) @@ -284,9 +289,6 @@ func (w *Worker) GetTransaction(txid string, spendingTxs bool, specificJSON bool Erc20Transfers: erc20t, EthereumSpecific: ethSpecific, } - if spendingTxs { - glog.Info("GetTransaction ", txid, " finished in ", time.Since(start)) - } return r, nil } diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index c4eab212..e6ccf868 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -122,7 +122,7 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } if m.onNewTxAddr != nil { - m.onNewTxAddr(tx.Txid, addrDesc, true) + m.onNewTxAddr(tx, addrDesc, true) } } dispatched := 0 diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index c8ef39ab..ec5493d0 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -79,7 +79,7 @@ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } if onNewTxAddr != nil { - onNewTxAddr(tx.Txid, addrDesc, true) + onNewTxAddr(tx, addrDesc, true) } } for _, input := range tx.Vin { @@ -92,7 +92,7 @@ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { } io = append(io, addrIndex{string(addrDesc), int32(^i)}) if onNewTxAddr != nil { - onNewTxAddr(tx.Txid, addrDesc, false) + onNewTxAddr(tx, addrDesc, false) } } } diff --git a/bchain/types.go b/bchain/types.go index 4c280d54..cf565234 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -179,7 +179,7 @@ type Erc20Contract struct { type OnNewBlockFunc func(hash string, height uint32) // OnNewTxAddrFunc is used to send notification about a new transaction/address -type OnNewTxAddrFunc func(txid string, desc AddressDescriptor, isOutput bool) +type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor, isOutput bool) // BlockChain defines common interface to block chain daemon type BlockChain interface { diff --git a/blockbook.go b/blockbook.go index f2ff9f9b..9f918172 100644 --- a/blockbook.go +++ b/blockbook.go @@ -498,9 +498,9 @@ func storeInternalStateLoop() { glog.Info("storeInternalStateLoop stopped") } -func onNewTxAddr(txid string, desc bchain.AddressDescriptor, isOutput bool) { +func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) { for _, c := range callbacksOnNewTxAddr { - c(txid, desc, isOutput) + c(tx, desc, isOutput) } } diff --git a/server/public.go b/server/public.go index 5d149d36..08c010cc 100644 --- a/server/public.go +++ b/server/public.go @@ -167,8 +167,9 @@ func (s *PublicServer) OnNewBlock(hash string, height uint32) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *PublicServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor, isOutput bool) { - s.socketio.OnNewTxAddr(txid, desc, isOutput) +func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) { + s.socketio.OnNewTxAddr(tx.Txid, desc, isOutput) + s.websocket.OnNewTxAddr(tx, desc, isOutput) } func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) { diff --git a/server/websocket.go b/server/websocket.go index bedc95c2..42c2d244 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -197,6 +197,15 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) { func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewBlock(c) + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + for _, sa := range s.addressSubscriptions { + for sc := range sa { + if sc == c { + delete(sa, c) + } + } + } glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } @@ -227,6 +236,20 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs rv, err = s.unsubscribeNewBlock(c) return }, + "subscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + ad, err := s.unmarshalAddress(req.Params) + if err == nil { + rv, err = s.subscribeAddress(c, ad, req) + } + return + }, + "unsubscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + ad, err := s.unmarshalAddress(req.Params) + if err == nil { + rv, err = s.unsubscribeAddress(c, ad) + } + return + }, } func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) { @@ -325,6 +348,39 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac return } +func (s *WebsocketServer) unmarshalAddress(params []byte) (bchain.AddressDescriptor, error) { + r := struct { + Address string `json:"address"` + }{} + err := json.Unmarshal(params, &r) + if err != nil { + return nil, err + } + return s.chainParser.GetAddrDescFromAddress(r.Address) +} + +func (s *WebsocketServer) subscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) { + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + as, ok := s.addressSubscriptions[string(addrDesc)] + if !ok { + as = make(map[*websocketChannel]string) + s.addressSubscriptions[string(addrDesc)] = as + } + as[c] = req.ID + return +} + +func (s *WebsocketServer) unsubscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor) (res interface{}, err error) { + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + as, ok := s.addressSubscriptions[string(addrDesc)] + if ok { + delete(as, c) + } + return +} + // OnNewBlock is a callback that broadcasts info about new block to subscribed clients func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { s.newBlockSubscriptionsLock.Lock() @@ -346,3 +402,49 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { } glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } + +// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address +func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor, isOutput bool) { + // check if there is any subscription but release lock immediately, GetTransactionFromBchainTx may take some time + s.addressSubscriptionsLock.Lock() + as, ok := s.addressSubscriptions[string(addrDesc)] + s.addressSubscriptionsLock.Unlock() + if ok && len(as) > 0 { + addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) + if err != nil { + glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc) + return + } + if len(addr) == 1 { + atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false) + if err != nil { + glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) + return + } + data := struct { + Address string `json:"address"` + Input bool `json:"input"` + Tx *api.Tx `json:"tx"` + }{ + Address: addr[0], + Input: !isOutput, + Tx: atx, + } + // get the list of subscriptions again, this time keep the lock + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + as, ok = s.addressSubscriptions[string(addrDesc)] + if ok { + for c, id := range as { + if c.IsAlive() { + c.out <- &websocketRes{ + ID: id, + Data: &data, + } + } + } + glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels") + } + } + } +} diff --git a/static/test-socketio.html b/static/test-socketio.html index c4b07b30..c96724b6 100644 --- a/static/test-socketio.html +++ b/static/test-socketio.html @@ -11,7 +11,7 @@ } - Blockbook test socket.io + Blockbook Socket.io Test Page