workers: refactor exiting.

This commit is contained in:
Christopher Jeffrey 2017-07-05 13:49:45 -07:00
parent 0250cf4296
commit 8f295a376d
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 117 additions and 142 deletions

View File

@ -111,8 +111,9 @@ Node.prototype.init = function init() {
this.logger.warning('Worker %d exited: %s.', child.id, code); this.logger.warning('Worker %d exited: %s.', child.id, code);
}); });
this.workers.on('log', (log) => { this.workers.on('log', (text, child) => {
this.logger.debug(log); this.logger.debug(`Worker ${child.id} says:`);
this.logger.debug(text);
}); });
this.workers.on('error', (err, child) => { this.workers.on('error', (err, child) => {
@ -155,6 +156,7 @@ Node.prototype.location = function location(name) {
Node.prototype.handlePreopen = async function handlePreopen() { Node.prototype.handlePreopen = async function handlePreopen() {
await this.logger.open(); await this.logger.open();
await this.workers.open();
this.bind(this.network.time, 'offset', (offset) => { this.bind(this.network.time, 'offset', (offset) => {
this.logger.info('Time offset: %d (%d minutes).', offset, offset / 60 | 0); this.logger.info('Time offset: %d (%d minutes).', offset, offset / 60 | 0);
@ -217,6 +219,7 @@ Node.prototype.handleClose = async function handleClose() {
this.bound.length = 0; this.bound.length = 0;
this.startTime = -1; this.startTime = -1;
await this.workers.close();
await this.logger.close(); await this.logger.close();
}; };

View File

@ -67,8 +67,9 @@ server.create = function create(options) {
logger.warning('Worker %d exited: %s.', child.id, code); logger.warning('Worker %d exited: %s.', child.id, code);
}); });
workers.on('log', (log) => { workers.on('log', (text, child) => {
logger.debug(log); logger.debug(`Worker ${child.id} says:`);
logger.debug(text);
}); });
workers.on('error', (err, child) => { workers.on('error', (err, child) => {

View File

@ -22,6 +22,10 @@ const packets = require('./packets');
const HAS_WORKERS = typeof global.Worker === 'function'; const HAS_WORKERS = typeof global.Worker === 'function';
const HAS_CP = typeof cp.spawn === 'function'; const HAS_CP = typeof cp.spawn === 'function';
const HAS_SUPPORT = HAS_WORKERS || HAS_CP;
const pools = new Set();
let exitBound = false;
/** /**
* A worker pool. * A worker pool.
@ -32,7 +36,7 @@ const HAS_CP = typeof cp.spawn === 'function';
* @param {Number} [options.timeout=10000] - Execution timeout. * @param {Number} [options.timeout=10000] - Execution timeout.
* @property {Number} size * @property {Number} size
* @property {Number} timeout * @property {Number} timeout
* @property {Object} children * @property {Map} children
* @property {Number} nonce * @property {Number} nonce
*/ */
@ -44,22 +48,18 @@ function WorkerPool(options) {
this.size = WorkerPool.CORES; this.size = WorkerPool.CORES;
this.timeout = 60000; this.timeout = 60000;
this.children = []; this.children = new Map();
this.nonce = 0; this.nonce = 0;
this.enabled = true; this.enabled = true;
bindExit();
pools.add(this);
this.set(options); this.set(options);
} }
util.inherits(WorkerPool, EventEmitter); util.inherits(WorkerPool, EventEmitter);
/**
* Whether workers are supported.
* @type {Boolean}
*/
WorkerPool.support = HAS_WORKERS || HAS_CP;
/** /**
* Number of CPUs/cores available. * Number of CPUs/cores available.
* @type {Number} * @type {Number}
@ -88,101 +88,6 @@ WorkerPool.WORKER_URL = '/bcoin-worker.js';
WorkerPool.MASTER_URL = '/bcoin-master.js'; WorkerPool.MASTER_URL = '/bcoin-master.js';
/**
* Global list of workers.
* @type {Array}
*/
WorkerPool.children = [];
/**
* Destroy all workers.
* Used for cleaning up workers on exit.
* @private
*/
WorkerPool.cleanup = function cleanup() {
while (WorkerPool.children.length > 0)
WorkerPool.children.pop().destroy();
};
/**
* Whether exit events have been bound globally.
* @private
* @type {Boolean}
*/
WorkerPool.bound = false;
/**
* Bind to process events in
* order to cleanup listeners.
* @private
*/
WorkerPool.bindExit = function bindExit() {
let onSighup, onSigint, onSigterm, onError;
if (!HAS_CP)
return;
if (WorkerPool.bound)
return;
WorkerPool.bound = true;
onSighup = () => {
process.exit(1 | 0x80);
};
onSigint = () => {
process.exit(2 | 0x80);
};
onSigterm = () => {
process.exit(15 | 0x80);
};
onError = (err) => {
if (err && err.stack)
util.error(err.stack + '');
process.exit(1);
};
process.once('exit', () => {
WorkerPool.cleanup();
});
if (process.listenerCount('SIGHUP') === 0)
process.once('SIGHUP', onSighup);
if (process.listenerCount('SIGINT') === 0)
process.once('SIGINT', onSigint);
if (process.listenerCount('SIGTERM') === 0)
process.once('SIGTERM', onSigterm);
if (process.listenerCount('uncaughtException') === 0)
process.once('uncaughtException', onError);
process.on('newListener', (name) => {
switch (name) {
case 'SIGHUP':
process.removeListener(name, onSighup);
break;
case 'SIGINT':
process.removeListener(name, onSigint);
break;
case 'SIGTERM':
process.removeListener(name, onSigterm);
break;
case 'uncaughtException':
process.removeListener(name, onError);
break;
}
});
};
/** /**
* Set worker pool options. * Set worker pool options.
* @param {Object} options * @param {Object} options
@ -226,6 +131,25 @@ WorkerPool.prototype.disable = function disable() {
this.enabled = true; this.enabled = true;
}; };
/**
* Open worker pool.
* @returns {Promise}
*/
WorkerPool.prototype.open = async function open() {
pools.add(this);
};
/**
* Close worker pool.
* @returns {Promise}
*/
WorkerPool.prototype.close = async function close() {
pools.delete(this);
this.destroy();
};
/** /**
* Spawn a new worker. * Spawn a new worker.
* @param {Number} id - Worker ID. * @param {Number} id - Worker ID.
@ -241,8 +165,8 @@ WorkerPool.prototype.spawn = function spawn(id) {
child.on('exit', (code) => { child.on('exit', (code) => {
this.emit('exit', code, child); this.emit('exit', code, child);
if (this.children[child.id] === child) if (this.children.get(child.id) === child)
this.children[child.id] = null; this.children.delete(child.id);
}); });
child.on('event', (items) => { child.on('event', (items) => {
@ -250,6 +174,10 @@ WorkerPool.prototype.spawn = function spawn(id) {
this.emit.apply(this, items); this.emit.apply(this, items);
}); });
child.on('log', (text) => {
this.emit('log', text, child);
});
this.emit('spawn', child); this.emit('spawn', child);
return child; return child;
@ -263,9 +191,11 @@ WorkerPool.prototype.spawn = function spawn(id) {
WorkerPool.prototype.alloc = function alloc() { WorkerPool.prototype.alloc = function alloc() {
let id = this.nonce++ % this.size; let id = this.nonce++ % this.size;
if (!this.children[id])
this.children[id] = this.spawn(id); if (!this.children.has(id))
return this.children[id]; this.children.set(id, this.spawn(id));
return this.children.get(id);
}; };
/** /**
@ -278,10 +208,7 @@ WorkerPool.prototype.alloc = function alloc() {
WorkerPool.prototype.sendEvent = function sendEvent() { WorkerPool.prototype.sendEvent = function sendEvent() {
let result = true; let result = true;
for (let child of this.children) { for (let child of this.children.values()) {
if (!child)
continue;
if (!child.sendEvent.apply(child, arguments)) if (!child.sendEvent.apply(child, arguments))
result = false; result = false;
} }
@ -294,12 +221,8 @@ WorkerPool.prototype.sendEvent = function sendEvent() {
*/ */
WorkerPool.prototype.destroy = function destroy() { WorkerPool.prototype.destroy = function destroy() {
for (let child of this.children) { for (let child of this.children.values())
if (!child)
continue;
child.destroy(); child.destroy();
}
}; };
/** /**
@ -312,7 +235,7 @@ WorkerPool.prototype.destroy = function destroy() {
WorkerPool.prototype.execute = function execute(packet, timeout) { WorkerPool.prototype.execute = function execute(packet, timeout) {
let result, child; let result, child;
if (!this.enabled || !WorkerPool.support) { if (!this.enabled || !HAS_SUPPORT) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
setImmediate(() => { setImmediate(() => {
try { try {
@ -531,7 +454,6 @@ Worker.prototype._init = function _init() {
throw new Error('Workers not available.'); throw new Error('Workers not available.');
} }
this._bind();
this._listen(); this._listen();
}; };
@ -619,23 +541,6 @@ Worker.prototype._initChildProcess = function _initChildProcess() {
}); });
}; };
/**
* Bind to exit listener.
* @private
*/
Worker.prototype._bind = function _bind() {
this.on('exit', (code) => {
let i = WorkerPool.children.indexOf(this);
if (i !== -1)
WorkerPool.children.splice(i, 1);
});
WorkerPool.children.push(this);
WorkerPool.bindExit();
};
/** /**
* Listen for packets. * Listen for packets.
* @private * @private
@ -672,7 +577,6 @@ Worker.prototype.handlePacket = function handlePacket(packet) {
this.emit('event', packet.items); this.emit('event', packet.items);
break; break;
case packets.types.LOG: case packets.types.LOG:
this.emit('log', `Worker ${this.id}.`);
this.emit('log', packet.text); this.emit('log', packet.text);
break; break;
case packets.types.ERROR: case packets.types.ERROR:
@ -904,6 +808,73 @@ function getCores() {
return Math.max(2, os.cpus().length); return Math.max(2, os.cpus().length);
} }
function bindExit() {
let onSighup, onSigint, onSigterm, onError;
if (!HAS_CP)
return;
if (exitBound)
return;
exitBound = true;
onSighup = () => {
process.exit(1 | 0x80);
};
onSigint = () => {
process.exit(2 | 0x80);
};
onSigterm = () => {
process.exit(15 | 0x80);
};
onError = (err) => {
if (err && err.stack)
console.error(err.stack + '');
else
console.error(err + '');
process.exit(1);
};
process.once('exit', () => {
for (let pool of pools)
pool.destroy();
});
if (process.listenerCount('SIGHUP') === 0)
process.once('SIGHUP', onSighup);
if (process.listenerCount('SIGINT') === 0)
process.once('SIGINT', onSigint);
if (process.listenerCount('SIGTERM') === 0)
process.once('SIGTERM', onSigterm);
if (process.listenerCount('uncaughtException') === 0)
process.once('uncaughtException', onError);
process.on('newListener', (name) => {
switch (name) {
case 'SIGHUP':
process.removeListener(name, onSighup);
break;
case 'SIGINT':
process.removeListener(name, onSigint);
break;
case 'SIGTERM':
process.removeListener(name, onSigterm);
break;
case 'uncaughtException':
process.removeListener(name, onError);
break;
}
});
}
/* /*
* Expose * Expose
*/ */