workers: major refactor.

This commit is contained in:
Christopher Jeffrey 2016-11-19 02:27:26 -08:00
parent 6f9ad90e07
commit 3f6e4f3847
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
11 changed files with 345 additions and 219 deletions

View File

@ -1,6 +1,7 @@
all:
@npm run browserify
@npm run uglify
@cp -f lib/workers/worker-browser.js browser/bcoin-worker.js
clean:
@npm run clean

View File

@ -17,7 +17,8 @@ proxy.on('error', function(err) {
var index = fs.readFileSync(__dirname + '/index.html');
var indexjs = fs.readFileSync(__dirname + '/index.js');
var bcoin = fs.readFileSync(__dirname + '/bcoin.js');
var worker = fs.readFileSync(__dirname + '/../lib/workers/worker-browser.js');
var master = fs.readFileSync(__dirname + '/bcoin-master.js');
var worker = fs.readFileSync(__dirname + '/bcoin-worker.js');
server.get('/favicon.ico', function(req, res, send, next) {
send(404, '', 'text');
@ -35,6 +36,10 @@ server.get('/bcoin.js', function(req, res, send, next) {
send(200, bcoin, 'js');
});
server.get('/bcoin-master.js', function(req, res, send, next) {
send(200, master, 'js');
});
server.get('/bcoin-worker.js', function(req, res, send, next) {
send(200, worker, 'js');
});

View File

@ -12,8 +12,6 @@ var BN = require('bn.js');
var utils = require('../utils/utils');
var assert = require('assert');
var BufferWriter = require('../utils/writer');
var MTX = require('../primitives/mtx');
var TX = require('../primitives/tx');
var KeyRing = require('../primitives/keyring');
var Script = require('../script/script');
var Witness = require('../script/witness');
@ -88,10 +86,10 @@ Framer.item = function _item(item, writer) {
} else if (item instanceof Witness) {
p.writeU8(41);
item.toRaw(p);
} else if (item instanceof MTX) {
} else if (isMTX(item)) {
p.writeU8(42);
item.toExtended(true, p);
} else if (item instanceof TX) {
} else if (isTX(item)) {
p.writeU8(43);
item.toExtended(true, p);
} else if (item instanceof KeyRing) {
@ -128,6 +126,18 @@ Framer.item = function _item(item, writer) {
return p;
};
/*
* Helpers
*/
function isTX(tx) {
return tx && tx.witnessHash && tx.mutable === false;
}
function isMTX(tx) {
return tx && tx.witnessHash && tx.mutable === true;
}
/*
* Expose
*/

View File

@ -19,11 +19,35 @@ var mine = require('../miner/mine');
var jobs = exports;
/**
* Master process.
* @type {Master}
* Execute a job on the worker.
* @param {String} cmd
* @param {Array} args
* @returns {Object}
* @throws on unknown command
*/
jobs.master = null;
jobs.execute = function execute(cmd, args) {
switch (cmd) {
case 'verify':
return jobs.verify(args[0], args[1]);
case 'verifyInput':
return jobs.verifyInput(args[0], args[1], args[2]);
case 'sign':
return jobs.sign(args[0], args[1], args[2]);
case 'signInput':
return jobs.signInput(args[0], args[1], args[2], args[3]);
case 'ecVerify':
return jobs.ecVerify(args[0], args[1], args[2]);
case 'ecSign':
return jobs.ecSign(args[0], args[1]);
case 'mine':
return jobs.mine(args[0], args[1], args[2], args[3]);
case 'scrypt':
return jobs.scrypt(args[0], args[1], args[2], args[3], args[4], args[5]);
default:
throw new Error('Unknown command: "' + cmd + '".');
}
};
/**
* Execute tx.verify() on worker.

223
lib/workers/master.js Normal file
View File

@ -0,0 +1,223 @@
/*!
* workers.js - worker processes for bcoin
* Copyright (c) 2014-2015, Fedor Indutny (MIT License)
* Copyright (c) 2014-2016, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
var EventEmitter = require('events').EventEmitter;
var utils = require('../utils/utils');
var global = utils.global;
var Network = require('../protocol/network');
var jobs = require('./jobs');
var Parser = require('./parser-client');
var Framer = require('./framer');
var global = utils.global;
var server;
/**
* Represents the master process.
* @exports Master
* @constructor
*/
function Master() {
if (!(this instanceof Master))
return new Master();
EventEmitter.call(this);
this.framer = new Framer();
this.parser = new Parser();
this._init();
}
utils.inherits(Master, EventEmitter);
/**
* Initialize master. Bind events.
* @private
*/
Master.prototype._init = function _init() {
var self = this;
if (utils.isBrowser) {
global.onerror = function onerror(err) {
self.emit('error', err);
};
global.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
} else {
process.stdin.on('data', function(data) {
self.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', utils.nop);
process.stdout.on('error', utils.nop);
process.stderr.on('error', utils.nop);
}
this.on('data', function(data) {
self.parser.feed(data);
});
this.parser.on('error', function(e) {
self.emit('error', e);
});
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
};
/**
* Set primary network.
* @param {NetworkType|Network} network
*/
Master.prototype.set = function set(network) {
return Network.set(network);
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Master.prototype.write = function write(data) {
if (utils.isBrowser) {
if (global.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
global.postMessage({ buf: data }, [data]);
} else {
global.postMessage(data.toString('hex'));
}
return true;
}
return process.stdout.write(data);
};
/**
* Frame and send a packet.
* @param {String} job
* @param {String} cmd
* @param {Array} items
* @returns {Boolean}
*/
Master.prototype.send = function send(job, cmd, items) {
return this.write(this.framer.packet(job, cmd, items));
};
/**
* Emit an event on the worker side.
* @param {String} event
* @param {...Object} arg
* @returns {Boolean}
*/
Master.prototype.sendEvent = function sendEvent() {
var items = new Array(arguments.length);
var i;
for (i = 0; i < items.length; i++)
items[i] = arguments[i];
return this.send(0, 'event', items);
};
/**
* Destroy the worker.
*/
Master.prototype.destroy = function destroy() {
if (utils.isBrowser)
return global.close();
return process.exit(0);
};
/**
* Write a message to stdout in the master process.
* @param {Object|String} obj
* @param {...String} args
*/
Master.prototype.log = function log() {
var items = new Array(arguments.length);
var i;
for (i = 0; i < items.length; i++)
items[i] = arguments[i];
this.sendEvent('log', items);
};
/**
* Listen for messages from master process (only if worker).
* @param {Object} env
* @returns {Master}
*/
Master.prototype.listen = function listen(env) {
var self = this;
Network.set(env.BCOIN_WORKER_NETWORK);
utils.log = this.log.bind(this);
utils.error = utils.log;
this.on('error', function(err) {
self.sendEvent('worker error', fromError(err));
});
this.on('packet', function(packet) {
var result;
if (packet.cmd === 'event') {
self.emit('event', packet.items);
self.emit.apply(self, packet.items);
return;
}
try {
result = jobs.execute(packet.cmd, packet.items);
} catch (e) {
self.send(packet.job, 'response', [fromError(e)]);
return;
}
self.send(packet.job, 'response', [null, result]);
});
};
/*
* Helpers
*/
function fromError(err) {
return [err.message, err.stack + '', err.type];
}
/*
* Expose
*/
server = new Master();
if (utils.isBrowser)
global.master = server;
module.exports = server;

View File

@ -0,0 +1,46 @@
/*!
* workers.js - worker processes for bcoin
* Copyright (c) 2014-2015, Fedor Indutny (MIT License)
* Copyright (c) 2014-2016, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
var utils = require('../utils/utils');
var ServerParser = require('./parser');
var MTX = require('../primitives/mtx');
var TX = require('../primitives/tx');
var KeyRing = require('../primitives/keyring');
/**
* Parser
* @constructor
*/
function Parser() {
if (!(this instanceof Parser))
return new Parser();
ServerParser.call(this);
}
utils.inherits(Parser, ServerParser);
Parser.prototype.parseKeyRing = function parseKeyRing(data) {
return KeyRing.fromRaw(data);
};
Parser.prototype.parseMTX = function parseMTX(data) {
return MTX.fromExtended(data, true);
};
Parser.prototype.parseTX = function parseTX(data) {
return TX.fromExtended(data, true);
};
/*
* Expose
*/
module.exports = Parser;

View File

@ -12,9 +12,6 @@ var BN = require('bn.js');
var utils = require('../utils/utils');
var assert = require('assert');
var BufferReader = require('../utils/reader');
var MTX = require('../primitives/mtx');
var TX = require('../primitives/tx');
var KeyRing = require('../primitives/keyring');
var Script = require('../script/script');
var Witness = require('../script/witness');
@ -147,10 +144,6 @@ Parser.prototype.parseHeader = function parseHeader(data) {
};
Parser.prototype.parseBody = function parseBody(data) {
return Parser.parseItem(data);
};
Parser.parseItem = function parseItem(data) {
var p = BufferReader(data);
var i, count, items;
@ -171,13 +164,13 @@ Parser.parseItem = function parseItem(data) {
items = [];
count = p.readVarint();
for (i = 0; i < count; i++)
items.push(Parser.parseItem(p));
items.push(this.parseBody(p));
return items;
case 7:
items = {};
count = p.readVarint();
for (i = 0; i < count; i++)
items[p.readVarString('utf8')] = Parser.parseItem(p);
items[p.readVarString('utf8')] = this.parseBody(p);
return items;
case 10:
return new BN(p.readVarBytes());
@ -186,16 +179,28 @@ Parser.parseItem = function parseItem(data) {
case 41:
return Witness.fromRaw(p);
case 42:
return MTX.fromExtended(p, true);
return this.parseMTX(p);
case 43:
return TX.fromExtended(p, true);
return this.parseTX(p);
case 44:
return KeyRing.fromRaw(p);
return this.parseKeyRing(p);
default:
throw new Error('Bad type.');
}
};
Parser.prototype.parseKeyRing = function parseKeyRing(data) {
throw new Error('KeyRing in client output.');
};
Parser.prototype.parseMTX = function parseMTX(data) {
throw new Error('MTX in client output.');
};
Parser.prototype.parseTX = function parseTX(data) {
throw new Error('TX in client output.');
};
/**
* Packet
* @constructor

View File

@ -9,17 +9,14 @@
/* jshint worker: true */
var bcoin, env;
self.importScripts('/bcoin.js');
bcoin = self.bcoin;
self.importScripts('/bcoin-master.js');
self.onmessage = function onmessage(event) {
var env;
self.onmessage = function() {};
env = JSON.parse(event.data);
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();
self.master.listen(env);
};

View File

@ -7,8 +7,6 @@
'use strict';
var env = process.env;
var bcoin = require('../env');
var master = require('./master');
bcoin.set(env.BCOIN_WORKER_NETWORK);
bcoin.workers.listen();
master.listen(process.env);

View File

@ -225,7 +225,7 @@ WorkerPool.prototype.execute = function execute(method, args, timeout) {
return new Promise(function(resolve, reject) {
utils.nextTick(function() {
try {
result = jobs[method].apply(jobs, args);
result = jobs.execute(method, args);
} catch (e) {
reject(e);
return;
@ -652,184 +652,6 @@ Worker.prototype.execute = function execute(method, args, timeout) {
});
};
/**
* Represents the master process.
* @exports Master
* @constructor
*/
function Master() {
if (!(this instanceof Master))
return new Master();
EventEmitter.call(this);
this.framer = new Framer();
this.parser = new Parser();
this._init();
}
utils.inherits(Master, EventEmitter);
/**
* Initialize master. Bind events.
* @private
*/
Master.prototype._init = function _init() {
var self = this;
if (utils.isBrowser) {
global.onerror = function onerror(err) {
self.emit('error', err);
};
global.onmessage = function onmessage(event) {
var data;
if (typeof event.data !== 'string') {
data = event.data.buf;
data.__proto__ = Buffer.prototype;
} else {
data = new Buffer(event.data, 'hex');
}
self.emit('data', data);
};
} else {
process.stdin.on('data', function(data) {
self.emit('data', data);
});
// Nowhere to send these errors:
process.stdin.on('error', utils.nop);
process.stdout.on('error', utils.nop);
process.stderr.on('error', utils.nop);
}
this.on('data', function(data) {
self.parser.feed(data);
});
this.parser.on('error', function(e) {
self.emit('error', e);
});
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
};
/**
* Send data to worker.
* @param {Buffer} data
* @returns {Boolean}
*/
Master.prototype.write = function write(data) {
if (utils.isBrowser) {
if (global.postMessage.length === 2) {
data.__proto__ = Uint8Array.prototype;
global.postMessage({ buf: data }, [data]);
} else {
global.postMessage(data.toString('hex'));
}
return true;
}
return process.stdout.write(data);
};
/**
* Frame and send a packet.
* @param {String} job
* @param {String} cmd
* @param {Array} items
* @returns {Boolean}
*/
Master.prototype.send = function send(job, cmd, items) {
return this.write(this.framer.packet(job, cmd, items));
};
/**
* Emit an event on the worker side.
* @param {String} event
* @param {...Object} arg
* @returns {Boolean}
*/
Master.prototype.sendEvent = function sendEvent() {
var items = new Array(arguments.length);
var i;
for (i = 0; i < items.length; i++)
items[i] = arguments[i];
return this.send(0, 'event', items);
};
/**
* Destroy the worker.
*/
Master.prototype.destroy = function destroy() {
if (utils.isBrowser)
return global.close();
return process.exit(0);
};
/**
* Write a message to stdout in the master process.
* @param {Object|String} obj
* @param {...String} args
*/
Master.prototype.log = function log() {
var items = new Array(arguments.length);
var i;
for (i = 0; i < items.length; i++)
items[i] = arguments[i];
this.sendEvent('log', items);
};
/**
* Listen for messages from master process (only if worker).
* @returns {Master}
*/
Master.listen = function listen() {
var master = new Master();
utils.log = master.log.bind(master);
utils.error = utils.log;
master.on('error', function(err) {
master.sendEvent('worker error', fromError(err));
});
master.on('packet', function(packet) {
var result;
if (packet.cmd === 'event') {
master.emit('event', packet.items);
master.emit.apply(master, packet.items);
return;
}
try {
result = jobs[packet.cmd].apply(jobs, packet.items);
} catch (e) {
master.send(packet.job, 'response', [fromError(e)]);
return;
}
master.send(packet.job, 'response', [null, result]);
});
jobs.master = master;
return master;
};
/*
* Helpers
*/
@ -837,6 +659,7 @@ Master.listen = function listen() {
function getCores() {
if (os.unsupported)
return 2;
return os.cpus().length;
}
@ -847,10 +670,6 @@ function toError(values) {
return err;
}
function fromError(err) {
return [err.message, err.stack + '', err.type];
}
/*
* Default
*/
@ -888,7 +707,5 @@ exports.set({
exports.WorkerPool = WorkerPool;
exports.Worker = Worker;
exports.Master = Master;
exports.Framer = Framer;
exports.Parser = Parser;
exports.listen = Master.listen;

View File

@ -12,9 +12,9 @@
"preferGlobal": true,
"scripts": {
"test": "mocha --reporter spec test/*-test.js",
"browserify": "browserify --im -o browser/bcoin.js lib/bcoin.js",
"uglify": "uglifyjs -m -o browser/bcoin.min.js browser/bcoin.js",
"clean": "rm browser/bcoin.js browser/bcoin.min.js",
"browserify": "browserify --im -o browser/bcoin.js lib/bcoin.js && browserify --im -o browser/bcoin-master.js lib/workers/master.js",
"uglify": "uglifyjs -m -o browser/bcoin.min.js browser/bcoin.js && uglifyjs -m -o browser/bcoin-master.min.js browser/bcoin-master.js",
"clean": "rm browser/bcoin.js browser/bcoin.min.js browser/bcoin-master.js browser/bcoin-master.min.js",
"lint": "jshint lib/ || exit 0"
},
"repository": "git://github.com/bcoin-org/bcoin.git",