http: better channel management.

This commit is contained in:
Christopher Jeffrey 2017-03-09 12:33:53 -08:00
parent 5cbbdbfb2f
commit 4c7a935484
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 100 additions and 21 deletions

View File

@ -18,6 +18,7 @@ var util = require('../utils/util');
var co = require('../utils/co');
var Validator = require('../utils/validator');
var List = require('../utils/list');
var ListItem = List.Item;
var fs = require('../utils/fs');
var crypto = require('../crypto/crypto');
@ -41,6 +42,7 @@ function HTTPBase(options) {
this.server = null;
this.io = null;
this.sockets = new List();
this.channels = {};
this.routes = new Routes();
this.mounts = [];
this.stack = [];
@ -513,21 +515,28 @@ HTTPBase.prototype._initSockets = function _initSockets() {
/**
* Broadcast event to channel.
* @param {String} channel
* @param {String} name
* @param {String} type
* @param {...Object} args
*/
HTTPBase.prototype.to = function to(channel) {
var args = new Array(arguments.length - 1);
var i, socket;
HTTPBase.prototype.to = function to(name) {
var list = this.channels[name];
var i, args, item, socket;
if (!list)
return;
assert(list.size > 0);
args = new Array(arguments.length - 1);
for (i = 1; i < arguments.length; i++)
args[i - 1] = arguments[i];
for (socket = this.sockets.head; socket; socket = socket.next) {
if (socket.channels[channel])
socket.emit.apply(socket, args);
for (item = list.head; item; item = item.next) {
socket = item.value;
socket.emit.apply(socket, args);
}
};
@ -539,9 +548,10 @@ HTTPBase.prototype.to = function to(channel) {
*/
HTTPBase.prototype.all = function all() {
var list = this.sockets;
var socket;
for (socket = this.sockets.head; socket; socket = socket.next)
for (socket = list.head; socket; socket = socket.next)
socket.emit.apply(socket, arguments);
};
@ -555,12 +565,20 @@ HTTPBase.prototype.addSocket = function addSocket(ws) {
var socket = new WebSocket(ws);
var i, route;
ws.on('error', function(err) {
socket.on('error', function(err) {
self.emit('error', err);
});
ws.on('disconnect', function() {
assert(self.sockets.remove(socket));
socket.on('close', function() {
self.removeSocket(socket);
});
socket.on('join channel', function(name) {
self.joinChannel(socket, name);
});
socket.on('leave channel', function(name) {
self.leaveChannel(socket, name);
});
this.sockets.push(socket);
@ -573,6 +591,68 @@ HTTPBase.prototype.addSocket = function addSocket(ws) {
this.emit('socket', socket);
};
/**
* Initialize websockets.
* @private
*/
HTTPBase.prototype.removeSocket = function removeSocket(socket) {
var keys = Object.keys(socket.channels);
var i, key;
for (i = 0; i < keys.length; i++) {
key = keys[i];
this.leaveChannel(socket, key);
}
assert(this.sockets.remove(socket));
};
/**
* Initialize websockets.
* @private
*/
HTTPBase.prototype.joinChannel = function joinChannel(socket, name) {
var list = this.channels[name];
var item = socket.channels[name];
if (item)
return;
if (!list) {
list = new List();
this.channels[name] = list;
}
item = new ListItem(socket);
list.push(item);
socket.channels[name] = item;
};
/**
* Initialize websockets.
* @private
*/
HTTPBase.prototype.leaveChannel = function leaveChannel(socket, name) {
var list = this.channels[name];
var item = socket.channels[name];
if (!item)
return;
assert(list);
assert(list.remove(item));
if (list.size === 0)
delete this.channels[name];
delete socket.channels[name];
};
/**
* Open the server.
* @alias HTTPBase#open
@ -1553,7 +1633,7 @@ WebSocket.prototype.init = function init() {
});
socket.on('disconnect', function() {
self.dispatch('disconnect');
self.dispatch('close');
});
};
@ -1568,8 +1648,6 @@ WebSocket.prototype.onevent = co(function* onevent(packet) {
else
ack = this.socket.ack(packet.id);
this.dispatch(type, args);
try {
result = yield this.fire(type, args);
} catch (e) {
@ -1603,12 +1681,12 @@ WebSocket.prototype.fire = co(function* fire(type, args) {
return yield handler(args);
});
WebSocket.prototype.join = function join(channel) {
this.channels[channel] = true;
WebSocket.prototype.join = function join(name) {
this.dispatch('join channel', name);
};
WebSocket.prototype.leave = function leave(channel) {
delete this.channels[channel];
WebSocket.prototype.leave = function leave(name) {
this.dispatch('leave channel', name);
};
WebSocket.prototype.dispatch = function dispatch() {

View File

@ -389,7 +389,7 @@ HTTPServer.prototype.handleSocket = function handleSocket(ws) {
socket.start();
socket.on('disconnect', function() {
socket.on('close', function() {
socket.destroy();
});
@ -728,8 +728,8 @@ ClientSocket.prototype.init = function init() {
emit.call(self, 'error', err);
});
socket.on('disconnect', function() {
emit.call(self, 'disconnect');
socket.on('close', function() {
emit.call(self, 'close');
});
};

View File

@ -266,6 +266,7 @@ List.prototype.toArray = function toArray() {
function ListItem(value) {
this.next = null;
this.prev = null;
this.value = value;
}
/*