From 8f295a376dc3dffa44e80f7038288ac44d7b4cdd Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 5 Jul 2017 13:49:45 -0700 Subject: [PATCH] workers: refactor exiting. --- lib/node/node.js | 7 +- lib/wallet/server.js | 5 +- lib/workers/workerpool.js | 247 +++++++++++++++++--------------------- 3 files changed, 117 insertions(+), 142 deletions(-) diff --git a/lib/node/node.js b/lib/node/node.js index a12bd6b6..16ec7d6d 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -111,8 +111,9 @@ Node.prototype.init = function init() { this.logger.warning('Worker %d exited: %s.', child.id, code); }); - this.workers.on('log', (log) => { - this.logger.debug(log); + this.workers.on('log', (text, child) => { + this.logger.debug(`Worker ${child.id} says:`); + this.logger.debug(text); }); this.workers.on('error', (err, child) => { @@ -155,6 +156,7 @@ Node.prototype.location = function location(name) { Node.prototype.handlePreopen = async function handlePreopen() { await this.logger.open(); + await this.workers.open(); this.bind(this.network.time, 'offset', (offset) => { this.logger.info('Time offset: %d (%d minutes).', offset, offset / 60 | 0); @@ -217,6 +219,7 @@ Node.prototype.handleClose = async function handleClose() { this.bound.length = 0; this.startTime = -1; + await this.workers.close(); await this.logger.close(); }; diff --git a/lib/wallet/server.js b/lib/wallet/server.js index 4e13da2f..4f752f8a 100644 --- a/lib/wallet/server.js +++ b/lib/wallet/server.js @@ -67,8 +67,9 @@ server.create = function create(options) { logger.warning('Worker %d exited: %s.', child.id, code); }); - workers.on('log', (log) => { - logger.debug(log); + workers.on('log', (text, child) => { + logger.debug(`Worker ${child.id} says:`); + logger.debug(text); }); workers.on('error', (err, child) => { diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index 22373746..b732e3e8 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -22,6 +22,10 @@ const packets = require('./packets'); const HAS_WORKERS = typeof global.Worker === 'function'; const HAS_CP = typeof cp.spawn === 'function'; +const HAS_SUPPORT = HAS_WORKERS || HAS_CP; + +const pools = new Set(); +let exitBound = false; /** * A worker pool. @@ -32,7 +36,7 @@ const HAS_CP = typeof cp.spawn === 'function'; * @param {Number} [options.timeout=10000] - Execution timeout. * @property {Number} size * @property {Number} timeout - * @property {Object} children + * @property {Map} children * @property {Number} nonce */ @@ -44,22 +48,18 @@ function WorkerPool(options) { this.size = WorkerPool.CORES; this.timeout = 60000; - this.children = []; + this.children = new Map(); this.nonce = 0; this.enabled = true; + bindExit(); + pools.add(this); + this.set(options); } util.inherits(WorkerPool, EventEmitter); -/** - * Whether workers are supported. - * @type {Boolean} - */ - -WorkerPool.support = HAS_WORKERS || HAS_CP; - /** * Number of CPUs/cores available. * @type {Number} @@ -88,101 +88,6 @@ WorkerPool.WORKER_URL = '/bcoin-worker.js'; WorkerPool.MASTER_URL = '/bcoin-master.js'; -/** - * Global list of workers. - * @type {Array} - */ - -WorkerPool.children = []; - -/** - * Destroy all workers. - * Used for cleaning up workers on exit. - * @private - */ - -WorkerPool.cleanup = function cleanup() { - while (WorkerPool.children.length > 0) - WorkerPool.children.pop().destroy(); -}; - -/** - * Whether exit events have been bound globally. - * @private - * @type {Boolean} - */ - -WorkerPool.bound = false; - -/** - * Bind to process events in - * order to cleanup listeners. - * @private - */ - -WorkerPool.bindExit = function bindExit() { - let onSighup, onSigint, onSigterm, onError; - - if (!HAS_CP) - return; - - if (WorkerPool.bound) - return; - - WorkerPool.bound = true; - - onSighup = () => { - process.exit(1 | 0x80); - }; - - onSigint = () => { - process.exit(2 | 0x80); - }; - - onSigterm = () => { - process.exit(15 | 0x80); - }; - - onError = (err) => { - if (err && err.stack) - util.error(err.stack + ''); - process.exit(1); - }; - - process.once('exit', () => { - WorkerPool.cleanup(); - }); - - if (process.listenerCount('SIGHUP') === 0) - process.once('SIGHUP', onSighup); - - if (process.listenerCount('SIGINT') === 0) - process.once('SIGINT', onSigint); - - if (process.listenerCount('SIGTERM') === 0) - process.once('SIGTERM', onSigterm); - - if (process.listenerCount('uncaughtException') === 0) - process.once('uncaughtException', onError); - - process.on('newListener', (name) => { - switch (name) { - case 'SIGHUP': - process.removeListener(name, onSighup); - break; - case 'SIGINT': - process.removeListener(name, onSigint); - break; - case 'SIGTERM': - process.removeListener(name, onSigterm); - break; - case 'uncaughtException': - process.removeListener(name, onError); - break; - } - }); -}; - /** * Set worker pool options. * @param {Object} options @@ -226,6 +131,25 @@ WorkerPool.prototype.disable = function disable() { this.enabled = true; }; +/** + * Open worker pool. + * @returns {Promise} + */ + +WorkerPool.prototype.open = async function open() { + pools.add(this); +}; + +/** + * Close worker pool. + * @returns {Promise} + */ + +WorkerPool.prototype.close = async function close() { + pools.delete(this); + this.destroy(); +}; + /** * Spawn a new worker. * @param {Number} id - Worker ID. @@ -241,8 +165,8 @@ WorkerPool.prototype.spawn = function spawn(id) { child.on('exit', (code) => { this.emit('exit', code, child); - if (this.children[child.id] === child) - this.children[child.id] = null; + if (this.children.get(child.id) === child) + this.children.delete(child.id); }); child.on('event', (items) => { @@ -250,6 +174,10 @@ WorkerPool.prototype.spawn = function spawn(id) { this.emit.apply(this, items); }); + child.on('log', (text) => { + this.emit('log', text, child); + }); + this.emit('spawn', child); return child; @@ -263,9 +191,11 @@ WorkerPool.prototype.spawn = function spawn(id) { WorkerPool.prototype.alloc = function alloc() { let id = this.nonce++ % this.size; - if (!this.children[id]) - this.children[id] = this.spawn(id); - return this.children[id]; + + if (!this.children.has(id)) + this.children.set(id, this.spawn(id)); + + return this.children.get(id); }; /** @@ -278,10 +208,7 @@ WorkerPool.prototype.alloc = function alloc() { WorkerPool.prototype.sendEvent = function sendEvent() { let result = true; - for (let child of this.children) { - if (!child) - continue; - + for (let child of this.children.values()) { if (!child.sendEvent.apply(child, arguments)) result = false; } @@ -294,12 +221,8 @@ WorkerPool.prototype.sendEvent = function sendEvent() { */ WorkerPool.prototype.destroy = function destroy() { - for (let child of this.children) { - if (!child) - continue; - + for (let child of this.children.values()) child.destroy(); - } }; /** @@ -312,7 +235,7 @@ WorkerPool.prototype.destroy = function destroy() { WorkerPool.prototype.execute = function execute(packet, timeout) { let result, child; - if (!this.enabled || !WorkerPool.support) { + if (!this.enabled || !HAS_SUPPORT) { return new Promise((resolve, reject) => { setImmediate(() => { try { @@ -531,7 +454,6 @@ Worker.prototype._init = function _init() { throw new Error('Workers not available.'); } - this._bind(); this._listen(); }; @@ -619,23 +541,6 @@ Worker.prototype._initChildProcess = function _initChildProcess() { }); }; -/** - * Bind to exit listener. - * @private - */ - -Worker.prototype._bind = function _bind() { - this.on('exit', (code) => { - let i = WorkerPool.children.indexOf(this); - if (i !== -1) - WorkerPool.children.splice(i, 1); - }); - - WorkerPool.children.push(this); - - WorkerPool.bindExit(); -}; - /** * Listen for packets. * @private @@ -672,7 +577,6 @@ Worker.prototype.handlePacket = function handlePacket(packet) { this.emit('event', packet.items); break; case packets.types.LOG: - this.emit('log', `Worker ${this.id}.`); this.emit('log', packet.text); break; case packets.types.ERROR: @@ -904,6 +808,73 @@ function getCores() { return Math.max(2, os.cpus().length); } +function bindExit() { + let onSighup, onSigint, onSigterm, onError; + + if (!HAS_CP) + return; + + if (exitBound) + return; + + exitBound = true; + + onSighup = () => { + process.exit(1 | 0x80); + }; + + onSigint = () => { + process.exit(2 | 0x80); + }; + + onSigterm = () => { + process.exit(15 | 0x80); + }; + + onError = (err) => { + if (err && err.stack) + console.error(err.stack + ''); + else + console.error(err + ''); + + process.exit(1); + }; + + process.once('exit', () => { + for (let pool of pools) + pool.destroy(); + }); + + if (process.listenerCount('SIGHUP') === 0) + process.once('SIGHUP', onSighup); + + if (process.listenerCount('SIGINT') === 0) + process.once('SIGINT', onSigint); + + if (process.listenerCount('SIGTERM') === 0) + process.once('SIGTERM', onSigterm); + + if (process.listenerCount('uncaughtException') === 0) + process.once('uncaughtException', onError); + + process.on('newListener', (name) => { + switch (name) { + case 'SIGHUP': + process.removeListener(name, onSighup); + break; + case 'SIGINT': + process.removeListener(name, onSigint); + break; + case 'SIGTERM': + process.removeListener(name, onSigterm); + break; + case 'uncaughtException': + process.removeListener(name, onError); + break; + } + }); +} + /* * Expose */