diff --git a/lib/workers/master.js b/lib/workers/master.js index 300fbb9d..a6f7f60a 100644 --- a/lib/workers/master.js +++ b/lib/workers/master.js @@ -7,6 +7,7 @@ 'use strict'; +var assert = require('assert'); var EventEmitter = require('events').EventEmitter; var util = require('../utils/util'); var Network = require('../protocol/network'); @@ -31,6 +32,8 @@ function Master() { this.framer = new Framer(); this.parser = new Parser(); + this.env = {}; + this.listening = false; this._init(); } @@ -45,41 +48,67 @@ util.inherits(Master, EventEmitter); Master.prototype._init = function _init() { var self = this; - if (util.isBrowser) { - global.onerror = function onerror(err) { - self.emit('error', err); - }; - global.onmessage = function onmessage(event) { - var data; - if (typeof event.data !== 'string') { - data = event.data.buf; - data.__proto__ = Buffer.prototype; - } else { - data = new Buffer(event.data, 'hex'); - } - self.emit('data', data); - }; - } else { - process.stdin.on('data', function(data) { - self.emit('data', data); - }); - // Nowhere to send these errors: - process.stdin.on('error', util.nop); - process.stdout.on('error', util.nop); - process.stderr.on('error', util.nop); - } - this.on('data', function(data) { self.parser.feed(data); }); - this.parser.on('error', function(e) { - self.emit('error', e); + this.parser.on('error', function(err) { + self.emit('error', err); }); this.parser.on('packet', function(packet) { self.emit('packet', packet); }); + + if (util.isBrowser) { + // Web workers + this._initWebWorkers(); + } else { + // Child process + pipes + this._initChildProcess(); + } +}; + +/** + * Initialize master (web workers). + * @private + */ + +Master.prototype._initWebWorkers = function _initWebWorkers() { + var self = this; + + global.onerror = function onerror(err) { + self.emit('error', err); + }; + + global.onmessage = function onmessage(event) { + var data; + if (typeof event.data !== 'string') { + data = event.data.buf; + data.__proto__ = Buffer.prototype; + } else { + data = new Buffer(event.data, 'hex'); + } + self.emit('data', data); + }; +}; + +/** + * Initialize master (node.js). + * @private + */ + +Master.prototype._initChildProcess = function _initChildProcess() { + var self = this; + + process.stdin.on('data', function(data) { + self.emit('data', data); + }); + + // Nowhere to send these errors: + process.stdin.on('error', util.nop); + process.stdout.on('error', util.nop); + process.stderr.on('error', util.nop); }; /** @@ -172,6 +201,11 @@ Master.prototype.log = function log() { Master.prototype.listen = function listen(env) { var self = this; + assert(!this.listening, 'Already listening.'); + + this.env = env; + this.listening = true; + Network.set(env.BCOIN_WORKER_NETWORK); util.log = this.log.bind(this); @@ -182,23 +216,39 @@ Master.prototype.listen = function listen(env) { }); this.on('packet', function(packet) { - var result; - - switch (packet.cmd) { - case packets.types.EVENT: - self.emit('event', packet.items); - self.emit.apply(self, packet.items); - break; - case packets.types.ERROR: - self.emit('error', packet.error); - break; - default: - result = jobs.execute(packet); - result.id = packet.id; - self.send(result); - break; + try { + self.handlePacket(packet); + } catch (e) { + self.emit('error', e); } }); + + return this; +}; + +/** + * Handle packet. + * @private + * @param {Packet} + */ + +Master.prototype.handlePacket = function handlePacket(packet) { + var result; + + switch (packet.cmd) { + case packets.types.EVENT: + this.emit('event', packet.items); + this.emit.apply(this, packet.items); + break; + case packets.types.ERROR: + this.emit('error', packet.error); + break; + default: + result = jobs.execute(packet); + result.id = packet.id; + this.send(result); + break; + } }; /* diff --git a/lib/workers/packets.js b/lib/workers/packets.js index 053bd769..b478a3a4 100644 --- a/lib/workers/packets.js +++ b/lib/workers/packets.js @@ -47,7 +47,6 @@ var packetTypes = { function Packet() { this.id = ++Packet.id >>> 0; - this.error = null; } Packet.id = 0; diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index 69470b06..00e3008c 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -410,6 +410,9 @@ function Worker(id) { this.id = id != null ? id : -1; this.child = null; this.pending = {}; + this.env = { + BCOIN_WORKER_NETWORK: Network.type + }; this._init(); } @@ -423,92 +426,105 @@ util.inherits(Worker, EventEmitter); Worker.prototype._init = function _init() { var self = this; - var penv; - - penv = { - BCOIN_WORKER_NETWORK: Network.type - }; - - if (util.isBrowser) { - this.child = new global.Worker('/bcoin-worker.js'); - - this.child.onerror = function onerror(err) { - self.emit('error', err); - self.emit('exit', -1, null); - }; - - this.child.onmessage = function onmessage(event) { - var data; - if (typeof event.data !== 'string') { - data = event.data.buf; - data.__proto__ = Buffer.prototype; - } else { - data = new Buffer(event.data, 'hex'); - } - self.emit('data', data); - }; - - this.child.postMessage(JSON.stringify(penv)); - } else { - this.child = cp.spawn(process.argv[0], [__dirname + '/worker.js'], { - stdio: 'pipe', - env: util.merge({}, process.env, penv) - }); - - this.child.on('error', function(err) { - self.emit('error', err); - }); - - this.child.on('exit', function(code, signal) { - self.emit('exit', code == null ? -1 : code, signal); - }); - - this.child.on('close', function() { - self.emit('exit', -1, null); - }); - - this.child.stdin.on('error', function(err) { - self.emit('error', err); - }); - - this.child.stdout.on('error', function(err) { - self.emit('error', err); - }); - - this.child.stderr.on('error', function(err) { - self.emit('error', err); - }); - - this.child.stdout.on('data', function(data) { - self.emit('data', data); - }); - } - - this.on('exit', function() { - self.killJobs(); - }); - - this.on('error', function() { - self.killJobs(); - }); this.on('data', function(data) { self.parser.feed(data); }); - this.parser.on('error', function(e) { - self.emit('error', e); + this.parser.on('error', function(err) { + self.emit('error', err); }); this.parser.on('packet', function(packet) { self.emit('packet', packet); }); + if (util.isBrowser) { + // Web workers + this._initWebWorkers(); + } else { + // Child process + pipes + this._initChildProcess(); + } + this._bind(); + this._listen(); }; /** - * Initialize worker. Bind to more events. + * Initialize worker (web workers). + * @private + */ + +Worker.prototype._initWebWorkers = function _initWebWorkers() { + var self = this; + + this.child = new global.Worker('/bcoin-worker.js'); + + this.child.onerror = function onerror(err) { + self.emit('error', err); + self.emit('exit', -1, null); + }; + + this.child.onmessage = function onmessage(event) { + var data; + if (typeof event.data !== 'string') { + data = event.data.buf; + data.__proto__ = Buffer.prototype; + } else { + data = new Buffer(event.data, 'hex'); + } + self.emit('data', data); + }; + + this.child.postMessage(JSON.stringify(this.env)); +}; + +/** + * Initialize worker (node.js). + * @private + */ + +Worker.prototype._initChildProcess = function _initChildProcess() { + var self = this; + var file = process.argv[0]; + var argv = [__dirname + '/worker.js']; + var env = util.merge({}, process.env, this.env); + var options = { stdio: 'pipe', env: env }; + + this.child = cp.spawn(file, argv, options); + + this.child.on('error', function(err) { + self.emit('error', err); + }); + + this.child.on('exit', function(code, signal) { + self.emit('exit', code == null ? -1 : code, signal); + }); + + this.child.on('close', function() { + self.emit('exit', -1, null); + }); + + this.child.stdin.on('error', function(err) { + self.emit('error', err); + }); + + this.child.stdout.on('error', function(err) { + self.emit('error', err); + }); + + this.child.stderr.on('error', function(err) { + self.emit('error', err); + }); + + this.child.stdout.on('data', function(data) { + self.emit('data', data); + }); +}; + +/** + * Bind to exit listener. * @private */ @@ -521,33 +537,64 @@ Worker.prototype._bind = function _bind() { WorkerPool.children.splice(i, 1); }); - this.on('packet', function(packet) { - switch (packet.cmd) { - case packets.types.EVENT: - self.emit.apply(self, packet.items); - self.emit('event', packet.items); - break; - case packets.types.LOG: - util.log('Worker %d:', self.id); - util.log.apply(util, packet.items); - break; - case packets.types.ERROR: - self.emit('error', packet.error); - break; - case packets.types.ERRORRESULT: - self.rejectJob(packet.id, packet.error); - break; - default: - self.resolveJob(packet.id, packet); - break; - } - }); - WorkerPool.children.push(this); WorkerPool._bindExit(); }; +/** + * Listen for packets. + * @private + */ + +Worker.prototype._listen = function _listen() { + var self = this; + + this.on('exit', function() { + self.killJobs(); + }); + + this.on('error', function() { + self.killJobs(); + }); + + this.on('packet', function(packet) { + try { + self.handlePacket(packet); + } catch (e) { + self.emit('error', e); + } + }); +}; + +/** + * Handle packet. + * @private + * @param {Packet} packet + */ + +Worker.prototype.handlePacket = function handlePacket(packet) { + switch (packet.cmd) { + case packets.types.EVENT: + this.emit.apply(this, packet.items); + this.emit('event', packet.items); + break; + case packets.types.LOG: + util.log('Worker %d:', this.id); + util.log.apply(util, packet.items); + break; + case packets.types.ERROR: + this.emit('error', packet.error); + break; + case packets.types.ERRORRESULT: + this.rejectJob(packet.id, packet.error); + break; + default: + this.resolveJob(packet.id, packet); + break; + } +}; + /** * Send data to worker. * @param {Buffer} data @@ -684,6 +731,9 @@ Worker.prototype.killJobs = function killJobs() { /** * PendingWorker * @constructor + * @param {Worker} worker + * @param {Number} id + * @param {Function} callback */ function PendingJob(worker, id, callback) { @@ -693,6 +743,11 @@ function PendingJob(worker, id, callback) { this.timer = null; } +/** + * Start the timer. + * @param {Number} timeout + */ + PendingJob.prototype.start = function start(timeout) { var self = this; @@ -704,10 +759,20 @@ PendingJob.prototype.start = function start(timeout) { }, timeout); }; +/** + * Destroy the job with an error. + */ + PendingJob.prototype.destroy = function destroy() { this.finish(new Error('Job was destroyed.')); }; +/** + * Complete job with error and result. + * @param {Error} err + * @param {Object} reject + */ + PendingJob.prototype.finish = function finish(err, result) { var callback = this.callback;