From c543d205c03a5bfe542faac9dc71759fe920fb45 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Mon, 21 Mar 2016 21:06:29 -0700 Subject: [PATCH] workers parsing. --- lib/bcoin.js | 2 +- lib/bcoin/utils.js | 8 -- lib/bcoin/workers.js | 225 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 205 insertions(+), 30 deletions(-) diff --git a/lib/bcoin.js b/lib/bcoin.js index 1f396176..f7829ebc 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -24,7 +24,7 @@ bcoin.debugLogs = +process.env.BCOIN_DEBUG === 1; bcoin.debugFile = +process.env.BCOIN_DEBUGFILE !== 0; bcoin.profile = +process.env.BCOIN_PROFILE === 1; bcoin.fresh = +process.env.BCOIN_FRESH === 1; -bcoin.useWorkers = +process.env.BCOIN_WORKERS === 1; +bcoin.useWorkers = +process.env.BCOIN_WORKERS > 0; bcoin.ensurePrefix = function ensurePrefix() { if (bcoin.isBrowser) diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 358abeaa..47ca2ebd 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -1101,14 +1101,6 @@ utils.read32 = function read32(arr, off) { | (arr[off + 1] << 8) | (arr[off + 2] << 16) | (arr[off + 3] << 24); - - // arr = (arr[off]) - // | (arr[off + 1] << 8) - // | (arr[off + 2] << 16) - // | (arr[off + 3] << 24); - // if (arr & 0x80000000) - // arr = ~arr + 1; - // return arr; }; utils.read32BE = function read32BE(arr, off) { diff --git a/lib/bcoin/workers.js b/lib/bcoin/workers.js index f36c0d8c..d67290a7 100644 --- a/lib/bcoin/workers.js +++ b/lib/bcoin/workers.js @@ -20,7 +20,7 @@ var HEADER_SIZE = 12; * Master */ -workers.MAX_WORKERS = 30; +workers.MAX_WORKERS = +process.env.BCOIN_WORKERS || 30; workers.TIMEOUT = 10000; workers.children = {}; workers.uid = 0; @@ -53,8 +53,8 @@ workers.spawn = function spawn(index) { delete workers.children[index]; }); - child.stdout.on('data', parser(function(id, result) { - child.emit('completed ' + id, result[0], result[1]); + child.stdout.on('data', parser(function(id, body) { + child.emit('completed ' + id, body.items[0], body.items[1]); })); return child; @@ -93,7 +93,7 @@ workers.call = function call(method, args, callback) { return callback(new Error('Worker timed out.')); }, workers.TIMEOUT); - child.stdin.write(createPacket(id, [method, args])); + child.stdin.write(createPacket(id, method, args)); }; /** @@ -116,7 +116,7 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb // Important: we need to serialize // the coins for the worker. - tx = this.toExtended(true).toString('hex'); + tx = this.toExtended(true); return workers.call('verify', [tx, index, force, flags], callback); }; @@ -128,28 +128,26 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb workers.listen = function listen() { bcoin.debug = function debug() { process.stderr.write('Worker %s: ', process.env.BCOIN_WORKER); - console.error.apply(console.error, arguments); + return console.error.apply(console.error, arguments); }; utils.debug = bcoin.debug; utils.print = bcoin.debug; - process.stdin.on('data', parser(function(id, data) { - var method = data[0]; - var args = data[1]; + process.stdin.on('data', parser(function(id, body) { var res; try { - res = workers[method].apply(workers[method], args); + res = workers[body.name].apply(workers[body.name], body.items); } catch (e) { utils.debug(e.stack + ''); - return process.stdout.write(createPacket(id, [{ + return process.stdout.write(createPacket(id, null, [{ message: e.message, stack: e.stack + '' }])); } - return process.stdout.write(createPacket(id, [null, res])); + return process.stdout.write(createPacket(id, null, [null, res])); })); }; @@ -160,7 +158,7 @@ workers.listen = function listen() { */ workers.verify = function verify(tx, index, force, flags) { - tx = bcoin.tx.fromExtended(new Buffer(tx, 'hex'), true); + tx = bcoin.tx.fromExtended(tx, true); return tx.verify(index, force, flags); }; @@ -168,20 +166,204 @@ workers.verify = function verify(tx, index, force, flags) { * Helpers */ -function createPacket(id, json) { - var json = new Buffer(JSON.stringify(json), 'utf8'); - var p = new BufferWriter(); +/* +function createPacket(id, name, items) { + var p, i; + + for (i = 0; i < items.length; i++) { + if (Buffer.isBuffer(items[i])) + items[i] = [0, items[i].toString('hex')]; + } + + items = JSON.stringify({ name: name, items: items }); + items = new Buffer(items, 'utf8'); + + p = new BufferWriter(); p.writeU32(0xdeadbeef); p.writeU32(id); - p.writeU32(json.length + 1); - p.writeBytes(json); + p.writeU32(items.length + 1); + p.writeBytes(items); p.writeU8(0x0a); + return p.render(); } -function parsePacket(data) { - return JSON.parse(data.toString('utf8')); +function parseBody(data) { + data = JSON.parse(data.toString('utf8')); + for (var i = 0; i < data.items.length; i++) { + if (data.items[i] && data.items[i][0] === 0 && data.items[i].length === 2) + data.items[i] = new Buffer(data.items[i][1], 'hex'); + } + return data; } +*/ + +function createPacket(id, name, items) { + var p = new BufferWriter(); + var payload = createBody(name, items); + p.writeU32(0xdeadbeef); + p.writeU32(id); + p.writeU32(payload.length); + p.writeBytes(payload); + return p.render(); +} + +/* +function createBody(name, items) { + var p = new BufferWriter(); + var msg = []; + var count = 0; + var i; + + for (i = 0; i < items.length; i++) { + if (Buffer.isBuffer(items[i])) { + msg.push(0); + count++; + continue; + } + msg.push(items[i]); + } + + if (name) + p.writeVarString(name, 'ascii'); + else + p.writeUIntv(0); + + p.writeVarString(JSON.stringify(msg), 'utf8'); + p.writeUIntv(count); + + for (i = 0; i < items.length; i++) { + if (Buffer.isBuffer(items[i])) { + p.writeU8(i); + p.writeVarBytes(items[i]); + } + } + + p.writeU8(0x0a); + + return p.render(); +} + +function parseBody(data) { + var p = new BufferReader(data, true); + var name, items, bufferCount, i; + + p.start(); + + name = p.readVarString('ascii'); + items = JSON.parse(p.readVarString('utf8')); + bufferCount = p.readUIntv(); + + for (i = 0; i < bufferCount; i++) + items[p.readU8()] = p.readVarBytes(); + + assert(p.readU8() === 0x0a); + + p.end(); + + return { + name: name || null, + items: items + }; +} +*/ + +function createBody(name, items) { + var p = new BufferWriter(); + var msg = []; + var count = 0; + var i; + + if (name) + p.writeVarString(name, 'ascii'); + else + p.writeUIntv(0); + + p.writeUIntv(items.length); + + for (i = 0; i < items.length; i++) { + switch (typeof items[i]) { + case 'string': + p.writeU8(1); + p.writeVarString(items[i], 'utf8'); + break; + case 'number': + p.writeU8(2); + p.write32(items[i]); + break; + case 'boolean': + p.writeU8(3); + p.writeU8(items[i] ? 1 : 0); + break; + case 'object': + case 'undefined': + if (items[i] == null) { + p.writeU8(0); + } else if (Buffer.isBuffer(items[i])) { + p.writeU8(5); + p.writeVarBytes(items[i]); + } else { + p.writeU8(4); + p.writeVarString(JSON.stringify(items[i]), 'utf8'); + } + break; + default: + assert(false, 'Bad type: ' + (typeof items[i])); + } + } + + p.writeU8(0x0a); + + return p.render(); +} + +function parseBody(data) { + var p = new BufferReader(data, true); + var name, count, i; + var items = []; + + p.start(); + + name = p.readVarString('ascii'); + count = p.readUIntv(); + + for (i = 0; i < count; i++) { + switch (p.readU8()) { + case 0: + items.push(null); + break; + case 1: + items.push(p.readVarString('utf8')); + break; + case 2: + items.push(p.read32()); + break; + case 3: + items.push(p.readU8() === 1); + break; + case 4: + items.push(items.parse(p.readVarString('utf8'))); + break; + case 5: + items.push(p.readVarBytes()); + break; + } + } + + assert(p.readU8() === 0x0a); + + p.end(); + + return { + name: name || null, + items: items + }; +} + + + + + function parseHeader(data) { return { @@ -191,6 +373,7 @@ function parseHeader(data) { }; } + function parser(onPacket) { var waiting = HEADER_SIZE; var wait = 0; @@ -238,7 +421,7 @@ function parser(onPacket) { } try { - packet = parsePacket(packet); + packet = parseBody(packet); } catch (e) { utils.debug(e.stack + ''); return;