diff --git a/lib/http/base.js b/lib/http/base.js index 2acd97b2..7bb19e2f 100644 --- a/lib/http/base.js +++ b/lib/http/base.js @@ -18,6 +18,7 @@ var util = require('../utils/util'); var co = require('../utils/co'); var Validator = require('../utils/validator'); var List = require('../utils/list'); +var ListItem = List.Item; var fs = require('../utils/fs'); var crypto = require('../crypto/crypto'); @@ -41,6 +42,7 @@ function HTTPBase(options) { this.server = null; this.io = null; this.sockets = new List(); + this.channels = {}; this.routes = new Routes(); this.mounts = []; this.stack = []; @@ -513,21 +515,28 @@ HTTPBase.prototype._initSockets = function _initSockets() { /** * Broadcast event to channel. - * @param {String} channel + * @param {String} name * @param {String} type * @param {...Object} args */ -HTTPBase.prototype.to = function to(channel) { - var args = new Array(arguments.length - 1); - var i, socket; +HTTPBase.prototype.to = function to(name) { + var list = this.channels[name]; + var i, args, item, socket; + + if (!list) + return; + + assert(list.size > 0); + + args = new Array(arguments.length - 1); for (i = 1; i < arguments.length; i++) args[i - 1] = arguments[i]; - for (socket = this.sockets.head; socket; socket = socket.next) { - if (socket.channels[channel]) - socket.emit.apply(socket, args); + for (item = list.head; item; item = item.next) { + socket = item.value; + socket.emit.apply(socket, args); } }; @@ -539,9 +548,10 @@ HTTPBase.prototype.to = function to(channel) { */ HTTPBase.prototype.all = function all() { + var list = this.sockets; var socket; - for (socket = this.sockets.head; socket; socket = socket.next) + for (socket = list.head; socket; socket = socket.next) socket.emit.apply(socket, arguments); }; @@ -555,12 +565,20 @@ HTTPBase.prototype.addSocket = function addSocket(ws) { var socket = new WebSocket(ws); var i, route; - ws.on('error', function(err) { + socket.on('error', function(err) { self.emit('error', err); }); - ws.on('disconnect', function() { - assert(self.sockets.remove(socket)); + socket.on('close', function() { + self.removeSocket(socket); + }); + + socket.on('join channel', function(name) { + self.joinChannel(socket, name); + }); + + socket.on('leave channel', function(name) { + self.leaveChannel(socket, name); }); this.sockets.push(socket); @@ -573,6 +591,68 @@ HTTPBase.prototype.addSocket = function addSocket(ws) { this.emit('socket', socket); }; +/** + * Initialize websockets. + * @private + */ + +HTTPBase.prototype.removeSocket = function removeSocket(socket) { + var keys = Object.keys(socket.channels); + var i, key; + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + this.leaveChannel(socket, key); + } + + assert(this.sockets.remove(socket)); +}; + +/** + * Initialize websockets. + * @private + */ + +HTTPBase.prototype.joinChannel = function joinChannel(socket, name) { + var list = this.channels[name]; + var item = socket.channels[name]; + + if (item) + return; + + if (!list) { + list = new List(); + this.channels[name] = list; + } + + item = new ListItem(socket); + list.push(item); + + socket.channels[name] = item; +}; + + +/** + * Initialize websockets. + * @private + */ + +HTTPBase.prototype.leaveChannel = function leaveChannel(socket, name) { + var list = this.channels[name]; + var item = socket.channels[name]; + + if (!item) + return; + + assert(list); + assert(list.remove(item)); + + if (list.size === 0) + delete this.channels[name]; + + delete socket.channels[name]; +}; + /** * Open the server. * @alias HTTPBase#open @@ -1553,7 +1633,7 @@ WebSocket.prototype.init = function init() { }); socket.on('disconnect', function() { - self.dispatch('disconnect'); + self.dispatch('close'); }); }; @@ -1568,8 +1648,6 @@ WebSocket.prototype.onevent = co(function* onevent(packet) { else ack = this.socket.ack(packet.id); - this.dispatch(type, args); - try { result = yield this.fire(type, args); } catch (e) { @@ -1603,12 +1681,12 @@ WebSocket.prototype.fire = co(function* fire(type, args) { return yield handler(args); }); -WebSocket.prototype.join = function join(channel) { - this.channels[channel] = true; +WebSocket.prototype.join = function join(name) { + this.dispatch('join channel', name); }; -WebSocket.prototype.leave = function leave(channel) { - delete this.channels[channel]; +WebSocket.prototype.leave = function leave(name) { + this.dispatch('leave channel', name); }; WebSocket.prototype.dispatch = function dispatch() { diff --git a/lib/http/server.js b/lib/http/server.js index 79dcf2fb..de8c3df6 100644 --- a/lib/http/server.js +++ b/lib/http/server.js @@ -389,7 +389,7 @@ HTTPServer.prototype.handleSocket = function handleSocket(ws) { socket.start(); - socket.on('disconnect', function() { + socket.on('close', function() { socket.destroy(); }); @@ -728,8 +728,8 @@ ClientSocket.prototype.init = function init() { emit.call(self, 'error', err); }); - socket.on('disconnect', function() { - emit.call(self, 'disconnect'); + socket.on('close', function() { + emit.call(self, 'close'); }); }; diff --git a/lib/utils/list.js b/lib/utils/list.js index e5bfb568..657517d7 100644 --- a/lib/utils/list.js +++ b/lib/utils/list.js @@ -266,6 +266,7 @@ List.prototype.toArray = function toArray() { function ListItem(value) { this.next = null; this.prev = null; + this.value = value; } /*