workers: refactor for browser vs node.

This commit is contained in:
Christopher Jeffrey 2017-07-07 00:34:59 -07:00
parent 8419392d44
commit 27c60ce76e
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
8 changed files with 473 additions and 306 deletions

View File

@ -0,0 +1,100 @@
/*!
* child.js - child processes for bcoin
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const assert = require('assert');
const EventEmitter = require('events');
const util = require('../utils/util');
/**
* Represents a worker.
* @alias module:workers.Child
* @constructor
* @param {String} file
* @param {Object} env
*/
function Child(file, env) {
if (!(this instanceof Child))
return new Child(file, env);
EventEmitter.call(this);
this.init(file, env);
}
util.inherits(Child, EventEmitter);
/**
* Test whether worker support is available.
* @returns {Boolean}
*/
Child.hasSupport = function hasSupport() {
return typeof global.postMessage === 'function';
};
/**
* Initialize worker. Bind to events.
* @private
*/
Child.prototype.init = function init(file, env) {
this.child = new global.Worker(file);
this.child.onerror = (event) => {
this.emit('error', new Error('Child error.'));
this.emit('exit', 1, null);
};
this.child.onmessage = (event) => {
let data;
if (typeof event.data === 'string') {
data = Buffer.from(event.data, 'hex');
assert(data.length === event.data.length / 2);
} else {
assert(event.data && typeof event.data === 'object');
assert(event.data.data && typeof event.data.data.length === 'number');
data = event.data.data;
data.__proto__ = Buffer.prototype;
}
this.emit('data', data);
};
this.child.postMessage(JSON.stringify(env));
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Child.prototype.write = function write(data) {
if (this.child.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
this.child.postMessage({ data }, [data]);
} else {
this.child.postMessage(data.toString('hex'));
}
return true;
};
/**
* Destroy the worker.
*/
Child.prototype.destroy = function destroy() {
this.child.terminate();
this.emit('exit', 15 | 0x80, 'SIGTERM');
};
/*
* Expose
*/
module.exports = Child;

187
lib/workers/child.js Normal file
View File

@ -0,0 +1,187 @@
/*!
* child.js - child processes for bcoin
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const EventEmitter = require('events');
const path = require('path');
const cp = require('child_process');
const util = require('../utils/util');
const children = new Set();
let exitBound = false;
/**
* Represents a worker.
* @alias module:workers.Child
* @constructor
* @param {String} file
* @param {Object} env
*/
function Child(file, env) {
if (!(this instanceof Child))
return new Child(file, env);
EventEmitter.call(this);
children.add(this);
bindExit();
this.init(file, env);
}
util.inherits(Child, EventEmitter);
/**
* Test whether worker support is available.
* @returns {Boolean}
*/
Child.hasSupport = function hasSupport() {
return true;
};
/**
* Initialize worker (node.js).
* @private
*/
Child.prototype.init = function init(file, env) {
let bin = process.argv[0];
let filename = path.resolve(__dirname, file);
let environ = Object.assign({}, process.env, env);
let options = { stdio: 'pipe', env: environ };
this.child = cp.spawn(bin, [filename], options);
this.child.unref();
this.child.stdin.unref();
this.child.stdout.unref();
this.child.stderr.unref();
this.child.on('error', (err) => {
this.emit('error', err);
});
this.child.on('exit', (code, signal) => {
children.delete(this);
this.emit('exit', code == null ? -1 : code, signal);
});
this.child.on('close', () => {
children.delete(this);
this.emit('exit', -1, null);
});
this.child.stdin.on('error', (err) => {
this.emit('error', err);
});
this.child.stdout.on('error', (err) => {
this.emit('error', err);
});
this.child.stderr.on('error', (err) => {
this.emit('error', err);
});
this.child.stdout.on('data', (data) => {
this.emit('data', data);
});
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Child.prototype.write = function write(data) {
return this.child.stdin.write(data);
};
/**
* Destroy the worker.
*/
Child.prototype.destroy = function destroy() {
this.child.kill('SIGTERM');
};
/*
* Helpers
*/
function bindExit() {
let onSighup, onSigint, onSigterm, onError;
if (exitBound)
return;
exitBound = true;
onSighup = () => {
process.exit(1 | 0x80);
};
onSigint = () => {
process.exit(2 | 0x80);
};
onSigterm = () => {
process.exit(15 | 0x80);
};
onError = (err) => {
if (err && err.stack)
console.error(err.stack + '');
else
console.error(err + '');
process.exit(1);
};
process.once('exit', () => {
for (let child of children)
child.destroy();
});
if (process.listenerCount('SIGHUP') === 0)
process.once('SIGHUP', onSighup);
if (process.listenerCount('SIGINT') === 0)
process.once('SIGINT', onSigint);
if (process.listenerCount('SIGTERM') === 0)
process.once('SIGTERM', onSigterm);
if (process.listenerCount('uncaughtException') === 0)
process.once('uncaughtException', onError);
process.on('newListener', (name) => {
switch (name) {
case 'SIGHUP':
process.removeListener(name, onSighup);
break;
case 'SIGINT':
process.removeListener(name, onSigint);
break;
case 'SIGTERM':
process.removeListener(name, onSigterm);
break;
case 'uncaughtException':
process.removeListener(name, onError);
break;
}
});
}
/*
* Expose
*/
module.exports = Child;

View File

@ -1,3 +0,0 @@
'use strict';
exports.unsupported = true;

View File

@ -1,3 +0,0 @@
'use strict';
module.exports = require('child_process');

View File

@ -15,8 +15,7 @@ const jobs = require('./jobs');
const Parser = require('./parser');
const Framer = require('./framer');
const packets = require('./packets');
const HAS_WORKERS = typeof global.postMessage === 'function';
const HAS_CP = !!(process.stdin && process.stdout && process.stderr);
const Parent = require('./parent');
/**
* Represents the master process.
@ -30,13 +29,14 @@ function Master() {
EventEmitter.call(this);
this.parent = new Parent();
this.framer = new Framer();
this.parser = new Parser();
this.env = {};
this.listening = false;
this.color = false;
this._init();
this.init();
}
util.inherits(Master, EventEmitter);
@ -46,8 +46,8 @@ util.inherits(Master, EventEmitter);
* @private
*/
Master.prototype._init = function _init() {
this.on('data', (data) => {
Master.prototype.init = function init() {
this.parent.on('data', (data) => {
this.parser.feed(data);
});
@ -59,60 +59,9 @@ Master.prototype._init = function _init() {
this.emit('packet', packet);
});
if (HAS_WORKERS) {
// Web workers
this._initWebWorkers();
} else if (HAS_CP) {
// Child process + pipes
this._initChildProcess();
} else {
throw new Error('Workers not available.');
}
};
/**
* Initialize master (web workers).
* @private
*/
Master.prototype._initWebWorkers = function _initWebWorkers() {
global.onerror = (event) => {
this.emit('error', new Error('Master error.'));
};
global.onmessage = (event) => {
let data;
if (typeof event.data === 'string') {
data = Buffer.from(event.data, 'hex');
assert(data.length === event.data.length / 2);
} else {
assert(event.data && typeof event.data === 'object');
assert(event.data.data && typeof event.data.data.length === 'number');
data = event.data.data;
data.__proto__ = Buffer.prototype;
}
this.emit('data', data);
};
};
/**
* Initialize master (node.js).
* @private
*/
Master.prototype._initChildProcess = function _initChildProcess() {
process.stdin.on('data', (data) => {
this.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', () => {});
process.stdout.on('error', () => {});
process.stderr.on('error', () => {});
process.on('uncaughtException', (err) => {
this.parent.on('exception', (err) => {
this.send(new packets.ErrorPacket(err));
setTimeout(() => process.exit(1), 1000);
setTimeout(() => this.destroy(), 1000);
});
};
@ -132,16 +81,7 @@ Master.prototype.set = function set(network) {
*/
Master.prototype.write = function write(data) {
if (HAS_WORKERS) {
if (global.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
global.postMessage({ data }, [data]);
} else {
global.postMessage(data.toString('hex'));
}
return true;
}
return process.stdout.write(data);
return this.parent.write(data);
};
/**
@ -170,9 +110,7 @@ Master.prototype.sendEvent = function sendEvent(...items) {
*/
Master.prototype.destroy = function destroy() {
if (HAS_WORKERS)
return global.close();
return process.exit(0);
return this.parent.destroy();
};
/**

View File

@ -0,0 +1,83 @@
/*!
* parent.js - worker processes for bcoin
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const assert = require('assert');
const EventEmitter = require('events');
const util = require('../utils/util');
/**
* Represents the parent process.
* @alias module:workers.Parent
* @constructor
*/
function Parent() {
if (!(this instanceof Parent))
return new Parent();
EventEmitter.call(this);
this.init();
}
util.inherits(Parent, EventEmitter);
/**
* Initialize master (web workers).
* @private
*/
Parent.prototype.init = function init() {
global.onerror = (event) => {
this.emit('error', new Error('Worker error.'));
};
global.onmessage = (event) => {
let data;
if (typeof event.data === 'string') {
data = Buffer.from(event.data, 'hex');
assert(data.length === event.data.length / 2);
} else {
assert(event.data && typeof event.data === 'object');
assert(event.data.data && typeof event.data.data.length === 'number');
data = event.data.data;
data.__proto__ = Buffer.prototype;
}
this.emit('data', data);
};
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Parent.prototype.write = function write(data) {
if (global.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
global.postMessage({ data }, [data]);
} else {
global.postMessage(data.toString('hex'));
}
return true;
};
/**
* Destroy the worker.
*/
Parent.prototype.destroy = function destroy() {
global.close();
};
/*
* Expose
*/
module.exports = Parent;

71
lib/workers/parent.js Normal file
View File

@ -0,0 +1,71 @@
/*!
* parent.js - worker processes for bcoin
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const EventEmitter = require('events');
const util = require('../utils/util');
/**
* Represents the parent process.
* @alias module:workers.Parent
* @constructor
*/
function Parent() {
if (!(this instanceof Parent))
return new Parent();
EventEmitter.call(this);
this.init();
}
util.inherits(Parent, EventEmitter);
/**
* Initialize master (node.js).
* @private
*/
Parent.prototype.init = function init() {
process.stdin.on('data', (data) => {
this.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', () => {});
process.stdout.on('error', () => {});
process.stderr.on('error', () => {});
process.on('uncaughtException', (err) => {
this.emit('exception', err);
});
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Parent.prototype.write = function write(data) {
return process.stdout.write(data);
};
/**
* Destroy the worker.
*/
Parent.prototype.destroy = function destroy() {
return process.exit(0);
};
/*
* Expose
*/
module.exports = Parent;

View File

@ -10,23 +10,15 @@
const assert = require('assert');
const EventEmitter = require('events');
const os = require('os');
const path = require('path');
const cp = require('./cp');
const util = require('../utils/util');
const co = require('../utils/co');
const Network = require('../protocol/network');
const Child = require('./child');
const jobs = require('./jobs');
const Parser = require('./parser');
const Framer = require('./framer');
const packets = require('./packets');
const HAS_WORKERS = typeof global.Worker === 'function';
const HAS_CP = typeof cp.spawn === 'function';
const HAS_SUPPORT = HAS_WORKERS || HAS_CP;
const pools = new Set();
let exitBound = false;
/**
* A worker pool.
* @alias module:workers.WorkerPool
@ -46,36 +38,18 @@ function WorkerPool(options) {
EventEmitter.call(this);
this.size = WorkerPool.CORES;
this.size = getCores();
this.timeout = 60000;
this.children = new Map();
this.nonce = 0;
this.enabled = true;
this.file = WorkerPool.WORKER_FILE;
bindExit();
pools.add(this);
this.file = process.env.BCOIN_WORKER_FILE || 'worker.js';
this.set(options);
}
util.inherits(WorkerPool, EventEmitter);
/**
* Number of CPUs/cores available.
* @type {Number}
*/
WorkerPool.CORES = getCores();
/**
* Default worker file.
* @const {String}
*/
WorkerPool.WORKER_FILE = process.env.BCOIN_WORKER_FILE || 'worker.js';
/**
* Set worker pool options.
* @param {Object} options
@ -108,29 +82,13 @@ WorkerPool.prototype.set = function set(options) {
}
};
/**
* Enable the worker pool.
*/
WorkerPool.prototype.enable = function enable() {
this.enabled = true;
};
/**
* Disable the worker pool.
*/
WorkerPool.prototype.disable = function disable() {
this.enabled = true;
};
/**
* Open worker pool.
* @returns {Promise}
*/
WorkerPool.prototype.open = async function open() {
pools.add(this);
;
};
/**
@ -139,7 +97,6 @@ WorkerPool.prototype.open = async function open() {
*/
WorkerPool.prototype.close = async function close() {
pools.delete(this);
this.destroy();
};
@ -230,7 +187,7 @@ WorkerPool.prototype.destroy = function destroy() {
WorkerPool.prototype.execute = function execute(packet, timeout) {
let result, child;
if (!this.enabled || !HAS_SUPPORT) {
if (!this.enabled || !Child.hasSupport()) {
return new Promise((resolve, reject) => {
setImmediate(() => {
try {
@ -401,22 +358,19 @@ function Worker(file) {
EventEmitter.call(this);
this.file = file;
this.id = -1;
this.framer = new Framer();
this.parser = new Parser();
this.pending = new Map();
this.child = null;
this.env = {
this.child = new Child(file, {
BCOIN_WORKER_NETWORK: Network.type,
BCOIN_WORKER_ISTTY: process.stdout
? (process.stdout.isTTY ? '1' : '0')
: '0'
};
});
this._init();
this.init();
}
util.inherits(Worker, EventEmitter);
@ -426,11 +380,19 @@ util.inherits(Worker, EventEmitter);
* @private
*/
Worker.prototype._init = function _init() {
this.on('data', (data) => {
Worker.prototype.init = function init() {
this.child.on('data', (data) => {
this.parser.feed(data);
});
this.child.on('exit', (code, signal) => {
this.emit('exit', code, signal);
});
this.child.on('error', (err) => {
this.emit('error', err);
});
this.parser.on('error', (err) => {
this.emit('error', err);
});
@ -439,94 +401,7 @@ Worker.prototype._init = function _init() {
this.emit('packet', packet);
});
if (HAS_WORKERS) {
// Web workers
this._initWebWorkers();
} else if (HAS_CP) {
// Child process + pipes
this._initChildProcess();
} else {
throw new Error('Workers not available.');
}
this._listen();
};
/**
* Initialize worker (web workers).
* @private
*/
Worker.prototype._initWebWorkers = function _initWebWorkers() {
this.child = new global.Worker(this.file);
this.child.onerror = (event) => {
this.emit('error', new Error('Worker error.'));
this.emit('exit', -1, null);
};
this.child.onmessage = (event) => {
let data;
if (typeof event.data === 'string') {
data = Buffer.from(event.data, 'hex');
assert(data.length === event.data.length / 2);
} else {
assert(event.data && typeof event.data === 'object');
assert(event.data.data && typeof event.data.data.length === 'number');
data = event.data.data;
data.__proto__ = Buffer.prototype;
}
this.emit('data', data);
};
this.child.postMessage(JSON.stringify(this.env));
};
/**
* Initialize worker (node.js).
* @private
*/
Worker.prototype._initChildProcess = function _initChildProcess() {
let bin = process.argv[0];
let file = path.resolve(__dirname, this.file);
let env = Object.assign({}, process.env, this.env);
let options = { stdio: 'pipe', env: env };
this.child = cp.spawn(bin, [file], options);
this.child.unref();
this.child.stdin.unref();
this.child.stdout.unref();
this.child.stderr.unref();
this.child.on('error', (err) => {
this.emit('error', err);
});
this.child.on('exit', (code, signal) => {
this.emit('exit', code == null ? -1 : code, signal);
});
this.child.on('close', () => {
this.emit('exit', -1, null);
});
this.child.stdin.on('error', (err) => {
this.emit('error', err);
});
this.child.stdout.on('error', (err) => {
this.emit('error', err);
});
this.child.stderr.on('error', (err) => {
this.emit('error', err);
});
this.child.stdout.on('data', (data) => {
this.emit('data', data);
});
this.listen();
};
/**
@ -534,8 +409,8 @@ Worker.prototype._initChildProcess = function _initChildProcess() {
* @private
*/
Worker.prototype._listen = function _listen() {
this.on('exit', () => {
Worker.prototype.listen = function listen() {
this.on('exit', (code, signal) => {
this.killJobs();
});
@ -586,16 +461,7 @@ Worker.prototype.handlePacket = function handlePacket(packet) {
*/
Worker.prototype.write = function write(data) {
if (HAS_WORKERS) {
if (this.child.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
this.child.postMessage({ data }, [data]);
} else {
this.child.postMessage(data.toString('hex'));
}
return true;
}
return this.child.stdin.write(data);
return this.child.write(data);
};
/**
@ -624,12 +490,7 @@ Worker.prototype.sendEvent = function sendEvent(...items) {
*/
Worker.prototype.destroy = function destroy() {
if (HAS_WORKERS) {
this.child.terminate();
this.emit('exit', -1, 'SIGTERM');
return;
}
return this.child.kill('SIGTERM');
return this.child.destroy();
};
/**
@ -796,73 +657,6 @@ function getCores() {
return Math.max(2, os.cpus().length);
}
function bindExit() {
let onSighup, onSigint, onSigterm, onError;
if (!HAS_CP)
return;
if (exitBound)
return;
exitBound = true;
onSighup = () => {
process.exit(1 | 0x80);
};
onSigint = () => {
process.exit(2 | 0x80);
};
onSigterm = () => {
process.exit(15 | 0x80);
};
onError = (err) => {
if (err && err.stack)
console.error(err.stack + '');
else
console.error(err + '');
process.exit(1);
};
process.once('exit', () => {
for (let pool of pools)
pool.destroy();
});
if (process.listenerCount('SIGHUP') === 0)
process.once('SIGHUP', onSighup);
if (process.listenerCount('SIGINT') === 0)
process.once('SIGINT', onSigint);
if (process.listenerCount('SIGTERM') === 0)
process.once('SIGTERM', onSigterm);
if (process.listenerCount('uncaughtException') === 0)
process.once('uncaughtException', onError);
process.on('newListener', (name) => {
switch (name) {
case 'SIGHUP':
process.removeListener(name, onSighup);
break;
case 'SIGINT':
process.removeListener(name, onSigint);
break;
case 'SIGTERM':
process.removeListener(name, onSigterm);
break;
case 'uncaughtException':
process.removeListener(name, onError);
break;
}
});
}
/*
* Expose
*/