Optimize websocket subscribe and unsubscribe addresses

This commit is contained in:
Martin Boehm 2021-05-17 23:50:08 +02:00
parent 8fd8e17929
commit f5ae7c540c

View File

@ -53,6 +53,7 @@ type websocketChannel struct {
requestHeader http.Header requestHeader http.Header
alive bool alive bool
aliveLock sync.Mutex aliveLock sync.Mutex
addrDescs []string // subscribed address descriptors as strings
} }
// WebsocketServer is a handle to websocket server // WebsocketServer is a handle to websocket server
@ -722,7 +723,7 @@ func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res in
return &subscriptionResponse{false}, nil return &subscriptionResponse{false}, nil
} }
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) { func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, error) {
r := struct { r := struct {
Addresses []string `json:"addresses"` Addresses []string `json:"addresses"`
}{} }{}
@ -730,38 +731,41 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDes
if err != nil { if err != nil {
return nil, err return nil, err
} }
rv := make([]bchain.AddressDescriptor, len(r.Addresses)) rv := make([]string, len(r.Addresses))
for i, a := range r.Addresses { for i, a := range r.Addresses {
ad, err := s.chainParser.GetAddrDescFromAddress(a) ad, err := s.chainParser.GetAddrDescFromAddress(a)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rv[i] = ad rv[i] = string(ad)
} }
return rv, nil return rv, nil
} }
// unsubscribe addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses // unsubscribe addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses
func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
for ads, sa := range s.addressSubscriptions { for _, ads := range c.addrDescs {
for sc := range sa { sa, e := s.addressSubscriptions[ads]
if sc == c { if e {
delete(sa, c) for sc := range sa {
if sc == c {
delete(sa, c)
}
}
if len(sa) == 0 {
delete(s.addressSubscriptions, ads)
} }
} }
if len(sa) == 0 {
delete(s.addressSubscriptions, ads)
}
} }
c.addrDescs = nil
} }
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) { func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, req *websocketReq) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock() s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock() defer s.addressSubscriptionsLock.Unlock()
// unsubscribe all previous subscriptions // unsubscribe all previous subscriptions
s.doUnsubscribeAddresses(c) s.doUnsubscribeAddresses(c)
for i := range addrDesc { for _, ads := range addrDesc {
ads := string(addrDesc[i])
as, ok := s.addressSubscriptions[ads] as, ok := s.addressSubscriptions[ads]
if !ok { if !ok {
as = make(map[*websocketChannel]string) as = make(map[*websocketChannel]string)
@ -769,6 +773,7 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bch
} }
as[c] = req.ID as[c] = req.ID
} }
c.addrDescs = addrDesc
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions))) s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions)))
return &subscriptionResponse{true}, nil return &subscriptionResponse{true}, nil
} }