workers: minor.

This commit is contained in:
Christopher Jeffrey 2017-07-07 08:15:10 -07:00
parent 647b6909c6
commit a93f82db73
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
6 changed files with 37 additions and 25 deletions

View File

@ -31,6 +31,7 @@ const node = new bcoin.fullnode({
logLevel: 'debug', logLevel: 'debug',
db: 'leveldb', db: 'leveldb',
persistent: true, persistent: true,
workers: true,
listen: true, listen: true,
loader: require loader: require
}); });

View File

@ -18,6 +18,7 @@ const node = bcoin.spvnode({
logLevel: 'debug', logLevel: 'debug',
db: 'leveldb', db: 'leveldb',
persistent: true, persistent: true,
workers: true,
listen: true, listen: true,
loader: require loader: require
}); });

View File

@ -28,8 +28,8 @@ function Child(file, env) {
EventEmitter.call(this); EventEmitter.call(this);
children.add(this);
bindExit(); bindExit();
children.add(this);
this.init(file, env); this.init(file, env);
} }
@ -112,31 +112,43 @@ Child.prototype.destroy = function destroy() {
this.child.kill('SIGTERM'); this.child.kill('SIGTERM');
}; };
/* /**
* Helpers * Cleanup all child processes.
* @private
*/ */
function bindExit() { function bindExit() {
let onSighup, onSigint, onSigterm, onError;
if (exitBound) if (exitBound)
return; return;
exitBound = true; exitBound = true;
onSighup = () => { listenExit(() => {
for (let child of children)
child.destroy();
});
}
/**
* Listen for exit.
* @param {Function} handler
* @private
*/
function listenExit(handler) {
let onSighup = () => {
process.exit(1 | 0x80); process.exit(1 | 0x80);
}; };
onSigint = () => { let onSigint = () => {
process.exit(2 | 0x80); process.exit(2 | 0x80);
}; };
onSigterm = () => { let onSigterm = () => {
process.exit(15 | 0x80); process.exit(15 | 0x80);
}; };
onError = (err) => { let onError = (err) => {
if (err && err.stack) if (err && err.stack)
console.error(err.stack + ''); console.error(err.stack + '');
else else
@ -145,10 +157,7 @@ function bindExit() {
process.exit(1); process.exit(1);
}; };
process.once('exit', () => { process.once('exit', handler);
for (let child of children)
child.destroy();
});
if (process.listenerCount('SIGHUP') === 0) if (process.listenerCount('SIGHUP') === 0)
process.once('SIGHUP', onSighup); process.once('SIGHUP', onSighup);

View File

@ -27,7 +27,7 @@ const jobs = exports;
jobs.execute = function execute(p) { jobs.execute = function execute(p) {
try { try {
return jobs._execute(p); return jobs.handle(p);
} catch (e) { } catch (e) {
return new packets.ErrorResultPacket(e); return new packets.ErrorResultPacket(e);
} }
@ -41,7 +41,7 @@ jobs.execute = function execute(p) {
* @throws on unknown command * @throws on unknown command
*/ */
jobs._execute = function execute(p) { jobs.handle = function handle(p) {
switch (p.cmd) { switch (p.cmd) {
case packets.types.VERIFY: case packets.types.VERIFY:
return jobs.verify(p.tx, p.view, p.flags); return jobs.verify(p.tx, p.view, p.flags);

View File

@ -1,5 +1,5 @@
/*! /*!
* workers.js - worker processes for bcoin * master.js - master process for bcoin
* Copyright (c) 2014-2015, Fedor Indutny (MIT License) * Copyright (c) 2014-2015, Fedor Indutny (MIT License)
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin * https://github.com/bcoin-org/bcoin

View File

@ -25,11 +25,11 @@ const packets = require('./packets');
* @constructor * @constructor
* @param {Object} options * @param {Object} options
* @param {Number} [options.size=num-cores] - Max pool size. * @param {Number} [options.size=num-cores] - Max pool size.
* @param {Number} [options.timeout=10000] - Execution timeout. * @param {Number} [options.timeout=120000] - Execution timeout.
* @property {Number} size * @property {Number} size
* @property {Number} timeout * @property {Number} timeout
* @property {Map} children * @property {Map} children
* @property {Number} nonce * @property {Number} uid
*/ */
function WorkerPool(options) { function WorkerPool(options) {
@ -38,13 +38,14 @@ function WorkerPool(options) {
EventEmitter.call(this); EventEmitter.call(this);
this.enabled = false;
this.size = getCores(); this.size = getCores();
this.timeout = 60000; this.timeout = 120000;
this.children = new Map();
this.nonce = 0;
this.enabled = true;
this.file = process.env.BCOIN_WORKER_FILE || 'worker.js'; this.file = process.env.BCOIN_WORKER_FILE || 'worker.js';
this.children = new Map();
this.uid = 0;
this.set(options); this.set(options);
} }
@ -142,7 +143,7 @@ WorkerPool.prototype.spawn = function spawn(id) {
*/ */
WorkerPool.prototype.alloc = function alloc() { WorkerPool.prototype.alloc = function alloc() {
let id = this.nonce++ % this.size; let id = this.uid++ % this.size;
if (!this.children.has(id)) if (!this.children.has(id))
this.children.set(id, this.spawn(id)); this.children.set(id, this.spawn(id));
@ -191,7 +192,7 @@ WorkerPool.prototype.execute = function execute(packet, timeout) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
setImmediate(() => { setImmediate(() => {
try { try {
result = jobs._execute(packet); result = jobs.handle(packet);
} catch (e) { } catch (e) {
reject(e); reject(e);
return; return;
@ -590,7 +591,7 @@ function PendingJob(worker, id, resolve, reject) {
*/ */
PendingJob.prototype.start = function start(timeout) { PendingJob.prototype.start = function start(timeout) {
if (!timeout || timeout === -1) if (!timeout || timeout <= 0)
return; return;
this.timer = setTimeout(() => { this.timer = setTimeout(() => {