workers parsing.

This commit is contained in:
Christopher Jeffrey 2016-03-21 21:06:29 -07:00
parent b792976586
commit c543d205c0
3 changed files with 205 additions and 30 deletions

View File

@ -24,7 +24,7 @@ bcoin.debugLogs = +process.env.BCOIN_DEBUG === 1;
bcoin.debugFile = +process.env.BCOIN_DEBUGFILE !== 0;
bcoin.profile = +process.env.BCOIN_PROFILE === 1;
bcoin.fresh = +process.env.BCOIN_FRESH === 1;
bcoin.useWorkers = +process.env.BCOIN_WORKERS === 1;
bcoin.useWorkers = +process.env.BCOIN_WORKERS > 0;
bcoin.ensurePrefix = function ensurePrefix() {
if (bcoin.isBrowser)

View File

@ -1101,14 +1101,6 @@ utils.read32 = function read32(arr, off) {
| (arr[off + 1] << 8)
| (arr[off + 2] << 16)
| (arr[off + 3] << 24);
// arr = (arr[off])
// | (arr[off + 1] << 8)
// | (arr[off + 2] << 16)
// | (arr[off + 3] << 24);
// if (arr & 0x80000000)
// arr = ~arr + 1;
// return arr;
};
utils.read32BE = function read32BE(arr, off) {

View File

@ -20,7 +20,7 @@ var HEADER_SIZE = 12;
* Master
*/
workers.MAX_WORKERS = 30;
workers.MAX_WORKERS = +process.env.BCOIN_WORKERS || 30;
workers.TIMEOUT = 10000;
workers.children = {};
workers.uid = 0;
@ -53,8 +53,8 @@ workers.spawn = function spawn(index) {
delete workers.children[index];
});
child.stdout.on('data', parser(function(id, result) {
child.emit('completed ' + id, result[0], result[1]);
child.stdout.on('data', parser(function(id, body) {
child.emit('completed ' + id, body.items[0], body.items[1]);
}));
return child;
@ -93,7 +93,7 @@ workers.call = function call(method, args, callback) {
return callback(new Error('Worker timed out.'));
}, workers.TIMEOUT);
child.stdin.write(createPacket(id, [method, args]));
child.stdin.write(createPacket(id, method, args));
};
/**
@ -116,7 +116,7 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb
// Important: we need to serialize
// the coins for the worker.
tx = this.toExtended(true).toString('hex');
tx = this.toExtended(true);
return workers.call('verify', [tx, index, force, flags], callback);
};
@ -128,28 +128,26 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb
workers.listen = function listen() {
bcoin.debug = function debug() {
process.stderr.write('Worker %s: ', process.env.BCOIN_WORKER);
console.error.apply(console.error, arguments);
return console.error.apply(console.error, arguments);
};
utils.debug = bcoin.debug;
utils.print = bcoin.debug;
process.stdin.on('data', parser(function(id, data) {
var method = data[0];
var args = data[1];
process.stdin.on('data', parser(function(id, body) {
var res;
try {
res = workers[method].apply(workers[method], args);
res = workers[body.name].apply(workers[body.name], body.items);
} catch (e) {
utils.debug(e.stack + '');
return process.stdout.write(createPacket(id, [{
return process.stdout.write(createPacket(id, null, [{
message: e.message,
stack: e.stack + ''
}]));
}
return process.stdout.write(createPacket(id, [null, res]));
return process.stdout.write(createPacket(id, null, [null, res]));
}));
};
@ -160,7 +158,7 @@ workers.listen = function listen() {
*/
workers.verify = function verify(tx, index, force, flags) {
tx = bcoin.tx.fromExtended(new Buffer(tx, 'hex'), true);
tx = bcoin.tx.fromExtended(tx, true);
return tx.verify(index, force, flags);
};
@ -168,20 +166,204 @@ workers.verify = function verify(tx, index, force, flags) {
* Helpers
*/
function createPacket(id, json) {
var json = new Buffer(JSON.stringify(json), 'utf8');
var p = new BufferWriter();
/*
function createPacket(id, name, items) {
var p, i;
for (i = 0; i < items.length; i++) {
if (Buffer.isBuffer(items[i]))
items[i] = [0, items[i].toString('hex')];
}
items = JSON.stringify({ name: name, items: items });
items = new Buffer(items, 'utf8');
p = new BufferWriter();
p.writeU32(0xdeadbeef);
p.writeU32(id);
p.writeU32(json.length + 1);
p.writeBytes(json);
p.writeU32(items.length + 1);
p.writeBytes(items);
p.writeU8(0x0a);
return p.render();
}
function parsePacket(data) {
return JSON.parse(data.toString('utf8'));
function parseBody(data) {
data = JSON.parse(data.toString('utf8'));
for (var i = 0; i < data.items.length; i++) {
if (data.items[i] && data.items[i][0] === 0 && data.items[i].length === 2)
data.items[i] = new Buffer(data.items[i][1], 'hex');
}
return data;
}
*/
function createPacket(id, name, items) {
var p = new BufferWriter();
var payload = createBody(name, items);
p.writeU32(0xdeadbeef);
p.writeU32(id);
p.writeU32(payload.length);
p.writeBytes(payload);
return p.render();
}
/*
function createBody(name, items) {
var p = new BufferWriter();
var msg = [];
var count = 0;
var i;
for (i = 0; i < items.length; i++) {
if (Buffer.isBuffer(items[i])) {
msg.push(0);
count++;
continue;
}
msg.push(items[i]);
}
if (name)
p.writeVarString(name, 'ascii');
else
p.writeUIntv(0);
p.writeVarString(JSON.stringify(msg), 'utf8');
p.writeUIntv(count);
for (i = 0; i < items.length; i++) {
if (Buffer.isBuffer(items[i])) {
p.writeU8(i);
p.writeVarBytes(items[i]);
}
}
p.writeU8(0x0a);
return p.render();
}
function parseBody(data) {
var p = new BufferReader(data, true);
var name, items, bufferCount, i;
p.start();
name = p.readVarString('ascii');
items = JSON.parse(p.readVarString('utf8'));
bufferCount = p.readUIntv();
for (i = 0; i < bufferCount; i++)
items[p.readU8()] = p.readVarBytes();
assert(p.readU8() === 0x0a);
p.end();
return {
name: name || null,
items: items
};
}
*/
function createBody(name, items) {
var p = new BufferWriter();
var msg = [];
var count = 0;
var i;
if (name)
p.writeVarString(name, 'ascii');
else
p.writeUIntv(0);
p.writeUIntv(items.length);
for (i = 0; i < items.length; i++) {
switch (typeof items[i]) {
case 'string':
p.writeU8(1);
p.writeVarString(items[i], 'utf8');
break;
case 'number':
p.writeU8(2);
p.write32(items[i]);
break;
case 'boolean':
p.writeU8(3);
p.writeU8(items[i] ? 1 : 0);
break;
case 'object':
case 'undefined':
if (items[i] == null) {
p.writeU8(0);
} else if (Buffer.isBuffer(items[i])) {
p.writeU8(5);
p.writeVarBytes(items[i]);
} else {
p.writeU8(4);
p.writeVarString(JSON.stringify(items[i]), 'utf8');
}
break;
default:
assert(false, 'Bad type: ' + (typeof items[i]));
}
}
p.writeU8(0x0a);
return p.render();
}
function parseBody(data) {
var p = new BufferReader(data, true);
var name, count, i;
var items = [];
p.start();
name = p.readVarString('ascii');
count = p.readUIntv();
for (i = 0; i < count; i++) {
switch (p.readU8()) {
case 0:
items.push(null);
break;
case 1:
items.push(p.readVarString('utf8'));
break;
case 2:
items.push(p.read32());
break;
case 3:
items.push(p.readU8() === 1);
break;
case 4:
items.push(items.parse(p.readVarString('utf8')));
break;
case 5:
items.push(p.readVarBytes());
break;
}
}
assert(p.readU8() === 0x0a);
p.end();
return {
name: name || null,
items: items
};
}
function parseHeader(data) {
return {
@ -191,6 +373,7 @@ function parseHeader(data) {
};
}
function parser(onPacket) {
var waiting = HEADER_SIZE;
var wait = 0;
@ -238,7 +421,7 @@ function parser(onPacket) {
}
try {
packet = parsePacket(packet);
packet = parseBody(packet);
} catch (e) {
utils.debug(e.stack + '');
return;