From a5ad8dcbfa4d7a4644a47cb21396bf35f790aa93 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Mon, 2 May 2016 19:47:41 -0700 Subject: [PATCH] refactor workers. add web workers. --- lib/bcoin.js | 6 + lib/bcoin/env.js | 9 + lib/bcoin/miner.js | 16 +- lib/bcoin/tx.js | 20 +- lib/bcoin/utils.js | 19 ++ lib/bcoin/wallet.js | 1 - lib/bcoin/worker.js | 24 ++ lib/bcoin/workers.js | 770 +++++++++++++++++++++++++++++++------------ 8 files changed, 655 insertions(+), 210 deletions(-) create mode 100644 lib/bcoin/worker.js diff --git a/lib/bcoin.js b/lib/bcoin.js index df7c3f23..d4e2b4be 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -8,6 +8,8 @@ */ var Environment = require('./bcoin/env'); +var utils = require('./bcoin/utils'); +var global = utils.global; var env = {}; /** @@ -35,5 +37,9 @@ function BCoin(options) { } BCoin.env = Environment; +BCoin.utils = utils; + +if (utils.isBrowser) + global.bcoin = BCoin; module.exports = BCoin; diff --git a/lib/bcoin/env.js b/lib/bcoin/env.js index 4e7addfc..18a64b9e 100644 --- a/lib/bcoin/env.js +++ b/lib/bcoin/env.js @@ -104,6 +104,7 @@ try { * @property {Function} http.server - {@link HTTPServer} constructor. * @property {Object} workers - See {@link module:workers}. * @property {TimeData} time - For adjusted time. + * @property {Workers?} workerPool - Default global worker pool. */ function Environment(options) { @@ -222,6 +223,14 @@ function Environment(options) { : null; this.time = new this.timedata(); + this.workerPool = null; + + if (this.workers) { + this.workerPool = new this.workers({ + size: this.maxWorkers, + timeout: this.workerTimeout + }); + } } /** diff --git a/lib/bcoin/miner.js b/lib/bcoin/miner.js index b0297ddb..3df17b7b 100644 --- a/lib/bcoin/miner.js +++ b/lib/bcoin/miner.js @@ -58,6 +58,15 @@ function Miner(options) { this.block = null; + this.workerPool = null; + + if (bcoin.useWorkers) { + this.workerPool = new bcoin.workers({ + size: 1, + timeout: -1 + }); + } + this._init(); } @@ -255,6 +264,7 @@ Miner.prototype.createBlock = function createBlock(version, callback) { return callback(err); attempt = new MinerBlock({ + workerPool: self.workerPool, tip: self.chain.tip, version: version, target: target, @@ -343,6 +353,7 @@ function MinerBlock(options) { return new MinerBlock(options); this.options = options; + this.workerPool = options.workerPool; this.tip = options.tip; this.height = options.tip.height + 1; this.target = utils.fromCompact(options.target).toBuffer('le', 32); @@ -620,7 +631,10 @@ MinerBlock.prototype.mineSync = function mineSync() { */ MinerBlock.prototype.mineAsync = function mine(callback) { - return this.mine(callback); + if (!this.workerPool) + return this.mine(callback); + + this.workerPool.mine(this, callback); }; /** diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index cfcdab7d..cf407063 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -611,17 +611,29 @@ TX.prototype.verify = function verify(index, force, flags) { */ TX.prototype.verifyAsync = function verifyAsync(index, force, flags, callback) { - var self = this; var res; - utils.nextTick(function() { + callback = utils.asyncify(callback); + + if (!bcoin.workerPool) { try { - res = self.verify(index, force, flags); + res = this.verify(index, force, flags); } catch (e) { return callback(e); } return callback(null, res); - }); + } + + if (!force && this.ts !== 0) + return callback(null, true); + + if (this.inputs.length === 0) + return callback(null, false); + + if (this.isCoinbase()) + return callback(null, true); + + bcoin.workerPool.verify(this, index, force, flags, callback); }; /** diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 633875fe..b1eab1b8 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -2738,3 +2738,22 @@ utils.binarySearch = function binarySearch(items, key, insert, compare) { return start - 1; }; + +/** + * Reference to the global object. + * @const {Object} + */ + +utils.global = (function() { + if (this) + return this; + + if (typeof window !== 'undefined') + return window; + + if (typeof self !== 'undefined') + return self; + + if (typeof global !== 'undefined') + return global; +})(); diff --git a/lib/bcoin/wallet.js b/lib/bcoin/wallet.js index 2fdc950c..550e89e3 100644 --- a/lib/bcoin/wallet.js +++ b/lib/bcoin/wallet.js @@ -556,7 +556,6 @@ Wallet.prototype.createPath = function createPath(cosignerIndex, change, index) + '/' + index; }; - /** * Parse a path. * @param {String} path diff --git a/lib/bcoin/worker.js b/lib/bcoin/worker.js new file mode 100644 index 00000000..fd22b766 --- /dev/null +++ b/lib/bcoin/worker.js @@ -0,0 +1,24 @@ +/*! + * worker.js - worker thread/process for bcoin + * Copyright (c) 2014-2015, Fedor Indutny (MIT License) + * Copyright (c) 2014-2016, Christopher Jeffrey (MIT License). + * https://github.com/indutny/bcoin + */ + +var penv, env; + +if (typeof importScripts !== 'undefined') { + self.importScripts('/bcoin.js'); + self.onmessage = function onmessage(event) { + self.onmessage = function() {}; + + penv = JSON.parse(event.data); + + env = self.bcoin.env(penv.BCOIN_WORKER_NETWORK); + env.workers.listen(+penv.BCOIN_WORKER_ID); + }; +} else { + penv = process.env; + env = require('./env')(penv.BCOIN_WORKER_NETWORK); + env.workers.listen(+penv.BCOIN_WORKER_ID); +} diff --git a/lib/bcoin/workers.js b/lib/bcoin/workers.js index ec074f63..a75eb923 100644 --- a/lib/bcoin/workers.js +++ b/lib/bcoin/workers.js @@ -1,44 +1,55 @@ /*! * workers.js - worker processes for bcoin * Copyright (c) 2014-2015, Fedor Indutny (MIT License) + * Copyright (c) 2014-2016, Christopher Jeffrey (MIT License). * https://github.com/indutny/bcoin */ module.exports = function(bcoin) { -/** - * @exports workers - */ - -var workers = {}; - +var EventEmitter = require('events').EventEmitter; var bn = require('bn.js'); var constants = bcoin.protocol.constants; +var network = bcoin.protocol.network; var utils = require('./utils'); +var global = utils.global; var assert = utils.assert; var BufferWriter = require('./writer'); var BufferReader = require('./reader'); -var cp = require('child_process'); -var HEADER_SIZE = 12; /** - * Max number of workers in the pool. - * @const {Number} - * @default + * A worker pool. + * @exports Workers + * @constructor + * @param {Object} options + * @param {Number} [options.size=num-cores] - Max pool size. + * @param {Number} [options.timeout=10000] - Execution timeout. + * @property {Number} size + * @property {Number} timeout + * @property {Object} children + * @property {Number} uid */ -workers.MAX_WORKERS = bcoin.maxWorkers || 6; +function Workers(options) { + if (!(this instanceof Workers)) + return new Workers(options); + + EventEmitter.call(this); + + this.uid = 0; + this.size = options.size || Workers.CORES; + this.timeout = options.timeout || 10000; + this.children = {}; +} + +utils.inherits(Workers, EventEmitter); /** - * Callback timeout. + * Number of CPUs/cores available. * @const {Number} - * @default */ -workers.TIMEOUT = bcoin.workerTimeout || 10000; - -workers.children = {}; -workers.uid = 0; +Workers.CORES = getCores(); /** * Spawn a new worker. @@ -46,55 +57,53 @@ workers.uid = 0; * @returns {ChildProcess} */ -workers.spawn = function spawn(index) { +Workers.prototype.spawn = function spawn(id) { + var self = this; var child; - bcoin.debug('Spawning worker process: %d', index); + bcoin.debug('Spawning worker process: %d', id); - child = cp.spawn(process.argv[0], [__filename], { - stdio: ['pipe', 'pipe', 'inherit'], - env: utils.merge({}, process.env, { - BCOIN_WORKER_ID: index + '', - BCOIN_WORKER_OPTIONS: JSON.stringify(bcoin.options) - }) - }); + child = new Worker(id); child.on('error', function(err) { - bcoin.debug('Worker %d error: %s', index, err.message); + bcoin.debug('Worker %d error: %s', child.id, err.message); }); child.on('exit', function(code) { - bcoin.debug('Worker %d exited: %s', index, code); - if (workers.children[index] === child) - delete workers.children[index]; + bcoin.debug('Worker %d exited: %s', child.id, code); + if (self.children[child.id] === child) + delete self.children[child.id]; }); - child.on('close', function() { - bcoin.debug('Worker %d closed', index); - if (workers.children[index] === child) - delete workers.children[index]; + child.on('packet', function(job, body) { + if (body.name === 'event') { + child.emit.apply(child, body.items); + self.emit.apply(self, body.items); + return; + } + if (body.name === 'response') { + child.emit('response ' + job, body.items[0], body.items[1]); + return; + } + self.emit('error', new Error('Unknown packet: ' + body.name)); }); - child.stdout.on('data', parser(function(id, body) { - child.emit('completed ' + id, body.items[0], body.items[1]); - })); - return child; }; /** - * Allocate a new worker, will not go above `MAX_WORKERS` + * Allocate a new worker, will not go above `size` option * and will automatically load balance the workers based * on job ID. - * @param {Number} id + * @param {Number} job * @returns {ChildProcess} */ -workers.alloc = function alloc(id) { - var index = id % workers.MAX_WORKERS; - if (!workers.children[index]) - workers.children[index] = workers.spawn(index); - return workers.children[index]; +Workers.prototype.alloc = function alloc(job) { + var id = job % this.size; + if (!this.children[id]) + this.children[id] = this.spawn(id); + return this.children[id]; }; /** @@ -105,94 +114,406 @@ workers.alloc = function alloc(id) { * the worker method specifies. */ -workers.call = function call(method, args, callback) { - var id = workers.uid++; - var event, child, timeout; +Workers.prototype.execute = function execute(method, args, timeout, callback) { + var job = this.uid; + var child; - if (id > 0xffffffff) { - workers.uid = 0; - id = workers.uid++; + if (job > 0xffffffff) { + this.uid = 0; + job = this.uid++; } - event = 'completed ' + id; - child = workers.alloc(id); + child = this.alloc(job); + + child.execute(job, method, args, timeout || this.timeout, callback); + + return child; +}; + +/** + * Execute the tx verification job (default timeout). + * @param {TX} tx + * @param {Number} index + * @param {Boolean} force + * @param {VerifyFlags} flags + * @param {Function} callback - Returns [Error, Boolean]. + */ + +Workers.prototype.verify = function verify(tx, index, force, flags, callback) { + return this.execute('verify', [tx, index, force, flags], null, callback); +}; + +/** + * Execute the mining job (no timeout). + * @param {MinerBlock} attempt + * @param {Function} callback - Returns [Error, {@link MinerBlock}]. + */ + +Workers.prototype.mine = function mine(attempt, callback) { + var data = { + tip: attempt.tip.toRaw(), + version: attempt.block.version, + target: attempt.block.bits, + address: attempt.options.address, + coinbaseFlags: attempt.options.coinbaseFlags, + witness: attempt.options.witness + }; + return this.execute('mine', [data], -1, callback); +}; + +/** + * Represents a worker. + * @exports Worker + * @constructor + * @param {Number} id - Worker ID. + * @property {Number} id + */ + +function Worker(id) { + var self = this; + var cp; + + if (!(this instanceof Worker)) + return new Worker(id); + + EventEmitter.call(this); + + this.id = id; + this.framer = new Framer(); + this.parser = new Parser(); + + if (bcoin.isBrowser) { + this.child = new global.Worker('/bcoin-worker.js'); + + this.child.onerror = function onerror(err) { + self.emit('error', err); + self.emit('exit', 1, null); + }; + + this.child.onmessage = function onmessage(event) { + var data; + if (typeof event.data !== 'string') { + data = event.data.buf; + data.__proto__ = Buffer.prototype; + } else { + data = new Buffer(event.data, 'hex'); + } + self.emit('data', data); + }; + + this.child.postMessage(JSON.stringify({ + BCOIN_WORKER_ID: id + '', + BCOIN_WORKER_NETWORK: network.type + })); + } else { + cp = require('child_' + 'process'); + + this.child = cp.spawn(process.argv[0], [__dirname + '/worker.js'], { + stdio: 'pipe', + env: utils.merge({}, process.env, { + BCOIN_WORKER_ID: id + '', + BCOIN_WORKER_NETWORK: network.type + }) + }); + + this.child.on('error', function(err) { + self.emit('error', err); + }); + + this.child.on('exit', function(code, signal) { + self.emit('exit', code == null ? -1 : code, signal); + }); + + this.child.on('close', function() { + self.emit('exit', -1, null); + }); + + this.child.stdout.on('data', function(data) { + self.emit('data', data); + }); + + this.child.stderr.setEncoding('utf8'); + this.child.stderr.on('data', function(data) { + bcoin.debug(data); + }); + } + + this.on('data', function(data) { + self.parser.feed(data); + }); + + this.parser.on('error', function(e) { + self.emit('error', e); + }); + + this.parser.on('packet', function(job, body) { + self.emit('packet', job, body); + }); +} + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Worker.prototype.write = function write(data) { + if (bcoin.isBrowser) { + if (this.child.postMessage.length === 2) { + data.__proto__ = Uint8Array.prototype; + this.child.postMessage({ buf: data }, [data]); + } else { + this.child.postMessage(data.toString('hex')); + } + return true; + } + return this.child.stdin.write(data); +}; + +/** + * Frame and send a packet. + * @param {String} job + * @param {String} name + * @param {Array} items + */ + +Worker.prototype.send = function send(job, name, items) { + return this.write(this.framer.packet(job, name, items)); +}; + +/** + * Emit an event on the worker side. + * @param {String} event + * @param {...Object} arg + */ + +Worker.prototype.sendEvent = function sendEvent() { + var items = Array.prototype.slice.call(arguments); + return this.write(this.framer.packet(0, 'event', items)); +}; + +/** + * Destroy the worker. + */ + +Worker.prototype.destroy = function destroy() { + if (utils.isBrowser) { + this.child.terminate(); + this.emit('exit', -1, 'SIGTERM'); + return; + } + return this.child.kill('SIGTERM'); +}; + +/** + * Call a method for a worker to execute. + * @param {Number} job - Job ID. + * @param {String} method - Method name. + * @param {Array} args - Arguments. + * @param {Function} callback - Returns whatever + * the worker method specifies. + */ + +Worker.prototype.execute = function execute(job, method, args, timeout, callback) { + var self = this; + var event = 'response ' + job; + var timer; + + assert(job <= 0xffffffff); function listener(err, result) { - if (timeout) { - clearTimeout(timeout); - timeout = null; + if (timer) { + clearTimeout(timer); + timer = null; } + self.removeListener('error', listener); + self.removeListener(event, listener); callback(err, result); } - child.once(event, listener); + this.once(event, listener); + this.once('error', listener); - if (method !== 'mine') { - timeout = setTimeout(function() { - child.removeListener(event, listener); - return callback(new Error('Worker timed out.')); - }, workers.TIMEOUT); + if (timeout !== -1) { + timer = setTimeout(function() { + self.removeListener(event, listener); + callback(new Error('Worker timed out.')); + }, timeout); } - child.stdin.write(createPacket(id, method, args)); + this.send(job, method, args); }; -bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callback) { - callback = utils.asyncify(callback); +utils.inherits(Worker, EventEmitter); - if (!force && this.ts !== 0) - return callback(null, true); +/** + * Represents the master process. + * @exports Master + * @constructor + * @param {Number} id - Worker ID. + * @property {Number} id + */ - if (this.inputs.length === 0) - return callback(null, false); +function Master(id) { + var self = this; - if (this.isCoinbase()) - return callback(null, true); + if (!(this instanceof Master)) + return new Master(id); - return workers.call('verify', [this, index, force, flags], callback); + EventEmitter.call(this); + + this.id = id; + this.framer = new Framer(); + this.parser = new Parser(); + + if (bcoin.isBrowser) { + global.onerror = function onerror(err) { + self.emit('error', err); + }; + global.onmessage = function onmessage(event) { + var data; + if (typeof event.data !== 'string') { + data = event.data.buf; + data.__proto__ = Buffer.prototype; + } else { + data = new Buffer(event.data, 'hex'); + } + self.emit('data', data); + }; + } else { + process.stdin.on('data', function(data) { + self.emit('data', data); + }); + } + + this.on('data', function(data) { + self.parser.feed(data); + }); + + this.parser.on('error', function(e) { + self.emit('error', e); + }); + + this.parser.on('packet', function(job, body) { + self.emit('packet', job, body); + }); +} + +utils.inherits(Master, EventEmitter); + +/** + * Send data to worker. + * @param {Buffer} data + * @returns {Boolean} + */ + +Master.prototype.write = function write(data) { + if (bcoin.isBrowser) { + if (global.postMessage.length === 2) { + data.__proto__ = Uint8Array.prototype; + global.postMessage({ buf: data }, [data]); + } else { + global.postMessage(data.toString('hex')); + } + return true; + } + return process.stdout.write(data); }; -bcoin.miner.minerblock.prototype.mineAsync = function mineAsync(callback) { - var attempt = { - tip: this.tip.toRaw(), - version: this.block.version, - target: this.block.bits, - address: this.options.address, - coinbaseFlags: this.options.coinbaseFlags, - witness: this.options.witness - }; - return workers.call('mine', [attempt], callback); +/** + * Frame and send a packet. + * @param {String} job + * @param {String} name + * @param {Array} items + */ + +Master.prototype.send = function send(job, name, items) { + return this.write(this.framer.packet(job, name, items)); +}; + +/** + * Emit an event on the worker side. + * @param {String} event + * @param {...Object} arg + */ + +Master.prototype.sendEvent = function sendEvent() { + var items = Array.prototype.slice.call(arguments); + return this.write(this.framer.packet(0, 'event', items)); +}; + +/** + * Write a debug message, prefixing + * it with the worker ID. + * @param {...String} args + */ + +Master.prototype.log = function log() { + if (bcoin.isBrowser) + return console.error.apply(console.error, arguments); + process.stderr.write('Worker ' + this.id + ': '); + return console.error.apply(console.error, arguments); +}; + +/** + * Destroy the worker. + */ + +Master.prototype.destroy = function destroy() { + if (bcoin.isBrowser) + return global.close(); + return process.exit(0); }; /** * Listen for messages from master process (only if worker). + * @param {Number} id - Worker id. + * @returns {Master} */ -workers.listen = function listen() { - bcoin.debug = function debug() { - process.stderr.write('Worker ' + process.env.BCOIN_WORKER_ID + ': '); - return console.error.apply(console.error, arguments); - }; +Master.listen = function listen(id) { + var master = new Master(id); + var log = master.log.bind(master); - utils.print = bcoin.debug; + bcoin.debug = log; + utils.print = log; + utils.error = log; - process.stdin.on('data', parser(function(id, body) { + master.on('error', function(err) { + bcoin.debug('Master error: %s', err.message); + }); + + master.on('packet', function(job, body) { var res; - try { - res = workers[body.name].apply(workers[body.name], body.items); - } catch (e) { - bcoin.debug(e.stack + ''); - return process.stdout.write(createPacket(id, null, [{ - message: e.message, - stack: e.stack + '' - }])); + if (body.name === 'event') { + master.emit.apply(master, body.items); + return; } - return process.stdout.write(createPacket(id, null, [null, res])); - })); + try { + res = jobs[body.name].apply(jobs, body.items); + } catch (e) { + bcoin.debug(e.stack + ''); + return master.send(job, 'response', [{ + message: e.message, + stack: e.stack + '' + }]); + } + + return master.send(job, 'response', [null, res]); + }); + + return master; }; +/** + * Jobs to execute within the worker. + * @const {Object} + */ + +var jobs = {}; + /** * Execute tx.verify() on worker. * @see TX#verify @@ -203,24 +524,24 @@ workers.listen = function listen() { * @returns {Boolean} */ -workers.verify = function verify(tx, index, force, flags) { +jobs.verify = function verify(tx, index, force, flags) { return tx.verify(index, force, flags); }; /** * Mine a block on worker. - * @param {Object} attempt - Naked minerblock. + * @param {Object} attempt - Naked {@link MinerBlock}. * @returns {Block} */ -workers.mine = function mine(attempt) { - attempt = new bcoin.miner.minerblock({ - tip: bcoin.chainblock.fromRaw(null, attempt.tip), - version: attempt.version, - target: attempt.target, - address: attempt.address, - coinbaseFlags: attempt.coinbaseFlags, - witness: attempt.witness, +jobs.mine = function mine(data) { + var attempt = new bcoin.miner.minerblock({ + tip: bcoin.chainblock.fromRaw(null, data.tip), + version: data.version, + target: data.target, + address: data.address, + coinbaseFlags: data.coinbaseFlags, + witness: data.witness, dsha256: utils.dsha256 }); attempt.on('status', function(stat) { @@ -235,17 +556,31 @@ workers.mine = function mine(attempt) { return attempt.mineSync(); }; -function createPacket(id, name, items) { +/** + * Framer + * @constructor + */ + +function Framer() { + if (!(this instanceof Framer)) + return new Framer(); + + EventEmitter.call(this); +} + +utils.inherits(Framer, EventEmitter); + +Framer.prototype.packet = function packet(job, name, items) { var p = new BufferWriter(); - var payload = createBody(name, items); + var payload = this.body(name, items); p.writeU32(0xdeadbeef); - p.writeU32(id); + p.writeU32(job); p.writeU32(payload.length); p.writeBytes(payload); return p.render(); } -function createBody(name, items) { +Framer.prototype.body = function body(name, items) { var p = new BufferWriter(); if (name) @@ -253,14 +588,14 @@ function createBody(name, items) { else p.writeVarint(0); - frameItem(items, p); + Framer.item(items, p); p.writeU8(0x0a); return p.render(); -} +}; -function frameItem(item, p) { +Framer.item = function _item(item, p) { var i, keys; switch (typeof item) { @@ -300,14 +635,14 @@ function frameItem(item, p) { p.writeU8(5); p.writeVarint(item.length); for (i = 0; i < item.length; i++) - frameItem(item[i], p); + Framer.item(item[i], p); } else { keys = Object.keys(item); p.writeU8(6); p.writeVarint(keys.length); for (i = 0; i < keys.length; i++) { p.writeVarString(keys[i], 'utf8'); - frameItem(item[keys[i]], p); + Framer.item(item[keys[i]], p); } } } @@ -315,14 +650,96 @@ function frameItem(item, p) { default: assert(false, 'Bad type: ' + typeof item); } +}; + +/** + * Parser + * @constructor + */ + +function Parser() { + if (!(this instanceof Parser)) + return new Parser(); + + EventEmitter.call(this); + + this.waiting = 12; + this.header = null; + this.pending = []; + this.pendingTotal = 0; } -function parseBody(data) { +utils.inherits(Parser, EventEmitter); + +Parser.prototype.feed = function feed(data) { + var chunk, header, body, rest; + + this.pendingTotal += data.length; + this.pending.push(data); + + if (this.pendingTotal < this.waiting) + return; + + chunk = Buffer.concat(this.pending); + + if (chunk.length > this.waiting) { + rest = chunk.slice(this.waiting); + chunk = chunk.slice(0, this.waiting); + } + + if (!this.header) { + this.header = this.parseHeader(chunk); + this.waiting = this.header.size; + this.pending.length = 0; + this.pendingTotal = 0; + + if (this.header.magic !== 0xdeadbeef) { + this.header = null; + this.waiting = 12; + this.emit('error', new Error('Bad magic number.')); + return; + } + + if (rest) + this.feed(rest); + + return; + } + + header = this.header; + + this.pending.length = 0; + this.pendingTotal = 0; + this.waiting = 12; + this.header = null; + + try { + body = this.parseBody(chunk); + } catch (e) { + this.emit('error', e); + return; + } + + this.emit('packet', header.job, body); + + if (rest) + this.feed(rest); +}; + +Parser.prototype.parseHeader = function parseHeader(data) { + return { + magic: utils.readU32(data, 0), + job: utils.readU32(data, 4), + size: utils.readU32(data, 8) + }; +}; + +Parser.prototype.parseBody = function parseBody(data) { var p = new BufferReader(data, true); var name, items; name = p.readVarString('ascii'); - items = parseItem(p); + items = Parser.parseItem(p); assert(p.readU8() === 0x0a); @@ -330,9 +747,9 @@ function parseBody(data) { name: name || null, items: items }; -} +}; -function parseItem(p) { +Parser.parseItem = function parseItem(p) { var i, count, items; switch (p.readU8()) { @@ -350,13 +767,13 @@ function parseItem(p) { items = []; count = p.readVarint(); for (i = 0; i < count; i++) - items.push(parseItem(p)); + items.push(Parser.parseItem(p)); return items; case 6: items = {}; count = p.readVarint(); for (i = 0; i < count; i++) - items[p.readVarString('utf8')] = parseItem(p); + items[p.readVarString('utf8')] = Parser.parseItem(p); return items; case 40: return bcoin.block.fromRaw(p.readVarBytes()); @@ -369,85 +786,30 @@ function parseItem(p) { default: assert(false, 'Bad type.'); } -} - -function parseHeader(data) { - return { - magic: utils.readU32(data, 0), - id: utils.readU32(data, 4), - size: utils.readU32(data, 8) - }; -} - -function parser(onPacket) { - var waiting = HEADER_SIZE; - var wait = 0; - var buf = []; - var read = 0; - var header = null; - - return function parse(data) { - var packet, rest; - - read += data.length; - buf.push(data); - - if (read < waiting) - return; - - buf = Buffer.concat(buf); - - if (buf.length > waiting) { - packet = buf.slice(0, waiting); - rest = buf.slice(waiting); - } else { - packet = buf; - } - - if (!header) { - header = parseHeader(packet); - - if (header.magic !== 0xdeadbeef) { - buf = []; - waiting = HEADER_SIZE; - read = 0; - bcoin.debug('Bad magic number: %d', header.magic); - return; - } - - buf = []; - waiting = header.size; - read = 0; - - if (rest) - parse(rest); - - return; - } - - try { - packet = parseBody(packet); - } catch (e) { - bcoin.debug(e.stack + ''); - return; - } - - onPacket(header.id, packet); - - buf = []; - waiting = HEADER_SIZE; - read = 0; - header = null; - - if (rest) - parse(rest); - }; -} - -return workers; }; -if (process.env.BCOIN_WORKER_ID) { - var env = require('./env')(JSON.parse(process.env.BCOIN_WORKER_OPTIONS)); - env.workers.listen(); +/** + * Helper to retrieve number of cores. + * @returns {Number} + */ + +function getCores() { + var os; + + if (utils.isBrowser) + return 4; + + os = require('o' + 's'); + + return os.cpus().length; } + +Workers.workers = Workers; +Workers.worker = Worker; +Workers.master = Master; +Workers.framer = Framer; +Workers.parser = Parser; +Workers.listen = Master.listen; + +return Workers; +};