From f69563346d7846d8d63238b073b7feb557cf4d3a Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 3 May 2016 18:53:43 -0700 Subject: [PATCH] exit listener for worker. --- lib/bcoin/workers.js | 68 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/lib/bcoin/workers.js b/lib/bcoin/workers.js index 0399a9cd..6292c1dd 100644 --- a/lib/bcoin/workers.js +++ b/lib/bcoin/workers.js @@ -39,7 +39,7 @@ function Workers(options) { this.uid = 0; this.size = options.size || Workers.CORES; this.timeout = options.timeout || 10000; - this.children = {}; + this.children = []; } utils.inherits(Workers, EventEmitter); @@ -53,7 +53,7 @@ Workers.CORES = getCores(); /** * Spawn a new worker. - * @param {Number} index - Worker ID. + * @param {Number} id - Worker ID. * @returns {Worker} */ @@ -72,7 +72,7 @@ Workers.prototype.spawn = function spawn(id) { child.on('exit', function(code) { bcoin.debug('Worker %d exited: %s', child.id, code); if (self.children[child.id] === child) - delete self.children[child.id]; + self.children[child.id] = null; }); child.on('packet', function(job, body) { @@ -106,6 +106,47 @@ Workers.prototype.alloc = function alloc(job) { return this.children[id]; }; +/** + * Emit an event on the worker side (all workers). + * @param {String} event + * @param {...Object} arg + * @returns {Boolean} + */ + +Workers.prototype.sendEvent = function sendEvent() { + var i, child; + var result = true; + + for (i = 0; i < this.children.length; i++) { + child = this.children[i]; + + if (!child) + continue; + + if (!child.sendEvent.apply(child, arguments)) + result = false; + } + + return result; +}; + +/** + * Destroy all workers. + */ + +Workers.prototype.destroy = function destroy() { + var i, child; + + for (i = 0; i < this.children.length; i++) { + child = this.children[i]; + + if (!child) + continue; + + child.destroy(); + } +}; + /** * Call a method for a worker to execute. * @param {String} method - Method name. @@ -278,6 +319,7 @@ Worker.prototype.write = function write(data) { * @param {String} job * @param {String} name * @param {Array} items + * @returns {Boolean} */ Worker.prototype.send = function send(job, name, items) { @@ -288,6 +330,7 @@ Worker.prototype.send = function send(job, name, items) { * Emit an event on the worker side. * @param {String} event * @param {...Object} arg + * @returns {Boolean} */ Worker.prototype.sendEvent = function sendEvent() { @@ -329,18 +372,23 @@ Worker.prototype.execute = function execute(job, method, args, timeout, callback clearTimeout(timer); timer = null; } - self.removeListener('error', listener); self.removeListener(event, listener); + self.removeListener('error', listener); + self.removeListener('exit', exitListener); callback(err, result); } + function exitListener(code) { + listener(new Error('Worker exited: ' + code)); + } + this.once(event, listener); this.once('error', listener); + this.once('exit', exitListener); if (timeout !== -1) { timer = setTimeout(function() { - self.removeListener(event, listener); - callback(new Error('Worker timed out.')); + listener(new Error('Worker timed out.')); }, timeout); } @@ -428,6 +476,7 @@ Master.prototype.write = function write(data) { * @param {String} job * @param {String} name * @param {Array} items + * @returns {Boolean} */ Master.prototype.send = function send(job, name, items) { @@ -438,6 +487,7 @@ Master.prototype.send = function send(job, name, items) { * Emit an event on the worker side. * @param {String} event * @param {...Object} arg + * @returns {Boolean} */ Master.prototype.sendEvent = function sendEvent() { @@ -487,7 +537,7 @@ Master.listen = function listen(id) { }); master.on('packet', function(job, body) { - var res; + var result; if (body.name === 'event') { master.emit.apply(master, body.items); @@ -495,7 +545,7 @@ Master.listen = function listen(id) { } try { - res = jobs[body.name].apply(jobs, body.items); + result = jobs[body.name].apply(jobs, body.items); } catch (e) { bcoin.debug(e.stack + ''); return master.send(job, 'response', [{ @@ -504,7 +554,7 @@ Master.listen = function listen(id) { }]); } - return master.send(job, 'response', [null, res]); + return master.send(job, 'response', [null, result]); }); return master;