From 1be63b1bc7324ea2ad12b144098f838ce42857a7 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 12 Oct 2016 22:28:29 -0700 Subject: [PATCH] workers: refactor. --- browser/server.js | 2 +- lib/workers/worker-browser.js | 25 +++++++ lib/workers/worker.js | 24 ++----- lib/workers/workers.js | 118 ++++++++++++++++------------------ 4 files changed, 85 insertions(+), 84 deletions(-) create mode 100644 lib/workers/worker-browser.js diff --git a/browser/server.js b/browser/server.js index ab271791..2e786213 100644 --- a/browser/server.js +++ b/browser/server.js @@ -17,7 +17,7 @@ proxy.on('error', function(err) { var index = fs.readFileSync(__dirname + '/index.html'); var indexjs = fs.readFileSync(__dirname + '/index.js'); var bcoin = fs.readFileSync(__dirname + '/bcoin.js'); -var worker = fs.readFileSync(__dirname + '/../lib/workers/worker.js'); +var worker = fs.readFileSync(__dirname + '/../lib/workers/worker-browser.js'); server.get('/favicon.ico', function(req, res, send, next) { send(404, '', 'text'); diff --git a/lib/workers/worker-browser.js b/lib/workers/worker-browser.js new file mode 100644 index 00000000..9d930b4a --- /dev/null +++ b/lib/workers/worker-browser.js @@ -0,0 +1,25 @@ +/*! + * worker.js - worker thread/process for bcoin + * Copyright (c) 2014-2015, Fedor Indutny (MIT License) + * Copyright (c) 2014-2016, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +/* jshint worker: true */ + +var bcoin, env; + +self.importScripts('/bcoin.js'); + +bcoin = self.bcoin; + +self.onmessage = function onmessage(event) { + self.onmessage = function() {}; + + env = JSON.parse(event.data); + + bcoin.set(env.BCOIN_WORKER_NETWORK); + bcoin.workers.listen(); +}; diff --git a/lib/workers/worker.js b/lib/workers/worker.js index 48041414..68624d63 100644 --- a/lib/workers/worker.js +++ b/lib/workers/worker.js @@ -7,24 +7,8 @@ 'use strict'; -/* jshint worker: true */ +var env = process.env; +var bcoin = require('../env'); -var bcoin, env; - -if (typeof importScripts !== 'undefined') { - self.importScripts('/bcoin.js'); - bcoin = self.bcoin; - self.onmessage = function onmessage(event) { - self.onmessage = function() {}; - - env = JSON.parse(event.data); - - bcoin.set(env.BCOIN_WORKER_NETWORK); - bcoin.workers.listen(); - }; -} else { - env = process.env; - bcoin = require('../env'); - bcoin.set(env.BCOIN_WORKER_NETWORK); - bcoin.workers.listen(); -} +bcoin.set(env.BCOIN_WORKER_NETWORK); +bcoin.workers.listen(); diff --git a/lib/workers/workers.js b/lib/workers/workers.js index c9282425..86744667 100644 --- a/lib/workers/workers.js +++ b/lib/workers/workers.js @@ -7,8 +7,6 @@ 'use strict'; -module.exports = Workers; - var EventEmitter = require('events').EventEmitter; var utils = require('../utils/utils'); var co = require('../utils/co'); @@ -20,7 +18,7 @@ var Framer = require('./framer'); /** * A worker pool. - * @exports Workers + * @exports WorkerPool * @constructor * @param {Object} options * @param {Number} [options.size=num-cores] - Max pool size. @@ -31,37 +29,37 @@ var Framer = require('./framer'); * @property {Number} nonce */ -function Workers(options) { - if (!(this instanceof Workers)) - return new Workers(options); +function WorkerPool(options) { + if (!(this instanceof WorkerPool)) + return new WorkerPool(options); EventEmitter.call(this); if (!options) options = {}; - this.size = Math.max(1, options.size || Workers.CORES); + this.size = Math.max(1, options.size || WorkerPool.CORES); this.timeout = options.timeout || 60000; this.children = []; this.nonce = 0; this.enabled = true; } -utils.inherits(Workers, EventEmitter); +utils.inherits(WorkerPool, EventEmitter); /** * Number of CPUs/cores available. * @const {Number} */ -Workers.CORES = getCores(); +WorkerPool.CORES = getCores(); /** * Global list of workers. * @type {Array} */ -Workers.children = []; +WorkerPool.children = []; /** * Destroy all workers. @@ -69,29 +67,29 @@ Workers.children = []; * @private */ -Workers.cleanup = function cleanup() { - while (Workers.children.length > 0) - Workers.children.pop().destroy(); +WorkerPool.cleanup = function cleanup() { + while (WorkerPool.children.length > 0) + WorkerPool.children.pop().destroy(); }; -Workers._exitBound = false; +WorkerPool._exitBound = false; /** * Bind to process events in order to cleanup listeners. * @private */ -Workers._bindExit = function _bindExit() { +WorkerPool._bindExit = function _bindExit() { if (utils.isBrowser) return; - if (Workers._exitBound) + if (WorkerPool._exitBound) return; - Workers._exitBound = true; + WorkerPool._exitBound = true; function onExit(err) { - Workers.cleanup(); + WorkerPool.cleanup(); if (err) { utils.error(err.stack + ''); @@ -103,7 +101,7 @@ Workers._bindExit = function _bindExit() { } process.once('exit', function() { - Workers.cleanup(); + WorkerPool.cleanup(); }); if (process.listeners('SIGINT').length === 0) @@ -130,7 +128,7 @@ Workers._bindExit = function _bindExit() { * @returns {Worker} */ -Workers.prototype.spawn = function spawn(id) { +WorkerPool.prototype.spawn = function spawn(id) { var self = this; var child; @@ -162,7 +160,7 @@ Workers.prototype.spawn = function spawn(id) { * @returns {Worker} */ -Workers.prototype.alloc = function alloc() { +WorkerPool.prototype.alloc = function alloc() { var id = this.nonce++ % this.size; if (!this.children[id]) this.children[id] = this.spawn(id); @@ -176,7 +174,7 @@ Workers.prototype.alloc = function alloc() { * @returns {Boolean} */ -Workers.prototype.sendEvent = function sendEvent() { +WorkerPool.prototype.sendEvent = function sendEvent() { var i, child; var result = true; @@ -197,7 +195,7 @@ Workers.prototype.sendEvent = function sendEvent() { * Destroy all workers. */ -Workers.prototype.destroy = function destroy() { +WorkerPool.prototype.destroy = function destroy() { var i, child; for (i = 0; i < this.children.length; i++) { @@ -218,10 +216,10 @@ Workers.prototype.destroy = function destroy() { * the worker method specifies. */ -Workers.prototype.execute = function execute(method, args, timeout) { +WorkerPool.prototype.execute = function execute(method, args, timeout) { var result, child; - if (!this.enabled || !Workers.support) { + if (!this.enabled || !exports.support) { return new Promise(function(resolve, reject) { utils.nextTick(function() { try { @@ -250,7 +248,7 @@ Workers.prototype.execute = function execute(method, args, timeout) { * @returns {Promise} - Returns Boolean. */ -Workers.prototype.verify = function verify(tx, flags) { +WorkerPool.prototype.verify = function verify(tx, flags) { return this.execute('verify', [tx, flags], -1); }; @@ -262,7 +260,7 @@ Workers.prototype.verify = function verify(tx, flags) { * @returns {Promise} - Returns Boolean. */ -Workers.prototype.verifyInput = function verifyInput(tx, index, flags) { +WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) { return this.execute('verifyInput', [tx, index, flags], -1); }; @@ -274,7 +272,7 @@ Workers.prototype.verifyInput = function verifyInput(tx, index, flags) { * @returns {Promise} */ -Workers.prototype.sign = co(function* sign(tx, ring, type) { +WorkerPool.prototype.sign = co(function* sign(tx, ring, type) { var i, result, input, sig, sigs, total; result = yield this.execute('sign', [tx, ring, type], -1); @@ -301,7 +299,7 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) { * @returns {Promise} */ -Workers.prototype.signInput = co(function* signInput(tx, index, key, type) { +WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) { var sig = yield this.execute('signInput', [tx, index, key, type], -1); var input = tx.inputs[index]; @@ -322,7 +320,7 @@ Workers.prototype.signInput = co(function* signInput(tx, index, key, type) { * @returns {Promise} */ -Workers.prototype.ecVerify = function ecVerify(msg, sig, key) { +WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) { return this.execute('ecVerify', [msg, sig, key], -1); }; @@ -333,7 +331,7 @@ Workers.prototype.ecVerify = function ecVerify(msg, sig, key) { * @returns {Promise} */ -Workers.prototype.ecSign = function ecSign(msg, key) { +WorkerPool.prototype.ecSign = function ecSign(msg, key) { return this.execute('ecSign', [msg, key], -1); }; @@ -346,7 +344,7 @@ Workers.prototype.ecSign = function ecSign(msg, key) { * @returns {Promise} - Returns {Number}. */ -Workers.prototype.mine = function mine(data, target, min, max) { +WorkerPool.prototype.mine = function mine(data, target, min, max) { return this.execute('mine', [data, target, min, max], -1); }; @@ -362,7 +360,7 @@ Workers.prototype.mine = function mine(data, target, min, max) { * @returns {Buffer} */ -Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { +WorkerPool.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { return this.execute('scrypt', [passwd, salt, N, r, p, len], -1); }; @@ -375,7 +373,7 @@ Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { function Worker(id) { if (!(this instanceof Worker)) - return new Worker(); + return new Worker(id); EventEmitter.call(this); @@ -489,9 +487,9 @@ Worker.prototype._bind = function _bind() { }); this.on('exit', function(code) { - var i = Workers.children.indexOf(self); + var i = WorkerPool.children.indexOf(self); if (i !== -1) - Workers.children.splice(i, 1); + WorkerPool.children.splice(i, 1); }); this.on('log', function(items) { @@ -524,9 +522,9 @@ Worker.prototype._bind = function _bind() { self.emit('error', err); }); - Workers.children.push(this); + WorkerPool.children.push(this); - Workers._bindExit(); + WorkerPool._bindExit(); }; /** @@ -658,10 +656,9 @@ Worker.prototype.execute = function execute(method, args, timeout) { * Represents the master process. * @exports Master * @constructor - * @param {Object?} options */ -function Master(options) { +function Master() { if (!(this instanceof Master)) return new Master(); @@ -669,7 +666,6 @@ function Master(options) { this.framer = new Framer(); this.parser = new Parser(); - this.options = options || {}; this._init(); } @@ -797,12 +793,11 @@ Master.prototype.log = function log() { /** * Listen for messages from master process (only if worker). - * @param {Object?} options * @returns {Master} */ -Master.listen = function listen(options) { - var master = new Master(options); +Master.listen = function listen() { + var master = new Master(); utils.log = master.log.bind(master); utils.error = utils.log; @@ -823,10 +818,11 @@ Master.listen = function listen(options) { try { result = jobs[packet.cmd].apply(jobs, packet.items); } catch (e) { - return master.send(packet.job, 'response', [fromError(e)]); + master.send(packet.job, 'response', [fromError(e)]); + return; } - return master.send(packet.job, 'response', [null, result]); + master.send(packet.job, 'response', [null, result]); }); jobs.master = master; @@ -864,30 +860,30 @@ function fromError(err) { * Default */ -Workers.pool = new Workers(); -Workers.pool.enabled = false; +exports.pool = new WorkerPool(); +exports.pool.enabled = false; -Workers.support = true; +exports.support = true; if (utils.isBrowser) { - Workers.support = typeof global.Worker === 'function' + exports.support = typeof global.Worker === 'function' || typeof global.postMessage === 'function'; } -Workers.set = function set(options) { - if (typeof options.useWorkers === 'boolean') - this.pool.enabled = options.useWorkers; +exports.set = function set(options) { + if (typeof options.useWorkerPool === 'boolean') + this.pool.enabled = options.useWorkerPool; - if (utils.isNumber(options.maxWorkers)) - this.pool.size = options.maxWorkers; + if (utils.isNumber(options.maxWorkerPool)) + this.pool.size = options.maxWorkerPool; if (utils.isNumber(options.workerTimeout)) this.pool.timeout = options.workerTimeout; }; -Workers.set({ - useWorkers: +process.env.BCOIN_USE_WORKERS === 1, - maxWorkers: +process.env.BCOIN_MAX_WORKERS, +exports.set({ + useWorkerPool: +process.env.BCOIN_USE_WORKERS === 1, + maxWorkerPool: +process.env.BCOIN_MAX_WORKERS, workerTimeout: +process.env.BCOIN_WORKER_TIMEOUT }); @@ -895,13 +891,9 @@ Workers.set({ * Expose */ -exports = Workers; - -exports.Workers = Workers; +exports.WorkerPool = WorkerPool; exports.Worker = Worker; exports.Master = Master; exports.Framer = Framer; exports.Parser = Parser; exports.listen = Master.listen; - -module.exports = exports;