exit listener for worker.

This commit is contained in:
Christopher Jeffrey 2016-05-03 18:53:43 -07:00
parent 9aa1e9b8f1
commit f69563346d
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD

View File

@ -39,7 +39,7 @@ function Workers(options) {
this.uid = 0;
this.size = options.size || Workers.CORES;
this.timeout = options.timeout || 10000;
this.children = {};
this.children = [];
}
utils.inherits(Workers, EventEmitter);
@ -53,7 +53,7 @@ Workers.CORES = getCores();
/**
* Spawn a new worker.
* @param {Number} index - Worker ID.
* @param {Number} id - Worker ID.
* @returns {Worker}
*/
@ -72,7 +72,7 @@ Workers.prototype.spawn = function spawn(id) {
child.on('exit', function(code) {
bcoin.debug('Worker %d exited: %s', child.id, code);
if (self.children[child.id] === child)
delete self.children[child.id];
self.children[child.id] = null;
});
child.on('packet', function(job, body) {
@ -106,6 +106,47 @@ Workers.prototype.alloc = function alloc(job) {
return this.children[id];
};
/**
* Emit an event on the worker side (all workers).
* @param {String} event
* @param {...Object} arg
* @returns {Boolean}
*/
Workers.prototype.sendEvent = function sendEvent() {
var i, child;
var result = true;
for (i = 0; i < this.children.length; i++) {
child = this.children[i];
if (!child)
continue;
if (!child.sendEvent.apply(child, arguments))
result = false;
}
return result;
};
/**
* Destroy all workers.
*/
Workers.prototype.destroy = function destroy() {
var i, child;
for (i = 0; i < this.children.length; i++) {
child = this.children[i];
if (!child)
continue;
child.destroy();
}
};
/**
* Call a method for a worker to execute.
* @param {String} method - Method name.
@ -278,6 +319,7 @@ Worker.prototype.write = function write(data) {
* @param {String} job
* @param {String} name
* @param {Array} items
* @returns {Boolean}
*/
Worker.prototype.send = function send(job, name, items) {
@ -288,6 +330,7 @@ Worker.prototype.send = function send(job, name, items) {
* Emit an event on the worker side.
* @param {String} event
* @param {...Object} arg
* @returns {Boolean}
*/
Worker.prototype.sendEvent = function sendEvent() {
@ -329,18 +372,23 @@ Worker.prototype.execute = function execute(job, method, args, timeout, callback
clearTimeout(timer);
timer = null;
}
self.removeListener('error', listener);
self.removeListener(event, listener);
self.removeListener('error', listener);
self.removeListener('exit', exitListener);
callback(err, result);
}
function exitListener(code) {
listener(new Error('Worker exited: ' + code));
}
this.once(event, listener);
this.once('error', listener);
this.once('exit', exitListener);
if (timeout !== -1) {
timer = setTimeout(function() {
self.removeListener(event, listener);
callback(new Error('Worker timed out.'));
listener(new Error('Worker timed out.'));
}, timeout);
}
@ -428,6 +476,7 @@ Master.prototype.write = function write(data) {
* @param {String} job
* @param {String} name
* @param {Array} items
* @returns {Boolean}
*/
Master.prototype.send = function send(job, name, items) {
@ -438,6 +487,7 @@ Master.prototype.send = function send(job, name, items) {
* Emit an event on the worker side.
* @param {String} event
* @param {...Object} arg
* @returns {Boolean}
*/
Master.prototype.sendEvent = function sendEvent() {
@ -487,7 +537,7 @@ Master.listen = function listen(id) {
});
master.on('packet', function(job, body) {
var res;
var result;
if (body.name === 'event') {
master.emit.apply(master, body.items);
@ -495,7 +545,7 @@ Master.listen = function listen(id) {
}
try {
res = jobs[body.name].apply(jobs, body.items);
result = jobs[body.name].apply(jobs, body.items);
} catch (e) {
bcoin.debug(e.stack + '');
return master.send(job, 'response', [{
@ -504,7 +554,7 @@ Master.listen = function listen(id) {
}]);
}
return master.send(job, 'response', [null, res]);
return master.send(job, 'response', [null, result]);
});
return master;