workers: refactor.

This commit is contained in:
Christopher Jeffrey 2016-10-12 22:28:29 -07:00
parent 469474ae0a
commit 1be63b1bc7
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 85 additions and 84 deletions

View File

@ -17,7 +17,7 @@ proxy.on('error', function(err) {
var index = fs.readFileSync(__dirname + '/index.html');
var indexjs = fs.readFileSync(__dirname + '/index.js');
var bcoin = fs.readFileSync(__dirname + '/bcoin.js');
var worker = fs.readFileSync(__dirname + '/../lib/workers/worker.js');
var worker = fs.readFileSync(__dirname + '/../lib/workers/worker-browser.js');
server.get('/favicon.ico', function(req, res, send, next) {
send(404, '', 'text');

View File

@ -0,0 +1,25 @@
/*!
* worker.js - worker thread/process for bcoin
* Copyright (c) 2014-2015, Fedor Indutny (MIT License)
* Copyright (c) 2014-2016, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
/* jshint worker: true */
var bcoin, env;
self.importScripts('/bcoin.js');
bcoin = self.bcoin;
self.onmessage = function onmessage(event) {
self.onmessage = function() {};
env = JSON.parse(event.data);
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();
};

View File

@ -7,24 +7,8 @@
'use strict';
/* jshint worker: true */
var env = process.env;
var bcoin = require('../env');
var bcoin, env;
if (typeof importScripts !== 'undefined') {
self.importScripts('/bcoin.js');
bcoin = self.bcoin;
self.onmessage = function onmessage(event) {
self.onmessage = function() {};
env = JSON.parse(event.data);
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();
};
} else {
env = process.env;
bcoin = require('../env');
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();
}
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();

View File

@ -7,8 +7,6 @@
'use strict';
module.exports = Workers;
var EventEmitter = require('events').EventEmitter;
var utils = require('../utils/utils');
var co = require('../utils/co');
@ -20,7 +18,7 @@ var Framer = require('./framer');
/**
* A worker pool.
* @exports Workers
* @exports WorkerPool
* @constructor
* @param {Object} options
* @param {Number} [options.size=num-cores] - Max pool size.
@ -31,37 +29,37 @@ var Framer = require('./framer');
* @property {Number} nonce
*/
function Workers(options) {
if (!(this instanceof Workers))
return new Workers(options);
function WorkerPool(options) {
if (!(this instanceof WorkerPool))
return new WorkerPool(options);
EventEmitter.call(this);
if (!options)
options = {};
this.size = Math.max(1, options.size || Workers.CORES);
this.size = Math.max(1, options.size || WorkerPool.CORES);
this.timeout = options.timeout || 60000;
this.children = [];
this.nonce = 0;
this.enabled = true;
}
utils.inherits(Workers, EventEmitter);
utils.inherits(WorkerPool, EventEmitter);
/**
* Number of CPUs/cores available.
* @const {Number}
*/
Workers.CORES = getCores();
WorkerPool.CORES = getCores();
/**
* Global list of workers.
* @type {Array}
*/
Workers.children = [];
WorkerPool.children = [];
/**
* Destroy all workers.
@ -69,29 +67,29 @@ Workers.children = [];
* @private
*/
Workers.cleanup = function cleanup() {
while (Workers.children.length > 0)
Workers.children.pop().destroy();
WorkerPool.cleanup = function cleanup() {
while (WorkerPool.children.length > 0)
WorkerPool.children.pop().destroy();
};
Workers._exitBound = false;
WorkerPool._exitBound = false;
/**
* Bind to process events in order to cleanup listeners.
* @private
*/
Workers._bindExit = function _bindExit() {
WorkerPool._bindExit = function _bindExit() {
if (utils.isBrowser)
return;
if (Workers._exitBound)
if (WorkerPool._exitBound)
return;
Workers._exitBound = true;
WorkerPool._exitBound = true;
function onExit(err) {
Workers.cleanup();
WorkerPool.cleanup();
if (err) {
utils.error(err.stack + '');
@ -103,7 +101,7 @@ Workers._bindExit = function _bindExit() {
}
process.once('exit', function() {
Workers.cleanup();
WorkerPool.cleanup();
});
if (process.listeners('SIGINT').length === 0)
@ -130,7 +128,7 @@ Workers._bindExit = function _bindExit() {
* @returns {Worker}
*/
Workers.prototype.spawn = function spawn(id) {
WorkerPool.prototype.spawn = function spawn(id) {
var self = this;
var child;
@ -162,7 +160,7 @@ Workers.prototype.spawn = function spawn(id) {
* @returns {Worker}
*/
Workers.prototype.alloc = function alloc() {
WorkerPool.prototype.alloc = function alloc() {
var id = this.nonce++ % this.size;
if (!this.children[id])
this.children[id] = this.spawn(id);
@ -176,7 +174,7 @@ Workers.prototype.alloc = function alloc() {
* @returns {Boolean}
*/
Workers.prototype.sendEvent = function sendEvent() {
WorkerPool.prototype.sendEvent = function sendEvent() {
var i, child;
var result = true;
@ -197,7 +195,7 @@ Workers.prototype.sendEvent = function sendEvent() {
* Destroy all workers.
*/
Workers.prototype.destroy = function destroy() {
WorkerPool.prototype.destroy = function destroy() {
var i, child;
for (i = 0; i < this.children.length; i++) {
@ -218,10 +216,10 @@ Workers.prototype.destroy = function destroy() {
* the worker method specifies.
*/
Workers.prototype.execute = function execute(method, args, timeout) {
WorkerPool.prototype.execute = function execute(method, args, timeout) {
var result, child;
if (!this.enabled || !Workers.support) {
if (!this.enabled || !exports.support) {
return new Promise(function(resolve, reject) {
utils.nextTick(function() {
try {
@ -250,7 +248,7 @@ Workers.prototype.execute = function execute(method, args, timeout) {
* @returns {Promise} - Returns Boolean.
*/
Workers.prototype.verify = function verify(tx, flags) {
WorkerPool.prototype.verify = function verify(tx, flags) {
return this.execute('verify', [tx, flags], -1);
};
@ -262,7 +260,7 @@ Workers.prototype.verify = function verify(tx, flags) {
* @returns {Promise} - Returns Boolean.
*/
Workers.prototype.verifyInput = function verifyInput(tx, index, flags) {
WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) {
return this.execute('verifyInput', [tx, index, flags], -1);
};
@ -274,7 +272,7 @@ Workers.prototype.verifyInput = function verifyInput(tx, index, flags) {
* @returns {Promise}
*/
Workers.prototype.sign = co(function* sign(tx, ring, type) {
WorkerPool.prototype.sign = co(function* sign(tx, ring, type) {
var i, result, input, sig, sigs, total;
result = yield this.execute('sign', [tx, ring, type], -1);
@ -301,7 +299,7 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) {
* @returns {Promise}
*/
Workers.prototype.signInput = co(function* signInput(tx, index, key, type) {
WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) {
var sig = yield this.execute('signInput', [tx, index, key, type], -1);
var input = tx.inputs[index];
@ -322,7 +320,7 @@ Workers.prototype.signInput = co(function* signInput(tx, index, key, type) {
* @returns {Promise}
*/
Workers.prototype.ecVerify = function ecVerify(msg, sig, key) {
WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) {
return this.execute('ecVerify', [msg, sig, key], -1);
};
@ -333,7 +331,7 @@ Workers.prototype.ecVerify = function ecVerify(msg, sig, key) {
* @returns {Promise}
*/
Workers.prototype.ecSign = function ecSign(msg, key) {
WorkerPool.prototype.ecSign = function ecSign(msg, key) {
return this.execute('ecSign', [msg, key], -1);
};
@ -346,7 +344,7 @@ Workers.prototype.ecSign = function ecSign(msg, key) {
* @returns {Promise} - Returns {Number}.
*/
Workers.prototype.mine = function mine(data, target, min, max) {
WorkerPool.prototype.mine = function mine(data, target, min, max) {
return this.execute('mine', [data, target, min, max], -1);
};
@ -362,7 +360,7 @@ Workers.prototype.mine = function mine(data, target, min, max) {
* @returns {Buffer}
*/
Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) {
WorkerPool.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) {
return this.execute('scrypt', [passwd, salt, N, r, p, len], -1);
};
@ -375,7 +373,7 @@ Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) {
function Worker(id) {
if (!(this instanceof Worker))
return new Worker();
return new Worker(id);
EventEmitter.call(this);
@ -489,9 +487,9 @@ Worker.prototype._bind = function _bind() {
});
this.on('exit', function(code) {
var i = Workers.children.indexOf(self);
var i = WorkerPool.children.indexOf(self);
if (i !== -1)
Workers.children.splice(i, 1);
WorkerPool.children.splice(i, 1);
});
this.on('log', function(items) {
@ -524,9 +522,9 @@ Worker.prototype._bind = function _bind() {
self.emit('error', err);
});
Workers.children.push(this);
WorkerPool.children.push(this);
Workers._bindExit();
WorkerPool._bindExit();
};
/**
@ -658,10 +656,9 @@ Worker.prototype.execute = function execute(method, args, timeout) {
* Represents the master process.
* @exports Master
* @constructor
* @param {Object?} options
*/
function Master(options) {
function Master() {
if (!(this instanceof Master))
return new Master();
@ -669,7 +666,6 @@ function Master(options) {
this.framer = new Framer();
this.parser = new Parser();
this.options = options || {};
this._init();
}
@ -797,12 +793,11 @@ Master.prototype.log = function log() {
/**
* Listen for messages from master process (only if worker).
* @param {Object?} options
* @returns {Master}
*/
Master.listen = function listen(options) {
var master = new Master(options);
Master.listen = function listen() {
var master = new Master();
utils.log = master.log.bind(master);
utils.error = utils.log;
@ -823,10 +818,11 @@ Master.listen = function listen(options) {
try {
result = jobs[packet.cmd].apply(jobs, packet.items);
} catch (e) {
return master.send(packet.job, 'response', [fromError(e)]);
master.send(packet.job, 'response', [fromError(e)]);
return;
}
return master.send(packet.job, 'response', [null, result]);
master.send(packet.job, 'response', [null, result]);
});
jobs.master = master;
@ -864,30 +860,30 @@ function fromError(err) {
* Default
*/
Workers.pool = new Workers();
Workers.pool.enabled = false;
exports.pool = new WorkerPool();
exports.pool.enabled = false;
Workers.support = true;
exports.support = true;
if (utils.isBrowser) {
Workers.support = typeof global.Worker === 'function'
exports.support = typeof global.Worker === 'function'
|| typeof global.postMessage === 'function';
}
Workers.set = function set(options) {
if (typeof options.useWorkers === 'boolean')
this.pool.enabled = options.useWorkers;
exports.set = function set(options) {
if (typeof options.useWorkerPool === 'boolean')
this.pool.enabled = options.useWorkerPool;
if (utils.isNumber(options.maxWorkers))
this.pool.size = options.maxWorkers;
if (utils.isNumber(options.maxWorkerPool))
this.pool.size = options.maxWorkerPool;
if (utils.isNumber(options.workerTimeout))
this.pool.timeout = options.workerTimeout;
};
Workers.set({
useWorkers: +process.env.BCOIN_USE_WORKERS === 1,
maxWorkers: +process.env.BCOIN_MAX_WORKERS,
exports.set({
useWorkerPool: +process.env.BCOIN_USE_WORKERS === 1,
maxWorkerPool: +process.env.BCOIN_MAX_WORKERS,
workerTimeout: +process.env.BCOIN_WORKER_TIMEOUT
});
@ -895,13 +891,9 @@ Workers.set({
* Expose
*/
exports = Workers;
exports.Workers = Workers;
exports.WorkerPool = WorkerPool;
exports.Worker = Worker;
exports.Master = Master;
exports.Framer = Framer;
exports.Parser = Parser;
exports.listen = Master.listen;
module.exports = exports;