diff --git a/server/websocket.go b/server/websocket.go index 42c2d244..cd8ae847 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -197,15 +197,7 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) { func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewBlock(c) - s.addressSubscriptionsLock.Lock() - defer s.addressSubscriptionsLock.Unlock() - for _, sa := range s.addressSubscriptions { - for sc := range sa { - if sc == c { - delete(sa, c) - } - } - } + s.unsubscribeAddresses(c) glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } @@ -229,26 +221,20 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs return }, "subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { - rv, err = s.subscribeNewBlock(c, req) - return + return s.subscribeNewBlock(c, req) }, "unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { - rv, err = s.unsubscribeNewBlock(c) - return + return s.unsubscribeNewBlock(c) }, - "subscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { - ad, err := s.unmarshalAddress(req.Params) + "subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + ad, err := s.unmarshalAddresses(req.Params) if err == nil { - rv, err = s.subscribeAddress(c, ad, req) + rv, err = s.subscribeAddresses(c, ad, req) } return }, - "unsubscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { - ad, err := s.unmarshalAddress(req.Params) - if err == nil { - rv, err = s.unsubscribeAddress(c, ad) - } - return + "unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + return s.unsubscribeAddresses(c) }, } @@ -334,51 +320,72 @@ func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction, return } +type subscriptionResponse struct { + Subscribed bool `json:"subscribed"` +} + func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() s.newBlockSubscriptions[c] = req.ID - return + return &subscriptionResponse{true}, nil } func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() delete(s.newBlockSubscriptions, c) - return + return &subscriptionResponse{false}, nil } -func (s *WebsocketServer) unmarshalAddress(params []byte) (bchain.AddressDescriptor, error) { +func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) { r := struct { - Address string `json:"address"` + Addresses []string `json:"addresses"` }{} err := json.Unmarshal(params, &r) if err != nil { return nil, err } - return s.chainParser.GetAddrDescFromAddress(r.Address) + rv := make([]bchain.AddressDescriptor, len(r.Addresses)) + for i, a := range r.Addresses { + ad, err := s.chainParser.GetAddrDescFromAddress(a) + if err != nil { + return nil, err + } + rv[i] = ad + } + return rv, nil } -func (s *WebsocketServer) subscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) { +func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) { + // unsubscribe all previous subscriptions + s.unsubscribeAddresses(c) s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() - as, ok := s.addressSubscriptions[string(addrDesc)] - if !ok { - as = make(map[*websocketChannel]string) - s.addressSubscriptions[string(addrDesc)] = as + for i := range addrDesc { + ads := string(addrDesc[i]) + as, ok := s.addressSubscriptions[ads] + if !ok { + as = make(map[*websocketChannel]string) + s.addressSubscriptions[ads] = as + } + as[c] = req.ID } - as[c] = req.ID - return + return &subscriptionResponse{true}, nil } -func (s *WebsocketServer) unsubscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor) (res interface{}, err error) { +// unsubscribeAddresses unsubscribes all address subscriptions by this channel +func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() - as, ok := s.addressSubscriptions[string(addrDesc)] - if ok { - delete(as, c) + for _, sa := range s.addressSubscriptions { + for sc := range sa { + if sc == c { + delete(sa, c) + } + } } - return + return &subscriptionResponse{false}, nil } // OnNewBlock is a callback that broadcasts info about new block to subscribed clients diff --git a/static/test-websocket.html b/static/test-websocket.html index 6013901a..c854b1d2 100644 --- a/static/test-websocket.html +++ b/static/test-websocket.html @@ -16,8 +16,10 @@ var messageID; var pendingMessages; var subscriptions; + var subscriptionNewBlockId; + var subscriptionAddressesId; function send(method, params, callback) { - id = messageID.toString(); + var id = messageID.toString(); messageID++; pendingMessages[id] = callback; var req = { @@ -29,7 +31,7 @@ return id; } function subscribe(method, params, callback) { - id = messageID.toString(); + var id = messageID.toString(); messageID++; subscriptions[id] = callback; var req = { @@ -40,12 +42,25 @@ ws.send(JSON.stringify(req)); return id; } + function unsubscribe(method, id, params, callback) { + delete subscriptions[id]; + pendingMessages[id] = callback; + var req = { + id, + method, + params + } + ws.send(JSON.stringify(req)); + return id; + } function connect(server) { messageID = 0; - pendingMessages = {} - subscriptions = {} + pendingMessages = {}; + subscriptions = {}; + subscribeNewBlockId = ""; + subscribeAddressesId = ""; if (server.startsWith("http")) { - server.replace("http", "ws"); + server = server.replace("http", "ws"); } if (!server.endsWith("/websocket")) { server += "/websocket"; @@ -115,22 +130,57 @@ const method = 'subscribeNewBlock'; const params = { }; - var id = subscribe(method, params, function (result) { + if (subscribeNewBlockId) { + delete subscriptions[subscribeNewBlockId]; + subscribeNewBlockId = ""; + } + subscribeNewBlockId = subscribe(method, params, function (result) { document.getElementById('subscribeNewBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + document.getElementById('subscribeNewBlockId').innerText = subscribeNewBlockId; + document.getElementById('unsubscribeNewBlockButton').setAttribute("style", "display: inherit;"); }); - document.getElementById('subscribeNewBlockId').innerText = id; } - function subscribeAddress() { - const method = 'subscribeAddress'; - var address = document.getElementById('subscribeAddressName').value; + function unsubscribeNewBlock() { + const method = 'unsubscribeNewBlock'; const params = { - address }; - var id = subscribe(method, params, function (result) { - document.getElementById('subscribeAddressResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + unsubscribe(method, subscribeNewBlockId, params, function (result) { + subscribeNewBlockId = ""; + document.getElementById('subscribeNewBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + document.getElementById('subscribeNewBlockId').innerText = ""; + document.getElementById('unsubscribeNewBlockButton').setAttribute("style", "display: none;"); + }); + } + + function subscribeAddresses() { + const method = 'subscribeAddresses'; + var addresses = document.getElementById('subscribeAddressesName').value.split(","); + addresses = addresses.map(s => s.trim()); + const params = { + addresses + }; + if (subscribeAddressesId) { + delete subscriptions[subscribeAddressesId]; + subscribeAddressesId = ""; + } + subscribeAddressesId = subscribe(method, params, function (result) { + document.getElementById('subscribeAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + document.getElementById('subscribeAddressesIds').innerText = subscribeAddressesId; + document.getElementById('unsubscribeAddressesButton').setAttribute("style", "display: inherit;"); + }); + } + + function unsubscribeAddresses() { + const method = 'unsubscribeAddresses'; + const params = { + }; + unsubscribe(method, subscribeAddressesId, params, function (result) { + subscribeAddressesId = ""; + document.getElementById('subscribeAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + document.getElementById('subscribeAddressesIds').innerText = ""; + document.getElementById('unsubscribeAddressesButton').setAttribute("style", "display: none;"); }); - document.getElementById('subscribeAddressIds').innerText += " " + id ; } function getBlockHeader() { @@ -276,32 +326,37 @@