worker: more refactoring.

This commit is contained in:
Christopher Jeffrey 2016-12-02 18:14:17 -08:00
parent 1f47d5ead5
commit bdc7ec0198
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 249 additions and 135 deletions

View File

@ -7,6 +7,7 @@
'use strict';
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var util = require('../utils/util');
var Network = require('../protocol/network');
@ -31,6 +32,8 @@ function Master() {
this.framer = new Framer();
this.parser = new Parser();
this.env = {};
this.listening = false;
this._init();
}
@ -45,41 +48,67 @@ util.inherits(Master, EventEmitter);
Master.prototype._init = function _init() {
var self = this;
if (util.isBrowser) {
global.onerror = function onerror(err) {
self.emit('error', err);
};
global.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
} else {
process.stdin.on('data', function(data) {
self.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', util.nop);
process.stdout.on('error', util.nop);
process.stderr.on('error', util.nop);
}
this.on('data', function(data) {
self.parser.feed(data);
});
this.parser.on('error', function(e) {
self.emit('error', e);
this.parser.on('error', function(err) {
self.emit('error', err);
});
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
if (util.isBrowser) {
// Web workers
this._initWebWorkers();
} else {
// Child process + pipes
this._initChildProcess();
}
};
/**
* Initialize master (web workers).
* @private
*/
Master.prototype._initWebWorkers = function _initWebWorkers() {
var self = this;
global.onerror = function onerror(err) {
self.emit('error', err);
};
global.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
};
/**
* Initialize master (node.js).
* @private
*/
Master.prototype._initChildProcess = function _initChildProcess() {
var self = this;
process.stdin.on('data', function(data) {
self.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', util.nop);
process.stdout.on('error', util.nop);
process.stderr.on('error', util.nop);
};
/**
@ -172,6 +201,11 @@ Master.prototype.log = function log() {
Master.prototype.listen = function listen(env) {
var self = this;
assert(!this.listening, 'Already listening.');
this.env = env;
this.listening = true;
Network.set(env.BCOIN_WORKER_NETWORK);
util.log = this.log.bind(this);
@ -182,23 +216,39 @@ Master.prototype.listen = function listen(env) {
});
this.on('packet', function(packet) {
var result;
switch (packet.cmd) {
case packets.types.EVENT:
self.emit('event', packet.items);
self.emit.apply(self, packet.items);
break;
case packets.types.ERROR:
self.emit('error', packet.error);
break;
default:
result = jobs.execute(packet);
result.id = packet.id;
self.send(result);
break;
try {
self.handlePacket(packet);
} catch (e) {
self.emit('error', e);
}
});
return this;
};
/**
* Handle packet.
* @private
* @param {Packet}
*/
Master.prototype.handlePacket = function handlePacket(packet) {
var result;
switch (packet.cmd) {
case packets.types.EVENT:
this.emit('event', packet.items);
this.emit.apply(this, packet.items);
break;
case packets.types.ERROR:
this.emit('error', packet.error);
break;
default:
result = jobs.execute(packet);
result.id = packet.id;
this.send(result);
break;
}
};
/*

View File

@ -47,7 +47,6 @@ var packetTypes = {
function Packet() {
this.id = ++Packet.id >>> 0;
this.error = null;
}
Packet.id = 0;

View File

@ -410,6 +410,9 @@ function Worker(id) {
this.id = id != null ? id : -1;
this.child = null;
this.pending = {};
this.env = {
BCOIN_WORKER_NETWORK: Network.type
};
this._init();
}
@ -423,92 +426,105 @@ util.inherits(Worker, EventEmitter);
Worker.prototype._init = function _init() {
var self = this;
var penv;
penv = {
BCOIN_WORKER_NETWORK: Network.type
};
if (util.isBrowser) {
this.child = new global.Worker('/bcoin-worker.js');
this.child.onerror = function onerror(err) {
self.emit('error', err);
self.emit('exit', -1, null);
};
this.child.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
this.child.postMessage(JSON.stringify(penv));
} else {
this.child = cp.spawn(process.argv[0], [__dirname + '/worker.js'], {
stdio: 'pipe',
env: util.merge({}, process.env, penv)
});
this.child.on('error', function(err) {
self.emit('error', err);
});
this.child.on('exit', function(code, signal) {
self.emit('exit', code == null ? -1 : code, signal);
});
this.child.on('close', function() {
self.emit('exit', -1, null);
});
this.child.stdin.on('error', function(err) {
self.emit('error', err);
});
this.child.stdout.on('error', function(err) {
self.emit('error', err);
});
this.child.stderr.on('error', function(err) {
self.emit('error', err);
});
this.child.stdout.on('data', function(data) {
self.emit('data', data);
});
}
this.on('exit', function() {
self.killJobs();
});
this.on('error', function() {
self.killJobs();
});
this.on('data', function(data) {
self.parser.feed(data);
});
this.parser.on('error', function(e) {
self.emit('error', e);
this.parser.on('error', function(err) {
self.emit('error', err);
});
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
if (util.isBrowser) {
// Web workers
this._initWebWorkers();
} else {
// Child process + pipes
this._initChildProcess();
}
this._bind();
this._listen();
};
/**
* Initialize worker. Bind to more events.
* Initialize worker (web workers).
* @private
*/
Worker.prototype._initWebWorkers = function _initWebWorkers() {
var self = this;
this.child = new global.Worker('/bcoin-worker.js');
this.child.onerror = function onerror(err) {
self.emit('error', err);
self.emit('exit', -1, null);
};
this.child.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
this.child.postMessage(JSON.stringify(this.env));
};
/**
* Initialize worker (node.js).
* @private
*/
Worker.prototype._initChildProcess = function _initChildProcess() {
var self = this;
var file = process.argv[0];
var argv = [__dirname + '/worker.js'];
var env = util.merge({}, process.env, this.env);
var options = { stdio: 'pipe', env: env };
this.child = cp.spawn(file, argv, options);
this.child.on('error', function(err) {
self.emit('error', err);
});
this.child.on('exit', function(code, signal) {
self.emit('exit', code == null ? -1 : code, signal);
});
this.child.on('close', function() {
self.emit('exit', -1, null);
});
this.child.stdin.on('error', function(err) {
self.emit('error', err);
});
this.child.stdout.on('error', function(err) {
self.emit('error', err);
});
this.child.stderr.on('error', function(err) {
self.emit('error', err);
});
this.child.stdout.on('data', function(data) {
self.emit('data', data);
});
};
/**
* Bind to exit listener.
* @private
*/
@ -521,33 +537,64 @@ Worker.prototype._bind = function _bind() {
WorkerPool.children.splice(i, 1);
});
this.on('packet', function(packet) {
switch (packet.cmd) {
case packets.types.EVENT:
self.emit.apply(self, packet.items);
self.emit('event', packet.items);
break;
case packets.types.LOG:
util.log('Worker %d:', self.id);
util.log.apply(util, packet.items);
break;
case packets.types.ERROR:
self.emit('error', packet.error);
break;
case packets.types.ERRORRESULT:
self.rejectJob(packet.id, packet.error);
break;
default:
self.resolveJob(packet.id, packet);
break;
}
});
WorkerPool.children.push(this);
WorkerPool._bindExit();
};
/**
* Listen for packets.
* @private
*/
Worker.prototype._listen = function _listen() {
var self = this;
this.on('exit', function() {
self.killJobs();
});
this.on('error', function() {
self.killJobs();
});
this.on('packet', function(packet) {
try {
self.handlePacket(packet);
} catch (e) {
self.emit('error', e);
}
});
};
/**
* Handle packet.
* @private
* @param {Packet} packet
*/
Worker.prototype.handlePacket = function handlePacket(packet) {
switch (packet.cmd) {
case packets.types.EVENT:
this.emit.apply(this, packet.items);
this.emit('event', packet.items);
break;
case packets.types.LOG:
util.log('Worker %d:', this.id);
util.log.apply(util, packet.items);
break;
case packets.types.ERROR:
this.emit('error', packet.error);
break;
case packets.types.ERRORRESULT:
this.rejectJob(packet.id, packet.error);
break;
default:
this.resolveJob(packet.id, packet);
break;
}
};
/**
* Send data to worker.
* @param {Buffer} data
@ -684,6 +731,9 @@ Worker.prototype.killJobs = function killJobs() {
/**
* PendingWorker
* @constructor
* @param {Worker} worker
* @param {Number} id
* @param {Function} callback
*/
function PendingJob(worker, id, callback) {
@ -693,6 +743,11 @@ function PendingJob(worker, id, callback) {
this.timer = null;
}
/**
* Start the timer.
* @param {Number} timeout
*/
PendingJob.prototype.start = function start(timeout) {
var self = this;
@ -704,10 +759,20 @@ PendingJob.prototype.start = function start(timeout) {
}, timeout);
};
/**
* Destroy the job with an error.
*/
PendingJob.prototype.destroy = function destroy() {
this.finish(new Error('Job was destroyed.'));
};
/**
* Complete job with error and result.
* @param {Error} err
* @param {Object} reject
*/
PendingJob.prototype.finish = function finish(err, result) {
var callback = this.callback;