Change websocket address subscription behavior

This commit is contained in:
Martin Boehm 2018-12-11 11:50:43 +01:00
parent 13f7b48ae6
commit 4b39519750
2 changed files with 123 additions and 61 deletions

View File

@ -197,15 +197,7 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) {
func (s *WebsocketServer) onDisconnect(c *websocketChannel) {
s.unsubscribeNewBlock(c)
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
for _, sa := range s.addressSubscriptions {
for sc := range sa {
if sc == c {
delete(sa, c)
}
}
}
s.unsubscribeAddresses(c)
glog.Info("Client disconnected ", c.id, ", ", c.ip)
s.metrics.WebsocketClients.Dec()
}
@ -229,26 +221,20 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
return
},
"subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
rv, err = s.subscribeNewBlock(c, req)
return
return s.subscribeNewBlock(c, req)
},
"unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
rv, err = s.unsubscribeNewBlock(c)
return
return s.unsubscribeNewBlock(c)
},
"subscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddress(req.Params)
"subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddresses(req.Params)
if err == nil {
rv, err = s.subscribeAddress(c, ad, req)
rv, err = s.subscribeAddresses(c, ad, req)
}
return
},
"unsubscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddress(req.Params)
if err == nil {
rv, err = s.unsubscribeAddress(c, ad)
}
return
"unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeAddresses(c)
},
}
@ -334,51 +320,72 @@ func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction,
return
}
type subscriptionResponse struct {
Subscribed bool `json:"subscribed"`
}
func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
s.newBlockSubscriptions[c] = req.ID
return
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
delete(s.newBlockSubscriptions, c)
return
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) unmarshalAddress(params []byte) (bchain.AddressDescriptor, error) {
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) {
r := struct {
Address string `json:"address"`
Addresses []string `json:"addresses"`
}{}
err := json.Unmarshal(params, &r)
if err != nil {
return nil, err
}
return s.chainParser.GetAddrDescFromAddress(r.Address)
rv := make([]bchain.AddressDescriptor, len(r.Addresses))
for i, a := range r.Addresses {
ad, err := s.chainParser.GetAddrDescFromAddress(a)
if err != nil {
return nil, err
}
rv[i] = ad
}
return rv, nil
}
func (s *WebsocketServer) subscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) {
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) {
// unsubscribe all previous subscriptions
s.unsubscribeAddresses(c)
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[string(addrDesc)]
if !ok {
as = make(map[*websocketChannel]string)
s.addressSubscriptions[string(addrDesc)] = as
for i := range addrDesc {
ads := string(addrDesc[i])
as, ok := s.addressSubscriptions[ads]
if !ok {
as = make(map[*websocketChannel]string)
s.addressSubscriptions[ads] = as
}
as[c] = req.ID
}
as[c] = req.ID
return
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor) (res interface{}, err error) {
// unsubscribeAddresses unsubscribes all address subscriptions by this channel
func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[string(addrDesc)]
if ok {
delete(as, c)
for _, sa := range s.addressSubscriptions {
for sc := range sa {
if sc == c {
delete(sa, c)
}
}
}
return
return &subscriptionResponse{false}, nil
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients

View File

@ -16,8 +16,10 @@
var messageID;
var pendingMessages;
var subscriptions;
var subscriptionNewBlockId;
var subscriptionAddressesId;
function send(method, params, callback) {
id = messageID.toString();
var id = messageID.toString();
messageID++;
pendingMessages[id] = callback;
var req = {
@ -29,7 +31,7 @@
return id;
}
function subscribe(method, params, callback) {
id = messageID.toString();
var id = messageID.toString();
messageID++;
subscriptions[id] = callback;
var req = {
@ -40,12 +42,25 @@
ws.send(JSON.stringify(req));
return id;
}
function unsubscribe(method, id, params, callback) {
delete subscriptions[id];
pendingMessages[id] = callback;
var req = {
id,
method,
params
}
ws.send(JSON.stringify(req));
return id;
}
function connect(server) {
messageID = 0;
pendingMessages = {}
subscriptions = {}
pendingMessages = {};
subscriptions = {};
subscribeNewBlockId = "";
subscribeAddressesId = "";
if (server.startsWith("http")) {
server.replace("http", "ws");
server = server.replace("http", "ws");
}
if (!server.endsWith("/websocket")) {
server += "/websocket";
@ -115,22 +130,57 @@
const method = 'subscribeNewBlock';
const params = {
};
var id = subscribe(method, params, function (result) {
if (subscribeNewBlockId) {
delete subscriptions[subscribeNewBlockId];
subscribeNewBlockId = "";
}
subscribeNewBlockId = subscribe(method, params, function (result) {
document.getElementById('subscribeNewBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeNewBlockId').innerText = subscribeNewBlockId;
document.getElementById('unsubscribeNewBlockButton').setAttribute("style", "display: inherit;");
});
document.getElementById('subscribeNewBlockId').innerText = id;
}
function subscribeAddress() {
const method = 'subscribeAddress';
var address = document.getElementById('subscribeAddressName').value;
function unsubscribeNewBlock() {
const method = 'unsubscribeNewBlock';
const params = {
address
};
var id = subscribe(method, params, function (result) {
document.getElementById('subscribeAddressResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
unsubscribe(method, subscribeNewBlockId, params, function (result) {
subscribeNewBlockId = "";
document.getElementById('subscribeNewBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeNewBlockId').innerText = "";
document.getElementById('unsubscribeNewBlockButton').setAttribute("style", "display: none;");
});
}
function subscribeAddresses() {
const method = 'subscribeAddresses';
var addresses = document.getElementById('subscribeAddressesName').value.split(",");
addresses = addresses.map(s => s.trim());
const params = {
addresses
};
if (subscribeAddressesId) {
delete subscriptions[subscribeAddressesId];
subscribeAddressesId = "";
}
subscribeAddressesId = subscribe(method, params, function (result) {
document.getElementById('subscribeAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeAddressesIds').innerText = subscribeAddressesId;
document.getElementById('unsubscribeAddressesButton').setAttribute("style", "display: inherit;");
});
}
function unsubscribeAddresses() {
const method = 'unsubscribeAddresses';
const params = {
};
unsubscribe(method, subscribeAddressesId, params, function (result) {
subscribeAddressesId = "";
document.getElementById('subscribeAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeAddressesIds').innerText = "";
document.getElementById('unsubscribeAddressesButton').setAttribute("style", "display: none;");
});
document.getElementById('subscribeAddressIds').innerText += " " + id ;
}
function getBlockHeader() {
@ -276,32 +326,37 @@
<div class="col">
<input class="btn btn-secondary" type="button" value="subscribe new block" onclick="subscribeNewBlock()">
</div>
<div class="col-8">
<div class="col-4">
<span id="subscribeNewBlockId"></span>
</div>
<div class="col">
<input class="btn btn-secondary" id="unsubscribeNewBlockButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeNewBlock()">
</div>
</div>
<div class="row">
<div class="col" id="subscribeNewBlockResult"></div>
</div>
<div class="row">
<div class="col">
<input class="btn btn-secondary" type="button" value="subscribe address" onclick="subscribeAddress()">
<input class="btn btn-secondary" type="button" value="subscribe address" onclick="subscribeAddresses()">
</div>
<div class="col-8">
<input type="text" class="form-control" id="subscribeAddressName" value="2MzTmvPJLZaLzD9XdN3jMtQA5NexC3rAPww">
<input type="text" class="form-control" id="subscribeAddressesName" value="0x65513ecd11fd3a5b1fefdcc6a500b025008405a2,0xbdfeff9a1f4a1bdf483d680046344316019c58cf">
</div>
<div class="col">
<span id="subscribeAddressIds"></span>
<span id="subscribeAddressesIds"></span>
</div>
<div class="col">
<input class="btn btn-secondary" id="unsubscribeAddressesButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeAddresses()">
</div>
</div>
</div>
<div class="row">
<div class="col" id="subscribeAddressResult">
</div>
<div class="col" id="subscribeAddressesResult"></div>
</div>
</div>
</body>
<script>
document.getElementById('serverAddress').value = window.location.protocol.replace("http", "ws") + "//" + window.location.host;
document.getElementById('serverAddress').value = window.location.protocol + "//" + window.location.host;
</script>
</html>