From 27c60ce76e57af1695d78f912227d93194812c88 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 7 Jul 2017 00:34:59 -0700 Subject: [PATCH] workers: refactor for browser vs node. --- lib/workers/child-browser.js | 100 ++++++++++++++ lib/workers/child.js | 187 +++++++++++++++++++++++++ lib/workers/cp-browser.js | 3 - lib/workers/cp.js | 3 - lib/workers/master.js | 80 ++--------- lib/workers/parent-browser.js | 83 +++++++++++ lib/workers/parent.js | 71 ++++++++++ lib/workers/workerpool.js | 252 ++++------------------------------ 8 files changed, 473 insertions(+), 306 deletions(-) create mode 100644 lib/workers/child-browser.js create mode 100644 lib/workers/child.js delete mode 100644 lib/workers/cp-browser.js delete mode 100644 lib/workers/cp.js create mode 100644 lib/workers/parent-browser.js create mode 100644 lib/workers/parent.js diff --git a/lib/workers/child-browser.js b/lib/workers/child-browser.js new file mode 100644 index 00000000..b824662c --- /dev/null +++ b/lib/workers/child-browser.js @@ -0,0 +1,100 @@ +/*! + * child.js - child processes for bcoin + * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const assert = require('assert'); +const EventEmitter = require('events'); +const util = require('../utils/util'); + +/** + * Represents a worker. + * @alias module:workers.Child + * @constructor + * @param {String} file + * @param {Object} env + */ + +function Child(file, env) { + if (!(this instanceof Child)) + return new Child(file, env); + + EventEmitter.call(this); + + this.init(file, env); +} + +util.inherits(Child, EventEmitter); + +/** + * Test whether worker support is available. + * @returns {Boolean} + */ + +Child.hasSupport = function hasSupport() { + return typeof global.postMessage === 'function'; +}; + +/** + * Initialize worker. Bind to events. + * @private + */ + +Child.prototype.init = function init(file, env) { + this.child = new global.Worker(file); + + this.child.onerror = (event) => { + this.emit('error', new Error('Child error.')); + this.emit('exit', 1, null); + }; + + this.child.onmessage = (event) => { + let data; + if (typeof event.data === 'string') { + data = Buffer.from(event.data, 'hex'); + assert(data.length === event.data.length / 2); + } else { + assert(event.data && typeof event.data === 'object'); + assert(event.data.data && typeof event.data.data.length === 'number'); + data = event.data.data; + data.__proto__ = Buffer.prototype; + } + this.emit('data', data); + }; + + this.child.postMessage(JSON.stringify(env)); +}; + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Child.prototype.write = function write(data) { + if (this.child.postMessage.length === 2) { + data.__proto__ = Uint8Array.prototype; + this.child.postMessage({ data }, [data]); + } else { + this.child.postMessage(data.toString('hex')); + } + return true; +}; + +/** + * Destroy the worker. + */ + +Child.prototype.destroy = function destroy() { + this.child.terminate(); + this.emit('exit', 15 | 0x80, 'SIGTERM'); +}; + +/* + * Expose + */ + +module.exports = Child; diff --git a/lib/workers/child.js b/lib/workers/child.js new file mode 100644 index 00000000..a0bd5c57 --- /dev/null +++ b/lib/workers/child.js @@ -0,0 +1,187 @@ +/*! + * child.js - child processes for bcoin + * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const EventEmitter = require('events'); +const path = require('path'); +const cp = require('child_process'); +const util = require('../utils/util'); + +const children = new Set(); +let exitBound = false; + +/** + * Represents a worker. + * @alias module:workers.Child + * @constructor + * @param {String} file + * @param {Object} env + */ + +function Child(file, env) { + if (!(this instanceof Child)) + return new Child(file, env); + + EventEmitter.call(this); + + children.add(this); + bindExit(); + + this.init(file, env); +} + +util.inherits(Child, EventEmitter); + +/** + * Test whether worker support is available. + * @returns {Boolean} + */ + +Child.hasSupport = function hasSupport() { + return true; +}; + +/** + * Initialize worker (node.js). + * @private + */ + +Child.prototype.init = function init(file, env) { + let bin = process.argv[0]; + let filename = path.resolve(__dirname, file); + let environ = Object.assign({}, process.env, env); + let options = { stdio: 'pipe', env: environ }; + + this.child = cp.spawn(bin, [filename], options); + + this.child.unref(); + this.child.stdin.unref(); + this.child.stdout.unref(); + this.child.stderr.unref(); + + this.child.on('error', (err) => { + this.emit('error', err); + }); + + this.child.on('exit', (code, signal) => { + children.delete(this); + this.emit('exit', code == null ? -1 : code, signal); + }); + + this.child.on('close', () => { + children.delete(this); + this.emit('exit', -1, null); + }); + + this.child.stdin.on('error', (err) => { + this.emit('error', err); + }); + + this.child.stdout.on('error', (err) => { + this.emit('error', err); + }); + + this.child.stderr.on('error', (err) => { + this.emit('error', err); + }); + + this.child.stdout.on('data', (data) => { + this.emit('data', data); + }); +}; + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Child.prototype.write = function write(data) { + return this.child.stdin.write(data); +}; + +/** + * Destroy the worker. + */ + +Child.prototype.destroy = function destroy() { + this.child.kill('SIGTERM'); +}; + +/* + * Helpers + */ + +function bindExit() { + let onSighup, onSigint, onSigterm, onError; + + if (exitBound) + return; + + exitBound = true; + + onSighup = () => { + process.exit(1 | 0x80); + }; + + onSigint = () => { + process.exit(2 | 0x80); + }; + + onSigterm = () => { + process.exit(15 | 0x80); + }; + + onError = (err) => { + if (err && err.stack) + console.error(err.stack + ''); + else + console.error(err + ''); + + process.exit(1); + }; + + process.once('exit', () => { + for (let child of children) + child.destroy(); + }); + + if (process.listenerCount('SIGHUP') === 0) + process.once('SIGHUP', onSighup); + + if (process.listenerCount('SIGINT') === 0) + process.once('SIGINT', onSigint); + + if (process.listenerCount('SIGTERM') === 0) + process.once('SIGTERM', onSigterm); + + if (process.listenerCount('uncaughtException') === 0) + process.once('uncaughtException', onError); + + process.on('newListener', (name) => { + switch (name) { + case 'SIGHUP': + process.removeListener(name, onSighup); + break; + case 'SIGINT': + process.removeListener(name, onSigint); + break; + case 'SIGTERM': + process.removeListener(name, onSigterm); + break; + case 'uncaughtException': + process.removeListener(name, onError); + break; + } + }); +} + +/* + * Expose + */ + +module.exports = Child; diff --git a/lib/workers/cp-browser.js b/lib/workers/cp-browser.js deleted file mode 100644 index 21227270..00000000 --- a/lib/workers/cp-browser.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict'; - -exports.unsupported = true; diff --git a/lib/workers/cp.js b/lib/workers/cp.js deleted file mode 100644 index 6f3d6fa2..00000000 --- a/lib/workers/cp.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict'; - -module.exports = require('child_process'); diff --git a/lib/workers/master.js b/lib/workers/master.js index 007d5690..6bd68e4a 100644 --- a/lib/workers/master.js +++ b/lib/workers/master.js @@ -15,8 +15,7 @@ const jobs = require('./jobs'); const Parser = require('./parser'); const Framer = require('./framer'); const packets = require('./packets'); -const HAS_WORKERS = typeof global.postMessage === 'function'; -const HAS_CP = !!(process.stdin && process.stdout && process.stderr); +const Parent = require('./parent'); /** * Represents the master process. @@ -30,13 +29,14 @@ function Master() { EventEmitter.call(this); + this.parent = new Parent(); this.framer = new Framer(); this.parser = new Parser(); this.env = {}; this.listening = false; this.color = false; - this._init(); + this.init(); } util.inherits(Master, EventEmitter); @@ -46,8 +46,8 @@ util.inherits(Master, EventEmitter); * @private */ -Master.prototype._init = function _init() { - this.on('data', (data) => { +Master.prototype.init = function init() { + this.parent.on('data', (data) => { this.parser.feed(data); }); @@ -59,60 +59,9 @@ Master.prototype._init = function _init() { this.emit('packet', packet); }); - if (HAS_WORKERS) { - // Web workers - this._initWebWorkers(); - } else if (HAS_CP) { - // Child process + pipes - this._initChildProcess(); - } else { - throw new Error('Workers not available.'); - } -}; - -/** - * Initialize master (web workers). - * @private - */ - -Master.prototype._initWebWorkers = function _initWebWorkers() { - global.onerror = (event) => { - this.emit('error', new Error('Master error.')); - }; - - global.onmessage = (event) => { - let data; - if (typeof event.data === 'string') { - data = Buffer.from(event.data, 'hex'); - assert(data.length === event.data.length / 2); - } else { - assert(event.data && typeof event.data === 'object'); - assert(event.data.data && typeof event.data.data.length === 'number'); - data = event.data.data; - data.__proto__ = Buffer.prototype; - } - this.emit('data', data); - }; -}; - -/** - * Initialize master (node.js). - * @private - */ - -Master.prototype._initChildProcess = function _initChildProcess() { - process.stdin.on('data', (data) => { - this.emit('data', data); - }); - - // Nowhere to send these errors: - process.stdin.on('error', () => {}); - process.stdout.on('error', () => {}); - process.stderr.on('error', () => {}); - - process.on('uncaughtException', (err) => { + this.parent.on('exception', (err) => { this.send(new packets.ErrorPacket(err)); - setTimeout(() => process.exit(1), 1000); + setTimeout(() => this.destroy(), 1000); }); }; @@ -132,16 +81,7 @@ Master.prototype.set = function set(network) { */ Master.prototype.write = function write(data) { - if (HAS_WORKERS) { - if (global.postMessage.length === 2) { - data.__proto__ = Uint8Array.prototype; - global.postMessage({ data }, [data]); - } else { - global.postMessage(data.toString('hex')); - } - return true; - } - return process.stdout.write(data); + return this.parent.write(data); }; /** @@ -170,9 +110,7 @@ Master.prototype.sendEvent = function sendEvent(...items) { */ Master.prototype.destroy = function destroy() { - if (HAS_WORKERS) - return global.close(); - return process.exit(0); + return this.parent.destroy(); }; /** diff --git a/lib/workers/parent-browser.js b/lib/workers/parent-browser.js new file mode 100644 index 00000000..32d1613e --- /dev/null +++ b/lib/workers/parent-browser.js @@ -0,0 +1,83 @@ +/*! + * parent.js - worker processes for bcoin + * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const assert = require('assert'); +const EventEmitter = require('events'); +const util = require('../utils/util'); + +/** + * Represents the parent process. + * @alias module:workers.Parent + * @constructor + */ + +function Parent() { + if (!(this instanceof Parent)) + return new Parent(); + + EventEmitter.call(this); + + this.init(); +} + +util.inherits(Parent, EventEmitter); + +/** + * Initialize master (web workers). + * @private + */ + +Parent.prototype.init = function init() { + global.onerror = (event) => { + this.emit('error', new Error('Worker error.')); + }; + + global.onmessage = (event) => { + let data; + if (typeof event.data === 'string') { + data = Buffer.from(event.data, 'hex'); + assert(data.length === event.data.length / 2); + } else { + assert(event.data && typeof event.data === 'object'); + assert(event.data.data && typeof event.data.data.length === 'number'); + data = event.data.data; + data.__proto__ = Buffer.prototype; + } + this.emit('data', data); + }; +}; + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Parent.prototype.write = function write(data) { + if (global.postMessage.length === 2) { + data.__proto__ = Uint8Array.prototype; + global.postMessage({ data }, [data]); + } else { + global.postMessage(data.toString('hex')); + } + return true; +}; + +/** + * Destroy the worker. + */ + +Parent.prototype.destroy = function destroy() { + global.close(); +}; + +/* + * Expose + */ + +module.exports = Parent; diff --git a/lib/workers/parent.js b/lib/workers/parent.js new file mode 100644 index 00000000..1a0f5886 --- /dev/null +++ b/lib/workers/parent.js @@ -0,0 +1,71 @@ +/*! + * parent.js - worker processes for bcoin + * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const EventEmitter = require('events'); +const util = require('../utils/util'); + +/** + * Represents the parent process. + * @alias module:workers.Parent + * @constructor + */ + +function Parent() { + if (!(this instanceof Parent)) + return new Parent(); + + EventEmitter.call(this); + + this.init(); +} + +util.inherits(Parent, EventEmitter); + +/** + * Initialize master (node.js). + * @private + */ + +Parent.prototype.init = function init() { + process.stdin.on('data', (data) => { + this.emit('data', data); + }); + + // Nowhere to send these errors: + process.stdin.on('error', () => {}); + process.stdout.on('error', () => {}); + process.stderr.on('error', () => {}); + + process.on('uncaughtException', (err) => { + this.emit('exception', err); + }); +}; + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Parent.prototype.write = function write(data) { + return process.stdout.write(data); +}; + +/** + * Destroy the worker. + */ + +Parent.prototype.destroy = function destroy() { + return process.exit(0); +}; + +/* + * Expose + */ + +module.exports = Parent; diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index 0306f7e3..3b5888c1 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -10,23 +10,15 @@ const assert = require('assert'); const EventEmitter = require('events'); const os = require('os'); -const path = require('path'); -const cp = require('./cp'); const util = require('../utils/util'); const co = require('../utils/co'); const Network = require('../protocol/network'); +const Child = require('./child'); const jobs = require('./jobs'); const Parser = require('./parser'); const Framer = require('./framer'); const packets = require('./packets'); -const HAS_WORKERS = typeof global.Worker === 'function'; -const HAS_CP = typeof cp.spawn === 'function'; -const HAS_SUPPORT = HAS_WORKERS || HAS_CP; - -const pools = new Set(); -let exitBound = false; - /** * A worker pool. * @alias module:workers.WorkerPool @@ -46,36 +38,18 @@ function WorkerPool(options) { EventEmitter.call(this); - this.size = WorkerPool.CORES; + this.size = getCores(); this.timeout = 60000; this.children = new Map(); this.nonce = 0; this.enabled = true; - - this.file = WorkerPool.WORKER_FILE; - - bindExit(); - pools.add(this); + this.file = process.env.BCOIN_WORKER_FILE || 'worker.js'; this.set(options); } util.inherits(WorkerPool, EventEmitter); -/** - * Number of CPUs/cores available. - * @type {Number} - */ - -WorkerPool.CORES = getCores(); - -/** - * Default worker file. - * @const {String} - */ - -WorkerPool.WORKER_FILE = process.env.BCOIN_WORKER_FILE || 'worker.js'; - /** * Set worker pool options. * @param {Object} options @@ -108,29 +82,13 @@ WorkerPool.prototype.set = function set(options) { } }; -/** - * Enable the worker pool. - */ - -WorkerPool.prototype.enable = function enable() { - this.enabled = true; -}; - -/** - * Disable the worker pool. - */ - -WorkerPool.prototype.disable = function disable() { - this.enabled = true; -}; - /** * Open worker pool. * @returns {Promise} */ WorkerPool.prototype.open = async function open() { - pools.add(this); + ; }; /** @@ -139,7 +97,6 @@ WorkerPool.prototype.open = async function open() { */ WorkerPool.prototype.close = async function close() { - pools.delete(this); this.destroy(); }; @@ -230,7 +187,7 @@ WorkerPool.prototype.destroy = function destroy() { WorkerPool.prototype.execute = function execute(packet, timeout) { let result, child; - if (!this.enabled || !HAS_SUPPORT) { + if (!this.enabled || !Child.hasSupport()) { return new Promise((resolve, reject) => { setImmediate(() => { try { @@ -401,22 +358,19 @@ function Worker(file) { EventEmitter.call(this); - this.file = file; this.id = -1; - this.framer = new Framer(); this.parser = new Parser(); this.pending = new Map(); - this.child = null; - this.env = { + this.child = new Child(file, { BCOIN_WORKER_NETWORK: Network.type, BCOIN_WORKER_ISTTY: process.stdout ? (process.stdout.isTTY ? '1' : '0') : '0' - }; + }); - this._init(); + this.init(); } util.inherits(Worker, EventEmitter); @@ -426,11 +380,19 @@ util.inherits(Worker, EventEmitter); * @private */ -Worker.prototype._init = function _init() { - this.on('data', (data) => { +Worker.prototype.init = function init() { + this.child.on('data', (data) => { this.parser.feed(data); }); + this.child.on('exit', (code, signal) => { + this.emit('exit', code, signal); + }); + + this.child.on('error', (err) => { + this.emit('error', err); + }); + this.parser.on('error', (err) => { this.emit('error', err); }); @@ -439,94 +401,7 @@ Worker.prototype._init = function _init() { this.emit('packet', packet); }); - if (HAS_WORKERS) { - // Web workers - this._initWebWorkers(); - } else if (HAS_CP) { - // Child process + pipes - this._initChildProcess(); - } else { - throw new Error('Workers not available.'); - } - - this._listen(); -}; - -/** - * Initialize worker (web workers). - * @private - */ - -Worker.prototype._initWebWorkers = function _initWebWorkers() { - this.child = new global.Worker(this.file); - - this.child.onerror = (event) => { - this.emit('error', new Error('Worker error.')); - this.emit('exit', -1, null); - }; - - this.child.onmessage = (event) => { - let data; - if (typeof event.data === 'string') { - data = Buffer.from(event.data, 'hex'); - assert(data.length === event.data.length / 2); - } else { - assert(event.data && typeof event.data === 'object'); - assert(event.data.data && typeof event.data.data.length === 'number'); - data = event.data.data; - data.__proto__ = Buffer.prototype; - } - this.emit('data', data); - }; - - this.child.postMessage(JSON.stringify(this.env)); -}; - -/** - * Initialize worker (node.js). - * @private - */ - -Worker.prototype._initChildProcess = function _initChildProcess() { - let bin = process.argv[0]; - let file = path.resolve(__dirname, this.file); - let env = Object.assign({}, process.env, this.env); - let options = { stdio: 'pipe', env: env }; - - this.child = cp.spawn(bin, [file], options); - - this.child.unref(); - this.child.stdin.unref(); - this.child.stdout.unref(); - this.child.stderr.unref(); - - this.child.on('error', (err) => { - this.emit('error', err); - }); - - this.child.on('exit', (code, signal) => { - this.emit('exit', code == null ? -1 : code, signal); - }); - - this.child.on('close', () => { - this.emit('exit', -1, null); - }); - - this.child.stdin.on('error', (err) => { - this.emit('error', err); - }); - - this.child.stdout.on('error', (err) => { - this.emit('error', err); - }); - - this.child.stderr.on('error', (err) => { - this.emit('error', err); - }); - - this.child.stdout.on('data', (data) => { - this.emit('data', data); - }); + this.listen(); }; /** @@ -534,8 +409,8 @@ Worker.prototype._initChildProcess = function _initChildProcess() { * @private */ -Worker.prototype._listen = function _listen() { - this.on('exit', () => { +Worker.prototype.listen = function listen() { + this.on('exit', (code, signal) => { this.killJobs(); }); @@ -586,16 +461,7 @@ Worker.prototype.handlePacket = function handlePacket(packet) { */ Worker.prototype.write = function write(data) { - if (HAS_WORKERS) { - if (this.child.postMessage.length === 2) { - data.__proto__ = Uint8Array.prototype; - this.child.postMessage({ data }, [data]); - } else { - this.child.postMessage(data.toString('hex')); - } - return true; - } - return this.child.stdin.write(data); + return this.child.write(data); }; /** @@ -624,12 +490,7 @@ Worker.prototype.sendEvent = function sendEvent(...items) { */ Worker.prototype.destroy = function destroy() { - if (HAS_WORKERS) { - this.child.terminate(); - this.emit('exit', -1, 'SIGTERM'); - return; - } - return this.child.kill('SIGTERM'); + return this.child.destroy(); }; /** @@ -796,73 +657,6 @@ function getCores() { return Math.max(2, os.cpus().length); } -function bindExit() { - let onSighup, onSigint, onSigterm, onError; - - if (!HAS_CP) - return; - - if (exitBound) - return; - - exitBound = true; - - onSighup = () => { - process.exit(1 | 0x80); - }; - - onSigint = () => { - process.exit(2 | 0x80); - }; - - onSigterm = () => { - process.exit(15 | 0x80); - }; - - onError = (err) => { - if (err && err.stack) - console.error(err.stack + ''); - else - console.error(err + ''); - - process.exit(1); - }; - - process.once('exit', () => { - for (let pool of pools) - pool.destroy(); - }); - - if (process.listenerCount('SIGHUP') === 0) - process.once('SIGHUP', onSighup); - - if (process.listenerCount('SIGINT') === 0) - process.once('SIGINT', onSigint); - - if (process.listenerCount('SIGTERM') === 0) - process.once('SIGTERM', onSigterm); - - if (process.listenerCount('uncaughtException') === 0) - process.once('uncaughtException', onError); - - process.on('newListener', (name) => { - switch (name) { - case 'SIGHUP': - process.removeListener(name, onSighup); - break; - case 'SIGINT': - process.removeListener(name, onSigint); - break; - case 'SIGTERM': - process.removeListener(name, onSigterm); - break; - case 'uncaughtException': - process.removeListener(name, onError); - break; - } - }); -} - /* * Expose */