workers: improve protocol.

This commit is contained in:
Christopher Jeffrey 2016-12-02 11:06:01 -08:00
parent 89f478bba1
commit a4d30f00a8
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
8 changed files with 1178 additions and 423 deletions

View File

@ -8,13 +8,8 @@
'use strict'; 'use strict';
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var BN = require('bn.js');
var util = require('../utils/util'); var util = require('../utils/util');
var assert = require('assert');
var BufferWriter = require('../utils/writer'); var BufferWriter = require('../utils/writer');
var KeyRing = require('../primitives/keyring');
var Script = require('../script/script');
var Witness = require('../script/witness');
/** /**
* Framer * Framer
@ -30,114 +25,24 @@ function Framer() {
util.inherits(Framer, EventEmitter); util.inherits(Framer, EventEmitter);
Framer.prototype.packet = function packet(job, cmd, items) { Framer.prototype.packet = function _packet(packet) {
var payload = this.body(items); var bw = new BufferWriter();
var packet = new Buffer(25 + payload.length + 1); var data;
assert(cmd.length < 12); bw.writeU32(packet.id);
assert(payload.length <= 0xffffffff); bw.writeU8(packet.cmd);
bw.seek(4);
packet.writeUInt32LE(0xdeadbeef, 0, true); packet.toRaw(bw);
packet.writeUInt32LE(job, 4, true);
packet.writeUInt8(cmd.length, 8, true);
packet.write(cmd, 9, 'ascii');
packet.writeUInt32LE(payload.length, 21, true);
payload.copy(packet, 25);
packet[packet.length - 1] = 0x0a;
return packet; bw.writeU8(0x0a);
data = bw.render();
data.writeUInt32LE(data.length - 10, 5, true);
return data;
}; };
Framer.prototype.body = function body(items) {
return Framer.item(items);
};
Framer.item = function _item(item, writer) {
var bw = BufferWriter(writer);
var i, keys;
switch (typeof item) {
case 'string':
bw.writeU8(1);
bw.writeVarString(item, 'utf8');
break;
case 'number':
if (item > 0x7fffffff) {
bw.writeU8(3);
bw.writeU32(item);
} else {
bw.writeU8(2);
bw.write32(item);
}
break;
case 'boolean':
bw.writeU8(4);
bw.writeU8(item ? 1 : 0);
break;
case 'object':
case 'undefined':
if (item == null) {
bw.writeU8(0);
break;
}
if (item instanceof Script) {
bw.writeU8(40);
bw.writeVarBytes(item.toRaw());
} else if (item instanceof Witness) {
bw.writeU8(41);
item.toRaw(bw);
} else if (isMTX(item)) {
bw.writeU8(42);
item.toExtended(true, bw);
} else if (isTX(item)) {
bw.writeU8(43);
item.toExtended(true, bw);
} else if (item instanceof KeyRing) {
bw.writeU8(44);
item.toRaw(bw);
} else if (BN.isBN(item)) {
bw.writeU8(10);
bw.writeVarBytes(item.toArrayLike(Buffer));
} else if (Buffer.isBuffer(item)) {
bw.writeU8(5);
bw.writeVarBytes(item);
} else if (Array.isArray(item)) {
bw.writeU8(6);
bw.writeVarint(item.length);
for (i = 0; i < item.length; i++)
Framer.item(item[i], bw);
} else {
keys = Object.keys(item);
bw.writeU8(7);
bw.writeVarint(keys.length);
for (i = 0; i < keys.length; i++) {
bw.writeVarString(keys[i], 'utf8');
Framer.item(item[keys[i]], bw);
}
}
break;
default:
throw new Error('Bad type.');
}
if (!writer)
bw = bw.render();
return bw;
};
/*
* Helpers
*/
function isTX(tx) {
return tx && tx.witnessHash && tx.mutable === false;
}
function isMTX(tx) {
return tx && tx.witnessHash && tx.mutable === true;
}
/* /*
* Expose * Expose
*/ */

View File

@ -9,6 +9,7 @@
var ec = require('../crypto/ec'); var ec = require('../crypto/ec');
var scrypt = require('../crypto/scrypt'); var scrypt = require('../crypto/scrypt');
var mine = require('../mining/mine'); var mine = require('../mining/mine');
var packets = require('./packets');
/** /**
* Jobs to execute within the worker. * Jobs to execute within the worker.
@ -26,26 +27,42 @@ var jobs = exports;
* @throws on unknown command * @throws on unknown command
*/ */
jobs.execute = function execute(cmd, args) { jobs.execute = function execute(p) {
switch (cmd) { try {
case 'verify': return jobs._execute(p);
return jobs.verify(args[0], args[1]); } catch (e) {
case 'verifyInput': return new packets.ErrorResultPacket(e);
return jobs.verifyInput(args[0], args[1], args[2]); }
case 'sign': };
return jobs.sign(args[0], args[1], args[2]);
case 'signInput': /**
return jobs.signInput(args[0], args[1], args[2], args[3]); * Execute a job on the worker.
case 'ecVerify': * @param {String} cmd
return jobs.ecVerify(args[0], args[1], args[2]); * @param {Array} args
case 'ecSign': * @returns {Object}
return jobs.ecSign(args[0], args[1]); * @throws on unknown command
case 'mine': */
return jobs.mine(args[0], args[1], args[2], args[3]);
case 'scrypt': jobs._execute = function execute(p) {
return jobs.scrypt(args[0], args[1], args[2], args[3], args[4], args[5]); switch (p.cmd) {
case packets.types.VERIFY:
return jobs.verify(p.tx, p.flags);
case packets.types.VERIFYINPUT:
return jobs.verifyInput(p.tx, p.index, p.flags);
case packets.types.SIGN:
return jobs.sign(p.tx, p.rings, p.type);
case packets.types.SIGNINPUT:
return jobs.signInput(p.tx, p.index, p.rings, p.type);
case packets.types.ECVERIFY:
return jobs.ecVerify(p.msg, p.sig, p.key);
case packets.types.ECSIGN:
return jobs.ecSign(p.msg, p.key);
case packets.types.MINE:
return jobs.mine(p.data, p.target, p.min, p.max);
case packets.types.SCRYPT:
return jobs.scrypt(p.passwd, p.salt, p.N, p.r, p.p, p.len);
default: default:
throw new Error('Unknown command: "' + cmd + '".'); throw new Error('Unknown command: "' + p.cmd + '".');
} }
}; };
@ -58,7 +75,8 @@ jobs.execute = function execute(cmd, args) {
*/ */
jobs.verify = function verify(tx, flags) { jobs.verify = function verify(tx, flags) {
return tx.verify(flags); var result = tx.verify(flags);
return new packets.VerifyResultPacket(result);
}; };
/** /**
@ -71,7 +89,8 @@ jobs.verify = function verify(tx, flags) {
*/ */
jobs.verifyInput = function verifyInput(tx, index, flags) { jobs.verifyInput = function verifyInput(tx, index, flags) {
return tx.verifyInput(index, flags); var result = tx.verifyInput(index, flags);
return new packets.VerifyInputResultPacket(result);
}; };
/** /**
@ -84,15 +103,7 @@ jobs.verifyInput = function verifyInput(tx, index, flags) {
jobs.sign = function sign(tx, ring, type) { jobs.sign = function sign(tx, ring, type) {
var total = tx.sign(ring, type); var total = tx.sign(ring, type);
var sigs = []; return packets.SignResultPacket.fromTX(tx, total);
var i, input;
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
sigs.push([input.script, input.witness]);
}
return [sigs, total];
}; };
/** /**
@ -106,12 +117,7 @@ jobs.sign = function sign(tx, ring, type) {
jobs.signInput = function signInput(tx, index, ring, type) { jobs.signInput = function signInput(tx, index, ring, type) {
var result = tx.signInput(tx, index, ring, type); var result = tx.signInput(tx, index, ring, type);
var input = tx.inputs[index]; return packets.SignInputResultPacket.fromTX(tx, index, result);
if (!result)
return null;
return [input.script, input.witness];
}; };
/** /**
@ -123,7 +129,8 @@ jobs.signInput = function signInput(tx, index, ring, type) {
*/ */
jobs.ecVerify = function ecVerify(msg, sig, key) { jobs.ecVerify = function ecVerify(msg, sig, key) {
return ec.verify(msg, sig, key); var result = ec.verify(msg, sig, key);
return new packets.ECVerifyResultPacket(result);
}; };
/** /**
@ -136,7 +143,8 @@ jobs.ecVerify = function ecVerify(msg, sig, key) {
*/ */
jobs.ecSign = function ecSign(msg, key) { jobs.ecSign = function ecSign(msg, key) {
return ec.sign(msg, key); var sig = ec.sign(msg, key);
return new packets.ECSignResultPacket(sig);
}; };
/** /**
@ -149,7 +157,8 @@ jobs.ecSign = function ecSign(msg, key) {
*/ */
jobs.mine = function _mine(data, target, min, max) { jobs.mine = function _mine(data, target, min, max) {
return mine(data, target, min, max); var nonce = mine(data, target, min, max);
return new packets.MineResultPacket(nonce);
}; };
/** /**
@ -165,5 +174,6 @@ jobs.mine = function _mine(data, target, min, max) {
*/ */
jobs.scrypt = function _scrypt(passwd, salt, N, r, p, len) { jobs.scrypt = function _scrypt(passwd, salt, N, r, p, len) {
return scrypt(passwd, salt, N, r, p, len); var key = scrypt(passwd, salt, N, r, p, len);
return new packets.ScryptResultPacket(key);
}; };

View File

@ -14,6 +14,7 @@ var Network = require('../protocol/network');
var jobs = require('./jobs'); var jobs = require('./jobs');
var Parser = require('./parser-client'); var Parser = require('./parser-client');
var Framer = require('./framer'); var Framer = require('./framer');
var packets = require('./packets');
var global = util.global; var global = util.global;
var server; var server;
@ -112,14 +113,12 @@ Master.prototype.write = function write(data) {
/** /**
* Frame and send a packet. * Frame and send a packet.
* @param {String} job * @param {Packet} packet
* @param {String} cmd
* @param {Array} items
* @returns {Boolean} * @returns {Boolean}
*/ */
Master.prototype.send = function send(job, cmd, items) { Master.prototype.send = function send(packet) {
return this.write(this.framer.packet(job, cmd, items)); return this.write(this.framer.packet(packet));
}; };
/** /**
@ -136,7 +135,7 @@ Master.prototype.sendEvent = function sendEvent() {
for (i = 0; i < items.length; i++) for (i = 0; i < items.length; i++)
items[i] = arguments[i]; items[i] = arguments[i];
return this.send(0, 'event', items); return this.send(new packets.EventPacket(items));
}; };
/** /**
@ -162,7 +161,7 @@ Master.prototype.log = function log() {
for (i = 0; i < items.length; i++) for (i = 0; i < items.length; i++)
items[i] = arguments[i]; items[i] = arguments[i];
this.sendEvent('log', items); this.send(new packets.LogPacket(items));
}; };
/** /**
@ -180,37 +179,29 @@ Master.prototype.listen = function listen(env) {
util.error = util.log; util.error = util.log;
this.on('error', function(err) { this.on('error', function(err) {
self.sendEvent('worker error', fromError(err)); self.send(new packets.ErrorPacket(err));
}); });
this.on('packet', function(packet) { this.on('packet', function(packet) {
var result; var result;
if (packet.cmd === 'event') { switch (packet.cmd) {
self.emit('event', packet.items); case packets.types.EVENT:
self.emit.apply(self, packet.items); self.emit('event', packet.items);
return; self.emit.apply(self, packet.items);
break;
case packets.types.ERROR:
self.emit('error', packet.error);
break;
default:
result = jobs.execute(packet);
result.id = packet.id;
self.send(result);
break;
} }
try {
result = jobs.execute(packet.cmd, packet.items);
} catch (e) {
self.send(packet.job, 'response', [fromError(e)]);
return;
}
self.send(packet.job, 'response', [null, result]);
}); });
}; };
/*
* Helpers
*/
function fromError(err) {
return [err.message, err.stack + '', err.type];
}
/* /*
* Expose * Expose
*/ */

823
lib/workers/packets.js Normal file
View File

@ -0,0 +1,823 @@
/*!
* packets.js - worker packets for bcoin
* Copyright (c) 2014-2016, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
var assert = require('assert');
var util = require('../utils/util');
var BufferReader = require('../utils/reader');
var Script = require('../script/script');
var Witness = require('../script/witness');
var Coin = require('../primitives/coin');
/*
* Constants
*/
var packetTypes = {
EVENT: 0,
LOG: 1,
ERROR: 2,
ERRORRESULT: 3,
VERIFY: 4,
VERIFYRESULT: 5,
SIGN: 6,
SIGNRESULT: 7,
VERIFYINPUT: 8,
VERIFYINPUTRESULT: 9,
SIGNINPUT: 10,
SIGNINPUTRESULT: 11,
ECVERIFY: 12,
ECVERIFYRESULT: 13,
ECSIGN: 14,
ECSIGNRESULT: 15,
MINE: 16,
MINERESULT: 17,
SCRYPT: 18,
SCRYPTRESULT: 19
};
/**
* Packet
* @constructor
*/
function Packet() {
this.id = ++Packet.id >>> 0;
this.error = null;
}
Packet.id = 0;
Packet.prototype.cmd = -1;
Packet.prototype.toRaw = function toRaw() {
throw new Error('Abstract method.');
};
/**
* EventPacket
* @constructor
*/
function EventPacket(items) {
Packet.call(this);
this.items = items || [];
}
util.inherits(EventPacket, Packet);
EventPacket.prototype.cmd = packetTypes.EVENT;
EventPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarString(JSON.stringify(this.items), 'utf8');
};
EventPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new EventPacket();
packet.items = JSON.parse(br.readVarString('utf8'));
return packet;
};
/**
* LogPacket
* @constructor
*/
function LogPacket(items) {
Packet.call(this);
this.items = items || [];
}
util.inherits(LogPacket, Packet);
LogPacket.prototype.cmd = packetTypes.LOG;
LogPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarString(JSON.stringify(this.items), 'utf8');
};
LogPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new LogPacket();
packet.items = JSON.parse(br.readVarString('utf8'));
return packet;
};
/**
* ErrorPacket
* @constructor
*/
function ErrorPacket(error) {
Packet.call(this);
this.error = error || new Error();
}
util.inherits(ErrorPacket, Packet);
ErrorPacket.prototype.cmd = packetTypes.ERROR;
ErrorPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarString(this.error.message + '', 'utf8');
bw.writeVarString(this.error.stack + '', 'utf8');
bw.writeVarString((this.error.type || ''), 'utf8');
};
ErrorPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ErrorPacket();
packet.error.message = br.readVarString('utf8');
packet.error.stack = br.readVarString('utf8');
packet.error.type = br.readVarString('utf8');
return packet;
};
/**
* ErrorResultPacket
* @constructor
*/
function ErrorResultPacket(error) {
Packet.call(this);
this.error = error || new Error();
}
util.inherits(ErrorResultPacket, Packet);
ErrorResultPacket.prototype.cmd = packetTypes.ERRORRESULT;
ErrorResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarString(this.error.message + '', 'utf8');
bw.writeVarString(this.error.stack + '', 'utf8');
bw.writeVarString((this.error.type || ''), 'utf8');
};
ErrorResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ErrorResultPacket();
packet.error.message = br.readVarString('utf8');
packet.error.stack = br.readVarString('utf8');
packet.error.type = br.readVarString('utf8');
return packet;
};
/**
* VerifyPacket
* @constructor
*/
function VerifyPacket(tx, flags) {
Packet.call(this);
this.tx = tx || null;
this.flags = flags != null ? flags : null;
}
util.inherits(VerifyPacket, Packet);
VerifyPacket.prototype.cmd = packetTypes.VERIFY;
VerifyPacket.prototype.toRaw = function(bw) {
frameTX(this.tx, bw);
bw.writeU32(this.flags);
};
VerifyPacket.fromRaw = function fromRaw(TX, data) {
var br = new BufferReader(data, true);
var packet = new VerifyPacket();
packet.tx = parseTX(TX, br);
packet.flags = br.readU32();
return packet;
};
/**
* VerifyResultPacket
* @constructor
*/
function VerifyResultPacket(value) {
Packet.call(this);
this.value = value;
}
util.inherits(VerifyResultPacket, Packet);
VerifyResultPacket.prototype.cmd = packetTypes.VERIFYRESULT;
VerifyResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeU8(this.value ? 1 : 0);
};
VerifyResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new VerifyResultPacket();
packet.value = br.readU8() === 1;
return packet;
};
/**
* SignPacket
* @constructor
*/
function SignPacket(tx, rings, type) {
Packet.call(this);
this.tx = tx || null;
this.rings = rings || [];
this.type = type != null ? type : null;
}
util.inherits(SignPacket, Packet);
SignPacket.prototype.cmd = packetTypes.SIGN;
SignPacket.prototype.toRaw = function toRaw(bw) {
var i, ring;
frameTX(this.tx, bw);
bw.writeU32(this.rings.length);
for (i = 0; i < this.rings.length; i++) {
ring = this.rings[i];
bw.writeBytes(ring.toRaw());
}
bw.write8(this.type != null ? this.type : -1);
};
SignPacket.fromRaw = function fromRaw(MTX, KeyRing, data) {
var br = new BufferReader(data, true);
var packet = new SignPacket();
var i, count, ring;
packet.tx = parseTX(MTX, br);
count = br.readU32();
for (i = 0; i < count; i++) {
ring = KeyRing.fromRaw(br);
packet.rings.push(ring);
}
packet.type = br.read8();
if (packet.type === -1)
packet.type = null;
return packet;
};
/**
* SignResultPacket
* @constructor
*/
function SignResultPacket(total, witness, script) {
Packet.call(this);
this.total = total || 0;
this.script = script || [];
this.witness = witness || [];
}
util.inherits(SignResultPacket, Packet);
SignResultPacket.prototype.cmd = packetTypes.SIGNRESULT;
SignResultPacket.fromTX = function fromTX(tx, total) {
var packet = new SignResultPacket(total);
var i, input;
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
packet.script.push(input.script);
packet.witness.push(input.witness);
}
return packet;
};
SignResultPacket.prototype.toRaw = function toRaw(bw) {
var i;
assert(this.script.length === this.witness.length);
bw.writeVarint(this.total);
bw.writeVarint(this.script.length);
for (i = 0; i < this.script.length; i++) {
this.script[i].toRaw(bw);
this.witness[i].toRaw(bw);
}
};
SignResultPacket.prototype.inject = function inject(tx) {
var i, input;
assert(this.script.length === tx.inputs.length);
assert(this.witness.length === tx.inputs.length);
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
input.script = this.script[i];
input.witness = this.witness[i];
}
};
SignResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new SignResultPacket();
var i, count;
packet.total = br.readVarint();
count = br.readVarint();
for (i = 0; i < count; i++) {
packet.script.push(Script.fromRaw(br));
packet.witness.push(Witness.fromRaw(br));
}
return packet;
};
/**
* VerifyInputPacket
* @constructor
*/
function VerifyInputPacket(tx, index, flags) {
Packet.call(this);
this.tx = tx || null;
this.index = index;
this.flags = flags != null ? flags : null;
}
util.inherits(VerifyInputPacket, Packet);
VerifyInputPacket.prototype.cmd = packetTypes.VERIFYINPUT;
VerifyInputPacket.prototype.toRaw = function toRaw(bw) {
frameTX(this.tx, bw);
bw.writeU32(this.index);
bw.write32(this.flags != null ? this.flags : -1);
};
VerifyInputPacket.fromRaw = function fromRaw(TX, data) {
var br = new BufferReader(data, true);
var packet = new VerifyInputPacket();
packet.tx = parseTX(TX, br);
packet.index = br.readU32();
packet.flags = br.read32();
if (packet.flags === -1)
packet.flags = null;
return packet;
};
/**
* VerifyInputResultPacket
* @constructor
*/
function VerifyInputResultPacket(value) {
Packet.call(this);
this.value = value;
}
util.inherits(VerifyInputResultPacket, Packet);
VerifyInputResultPacket.prototype.cmd = packetTypes.VERIFYINPUTRESULT;
VerifyInputResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeU8(this.value ? 1 : 0);
};
VerifyInputResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new VerifyInputResultPacket();
packet.value = br.readU8() === 1;
return packet;
};
/**
* SignInputPacket
* @constructor
*/
function SignInputPacket(tx, index, rings, type) {
Packet.call(this);
this.tx = tx || null;
this.index = index;
this.rings = rings || [];
this.type = type != null ? type : null;
}
util.inherits(SignInputPacket, Packet);
SignInputPacket.prototype.cmd = packetTypes.SIGNINPUT;
SignInputPacket.prototype.toRaw = function toRaw(bw) {
var i, ring;
frameTX(this.tx, bw);
bw.writeU32(this.index);
bw.writeU32(this.rings.length);
for (i = 0; i < this.rings.length; i++) {
ring = this.rings[i];
bw.writeBytes(ring.toRaw());
}
bw.write8(this.type != null ? this.type : -1);
};
SignInputPacket.fromRaw = function fromRaw(MTX, KeyRing, data) {
var br = new BufferReader(data, true);
var packet = new SignInputPacket();
var i, count, ring;
packet.tx = parseTX(MTX, br);
packet.index = br.readU32();
count = br.readU32();
for (i = 0; i < count; i++) {
ring = KeyRing.fromRaw(br);
packet.rings.push(ring);
}
packet.type = br.read8();
if (packet.type === -1)
packet.type = null;
return packet;
};
/**
* SignInputResultPacket
* @constructor
*/
function SignInputResultPacket(value, witness, script) {
Packet.call(this);
this.value = value || false;
this.script = script || null;
this.witness = witness || null;
}
util.inherits(SignInputResultPacket, Packet);
SignInputResultPacket.prototype.cmd = packetTypes.SIGNINPUTRESULT;
SignInputResultPacket.fromTX = function fromTX(tx, i, value) {
var packet = new SignInputResultPacket(value);
var input = tx.inputs[i];
assert(input);
packet.script = input.script;
packet.witness = input.witness;
return packet;
};
SignInputResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeU8(this.value ? 1 : 0);
this.script.toRaw(bw);
this.witness.toRaw(bw);
};
SignInputResultPacket.prototype.inject = function inject(tx, i) {
var input = tx.inputs[i];
assert(input);
input.script = this.script;
input.witness = this.witness;
};
SignInputResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new SignInputResultPacket();
packet.value = br.readU8() === 1;
packet.script = Script.fromRaw(br);
packet.witness = Witness.fromRaw(br);
return packet;
};
/**
* ECVerifyPacket
* @constructor
*/
function ECVerifyPacket(msg, sig, key) {
Packet.call(this);
this.msg = msg || null;
this.sig = sig || null;
this.key = key || null;
}
util.inherits(ECVerifyPacket, Packet);
ECVerifyPacket.prototype.cmd = packetTypes.ECVERIFY;
ECVerifyPacket.prototype.toRaw = function(bw) {
bw.writeVarBytes(this.msg);
bw.writeVarBytes(this.sig);
bw.writeVarBytes(this.key);
};
ECVerifyPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ECVerifyPacket();
packet.msg = br.readVarBytes();
packet.sig = br.readVarBytes();
packet.key = br.readVarBytes();
return packet;
};
/**
* ECVerifyResultPacket
* @constructor
*/
function ECVerifyResultPacket(value) {
Packet.call(this);
this.value = value;
}
util.inherits(ECVerifyResultPacket, Packet);
ECVerifyResultPacket.prototype.cmd = packetTypes.ECVERIFYRESULT;
ECVerifyResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeU8(this.value ? 1 : 0);
};
ECVerifyResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ECVerifyResultPacket();
packet.value = br.readU8() === 1;
return packet;
};
/**
* ECSignPacket
* @constructor
*/
function ECSignPacket(msg, key) {
Packet.call(this);
this.msg = msg || null;
this.key = key || null;
}
util.inherits(ECSignPacket, Packet);
ECSignPacket.prototype.cmd = packetTypes.ECSIGN;
ECSignPacket.prototype.toRaw = function(bw) {
bw.writeVarBytes(this.msg);
bw.writeVarBytes(this.key);
};
ECSignPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ECSignPacket();
packet.msg = br.readVarBytes();
packet.key = br.readVarBytes();
return packet;
};
/**
* ECSignResultPacket
* @constructor
*/
function ECSignResultPacket(sig) {
Packet.call(this);
this.sig = sig;
}
util.inherits(ECSignResultPacket, Packet);
ECSignResultPacket.prototype.cmd = packetTypes.ECSIGNRESULT;
ECSignResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarBytes(this.sig);
};
ECSignResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ECSignResultPacket();
packet.sig = br.readVarBytes();
return packet;
};
/**
* MinePacket
* @constructor
*/
function MinePacket(data, target, min, max) {
Packet.call(this);
this.data = data || null;
this.target = target || null;
this.min = min != null ? min : -1;
this.max = max != null ? max : -1;
}
util.inherits(MinePacket, Packet);
MinePacket.prototype.cmd = packetTypes.MINE;
MinePacket.prototype.toRaw = function(bw) {
bw.writeBytes(this.data);
bw.writeBytes(this.target);
bw.writeU32(this.min);
bw.writeU32(this.max);
};
MinePacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new MinePacket();
packet.data = br.readBytes(80);
packet.target = br.readBytes(32);
packet.min = br.readU32();
packet.max = br.readU32();
return packet;
};
/**
* MineResultPacket
* @constructor
*/
function MineResultPacket(nonce) {
Packet.call(this);
this.nonce = nonce != null ? nonce : -1;
}
util.inherits(MineResultPacket, Packet);
MineResultPacket.prototype.cmd = packetTypes.MINERESULT;
MineResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeU32(this.nonce);
};
MineResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new MineResultPacket();
packet.nonce = br.readU32();
if ((packet.nonce >> 0) === -1)
packet.nonce = -1;
return packet;
};
/**
* ScryptPacket
* @constructor
*/
function ScryptPacket(passwd, salt, N, r, p, len) {
Packet.call(this);
this.passwd = passwd || null;
this.salt = salt || null;
this.N = N != null ? N : -1;
this.r = r != null ? r : -1;
this.p = p != null ? p : -1;
this.len = len != null ? len : -1;
}
util.inherits(ScryptPacket, Packet);
ScryptPacket.prototype.cmd = packetTypes.SCRYPT;
ScryptPacket.prototype.toRaw = function(bw) {
bw.writeVarBytes(this.passwd);
bw.writeVarBytes(this.salt);
bw.writeU32(this.N);
bw.writeU32(this.r);
bw.writeU32(this.p);
bw.writeU32(this.len);
};
ScryptPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ScryptPacket();
packet.passwd = br.readVarBytes();
packet.salt = br.readVarBytes();
packet.N = br.readU32();
packet.r = br.readU32();
packet.p = br.readU32();
packet.len = br.readU32();
return packet;
};
/**
* ScryptResultPacket
* @constructor
*/
function ScryptResultPacket(key) {
Packet.call(this);
this.key = key || null;
}
util.inherits(ScryptResultPacket, Packet);
ScryptResultPacket.prototype.cmd = packetTypes.SCRYPTRESULT;
ScryptResultPacket.prototype.toRaw = function toRaw(bw) {
bw.writeVarBytes(this.key);
};
ScryptResultPacket.fromRaw = function fromRaw(data) {
var br = new BufferReader(data, true);
var packet = new ScryptResultPacket();
packet.key = br.readVarBytes();
return packet;
};
/*
* Helpers
*/
function frameTX(tx, bw) {
var i, input;
tx.toRaw(bw);
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
if (!input.coin) {
bw.writeU8(0);
continue;
}
bw.writeU8(1);
bw.writeVarint(input.coin.value);
input.coin.script.toRaw(bw);
}
}
function parseTX(TX, br) {
var tx = TX.fromRaw(br);
var i, input, prevout, coin;
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
prevout = input.prevout;
if (br.readU8() === 0)
continue;
coin = new Coin();
coin.value = br.readVarint();
coin.script.fromRaw(br);
coin.hash = prevout.hash;
coin.index = prevout.index;
input.coin = coin;
}
return tx;
}
/*
* Expose
*/
exports.types = packetTypes;
exports.EventPacket = EventPacket;
exports.LogPacket = LogPacket;
exports.ErrorPacket = ErrorPacket;
exports.ErrorResultPacket = ErrorResultPacket;
exports.VerifyPacket = VerifyPacket;
exports.VerifyResultPacket = VerifyResultPacket;
exports.SignPacket = SignPacket;
exports.SignResultPacket = SignResultPacket;
exports.VerifyInputPacket = VerifyInputPacket;
exports.VerifyInputResultPacket = VerifyInputResultPacket;
exports.SignInputPacket = SignInputPacket;
exports.SignInputResultPacket = SignInputResultPacket;
exports.ECVerifyPacket = ECVerifyPacket;
exports.ECVerifyResultPacket = ECVerifyResultPacket;
exports.ECSignPacket = ECSignPacket;
exports.ECSignResultPacket = ECSignResultPacket;
exports.MinePacket = MinePacket;
exports.MineResultPacket = MineResultPacket;
exports.ScryptPacket = ScryptPacket;
exports.ScryptResultPacket = ScryptResultPacket;

View File

@ -23,22 +23,14 @@ function Parser() {
return new Parser(); return new Parser();
ServerParser.call(this); ServerParser.call(this);
this.TX = TX;
this.MTX = MTX;
this.KeyRing = KeyRing;
} }
util.inherits(Parser, ServerParser); util.inherits(Parser, ServerParser);
Parser.prototype.parseKeyRing = function parseKeyRing(data) {
return KeyRing.fromRaw(data);
};
Parser.prototype.parseMTX = function parseMTX(data) {
return MTX.fromExtended(data, true);
};
Parser.prototype.parseTX = function parseTX(data) {
return TX.fromExtended(data, true);
};
/* /*
* Expose * Expose
*/ */

View File

@ -7,13 +7,10 @@
'use strict'; 'use strict';
var EventEmitter = require('events').EventEmitter;
var BN = require('bn.js');
var util = require('../utils/util');
var assert = require('assert'); var assert = require('assert');
var BufferReader = require('../utils/reader'); var EventEmitter = require('events').EventEmitter;
var Script = require('../script/script'); var util = require('../utils/util');
var Witness = require('../script/witness'); var packets = require('./packets');
/** /**
* Parser * Parser
@ -26,10 +23,14 @@ function Parser() {
EventEmitter.call(this); EventEmitter.call(this);
this.waiting = 25; this.waiting = 9;
this.packet = null; this.header = null;
this.pending = []; this.pending = [];
this.total = 0; this.total = 0;
this.TX = null;
this.MTX = null;
this.KeyRing = null;
} }
util.inherits(Parser, EventEmitter); util.inherits(Parser, EventEmitter);
@ -91,27 +92,28 @@ Parser.prototype.read = function read(size) {
}; };
Parser.prototype.parse = function parse(data) { Parser.prototype.parse = function parse(data) {
var packet = this.packet; var header = this.header;
var packet;
if (!packet) { if (!header) {
try { try {
packet = this.parseHeader(data); header = this.parseHeader(data);
} catch (e) { } catch (e) {
this.emit('error', e); this.emit('error', e);
return; return;
} }
this.packet = packet; this.header = header;
this.waiting = packet.size + 1; this.waiting = header.size + 1;
return; return;
} }
this.waiting = 25; this.waiting = 9;
this.packet = null; this.header = null;
try { try {
packet.items = this.parseBody(data); packet = this.parsePacket(header, data);
} catch (e) { } catch (e) {
this.emit('error', e); this.emit('error', e);
return; return;
@ -122,95 +124,72 @@ Parser.prototype.parse = function parse(data) {
return; return;
} }
packet.id = header.id;
this.emit('packet', packet); this.emit('packet', packet);
}; };
Parser.prototype.parseHeader = function parseHeader(data) { Parser.prototype.parseHeader = function parseHeader(data) {
var magic, job, len, cmd, size; var id = data.readUInt32LE(0, true);
var cmd = data.readUInt8(4, true);
magic = data.readUInt32LE(0, true); var size = data.readUInt32LE(5, true);
return new Header(id, cmd, size);
if (magic !== 0xdeadbeef)
throw new Error('Bad magic number: ' + magic.toString(16));
job = data.readUInt32LE(4, true);
len = data[8];
cmd = data.toString('ascii', 9, 9 + len);
size = data.readUInt32LE(21, true);
return new Packet(job, cmd, size);
}; };
Parser.prototype.parseBody = function parseBody(data) { Parser.prototype.parsePacket = function parsePacket(header, data) {
var br = BufferReader(data); switch (header.cmd) {
var i, count, items; case packets.types.EVENT:
return packets.EventPacket.fromRaw(data);
switch (br.readU8()) { case packets.types.LOG:
case 0: return packets.LogPacket.fromRaw(data);
return null; case packets.types.ERROR:
case 1: return packets.ErrorPacket.fromRaw(data);
return br.readVarString('utf8'); case packets.types.VERIFY:
case 2: return packets.VerifyPacket.fromRaw(this.TX, data);
return br.read32(); case packets.types.VERIFYRESULT:
case 3: return packets.VerifyResultPacket.fromRaw(data);
return br.readU32(); case packets.types.SIGN:
case 4: return packets.SignPacket.fromRaw(this.MTX, this.KeyRing, data);
return br.readU8() === 1; case packets.types.SIGNRESULT:
case 5: return packets.SignResultPacket.fromRaw(data);
return br.readVarBytes(); case packets.types.VERIFYINPUT:
case 6: return packets.VerifyInputPacket.fromRaw(this.TX, data);
items = []; case packets.types.VERIFYINPUTRESULT:
count = br.readVarint(); return packets.VerifyInputResultPacket.fromRaw(data);
for (i = 0; i < count; i++) case packets.types.SIGNINPUT:
items.push(this.parseBody(br)); return packets.SignInputPacket.fromRaw(this.MTX, this.KeyRing, data);
return items; case packets.types.SIGNINPUTRESULT:
case 7: return packets.SignInputResultPacket.fromRaw(data);
items = {}; case packets.types.ECVERIFY:
count = br.readVarint(); return packets.ECVerifyPacket.fromRaw(data);
for (i = 0; i < count; i++) case packets.types.ECVERIFYRESULT:
items[br.readVarString('utf8')] = this.parseBody(br); return packets.ECVerifyResultPacket.fromRaw(data);
return items; case packets.types.ECSIGN:
case 10: return packets.ECSignPacket.fromRaw(data);
return new BN(br.readVarBytes()); case packets.types.ECSIGNRESULT:
case 40: return packets.ECSignResultPacket.fromRaw(data);
return Script.fromRaw(br.readVarBytes()); case packets.types.MINE:
case 41: return packets.MinePacket.fromRaw(data);
return Witness.fromRaw(br); case packets.types.MINERESULT:
case 42: return packets.MineResultPacket.fromRaw(data);
return this.parseMTX(br); case packets.types.SCRYPT:
case 43: return packets.ScryptPacket.fromRaw(data);
return this.parseTX(br); case packets.types.SCRYPTRESULT:
case 44: return packets.ScryptResultPacket.fromRaw(data);
return this.parseKeyRing(br);
default: default:
throw new Error('Bad type.'); throw new Error('Unknown packet.');
} }
}; };
Parser.prototype.parseKeyRing = function parseKeyRing(data) {
throw new Error('KeyRing in client output.');
};
Parser.prototype.parseMTX = function parseMTX(data) {
throw new Error('MTX in client output.');
};
Parser.prototype.parseTX = function parseTX(data) {
throw new Error('TX in client output.');
};
/** /**
* Packet * Header
* @constructor * @constructor
*/ */
function Packet(job, cmd, size) { function Header(id, cmd, size) {
this.job = job; this.id = id;
this.cmd = cmd; this.cmd = cmd;
this.size = size; this.size = size;
this.items = null;
} }
/* /*

View File

@ -7,6 +7,7 @@
'use strict'; 'use strict';
var assert = require('assert');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var util = require('../utils/util'); var util = require('../utils/util');
var co = require('../utils/co'); var co = require('../utils/co');
@ -15,6 +16,7 @@ var Network = require('../protocol/network');
var jobs = require('./jobs'); var jobs = require('./jobs');
var Parser = require('./parser'); var Parser = require('./parser');
var Framer = require('./framer'); var Framer = require('./framer');
var packets = require('./packets');
var os = require('os'); var os = require('os');
var cp = require('child_process'); var cp = require('child_process');
@ -224,20 +226,20 @@ WorkerPool.prototype.destroy = function destroy() {
/** /**
* Call a method for a worker to execute. * Call a method for a worker to execute.
* @param {String} method - Method name. * @param {Packet} packet
* @param {Array} args - Arguments. * @param {Number} timeout
* @returns {Promise} * @returns {Promise}
* the worker method specifies. * the worker method specifies.
*/ */
WorkerPool.prototype.execute = function execute(method, args, timeout) { WorkerPool.prototype.execute = function execute(packet, timeout) {
var result, child; var result, child;
if (!this.enabled || !WorkerPool.support) { if (!this.enabled || !WorkerPool.support) {
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
util.nextTick(function() { util.nextTick(function() {
try { try {
result = jobs.execute(method, args); result = jobs._execute(packet);
} catch (e) { } catch (e) {
reject(e); reject(e);
return; return;
@ -252,7 +254,7 @@ WorkerPool.prototype.execute = function execute(method, args, timeout) {
child = this.alloc(); child = this.alloc();
return child.execute(method, args, timeout); return child.execute(packet, timeout);
}; };
/** /**
@ -262,21 +264,11 @@ WorkerPool.prototype.execute = function execute(method, args, timeout) {
* @returns {Promise} - Returns Boolean. * @returns {Promise} - Returns Boolean.
*/ */
WorkerPool.prototype.verify = function verify(tx, flags) { WorkerPool.prototype.verify = co(function* verify(tx, flags) {
return this.execute('verify', [tx, flags], -1); var packet = new packets.VerifyPacket(tx, flags);
}; var result = yield this.execute(packet, -1);
return result.value;
/** });
* Execute the tx input verification job (default timeout).
* @param {TX} tx
* @param {Number} index
* @param {VerifyFlags} flags
* @returns {Promise} - Returns Boolean.
*/
WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) {
return this.execute('verifyInput', [tx, index, flags], -1);
};
/** /**
* Execute the tx signing job (default timeout). * Execute the tx signing job (default timeout).
@ -287,43 +279,56 @@ WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) {
*/ */
WorkerPool.prototype.sign = co(function* sign(tx, ring, type) { WorkerPool.prototype.sign = co(function* sign(tx, ring, type) {
var i, result, input, sig, sigs, total; var rings = ring;
var packet, result;
result = yield this.execute('sign', [tx, ring, type], -1); if (!Array.isArray(rings))
rings = [rings];
sigs = result[0]; packet = new packets.SignPacket(tx, rings, type);
total = result[1]; result = yield this.execute(packet, -1);
for (i = 0; i < sigs.length; i++) { result.inject(tx);
sig = sigs[i];
input = tx.inputs[i];
input.script = sig[0];
input.witness = sig[1];
}
return total; return result.total;
});
/**
* Execute the tx input verification job (default timeout).
* @param {TX} tx
* @param {Number} index
* @param {VerifyFlags} flags
* @returns {Promise} - Returns Boolean.
*/
WorkerPool.prototype.verifyInput = co(function* verifyInput(tx, index, flags) {
var packet = new packets.VerifyInputPacket(tx, index, flags);
var result = yield this.execute(packet, -1);
return result.value;
}); });
/** /**
* Execute the tx input signing job (default timeout). * Execute the tx input signing job (default timeout).
* @param {MTX} tx * @param {MTX} tx
* @param {Number} index * @param {Number} index
* @param {Buffer} key * @param {KeyRing} ring
* @param {SighashType} type * @param {SighashType} type
* @returns {Promise} * @returns {Promise}
*/ */
WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) { WorkerPool.prototype.signInput = co(function* signInput(tx, index, ring, type) {
var sig = yield this.execute('signInput', [tx, index, key, type], -1); var rings = ring;
var input = tx.inputs[index]; var packet, result;
if (!sig) if (!Array.isArray(rings))
return false; rings = [rings];
input.script = sig[0]; packet = new packets.SignInputPacket(tx, index, rings, type);
input.witness = sig[1]; result = yield this.execute(packet, -1);
return true; result.inject(tx);
return result.value;
}); });
/** /**
@ -334,9 +339,11 @@ WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) {
* @returns {Promise} * @returns {Promise}
*/ */
WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) { WorkerPool.prototype.ecVerify = co(function* ecVerify(msg, sig, key) {
return this.execute('ecVerify', [msg, sig, key], -1); var packet = new packets.ECVerifyPacket(msg, sig, key);
}; var result = yield this.execute(packet, -1);
return result.value;
});
/** /**
* Execute the ec signing job (no timeout). * Execute the ec signing job (no timeout).
@ -345,9 +352,11 @@ WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) {
* @returns {Promise} * @returns {Promise}
*/ */
WorkerPool.prototype.ecSign = function ecSign(msg, key) { WorkerPool.prototype.ecSign = co(function* ecSign(msg, key) {
return this.execute('ecSign', [msg, key], -1); var packet = new packets.SignPacket(msg, key);
}; var result = yield this.execute(packet, -1);
return result.sig;
});
/** /**
* Execute the mining job (no timeout). * Execute the mining job (no timeout).
@ -358,9 +367,11 @@ WorkerPool.prototype.ecSign = function ecSign(msg, key) {
* @returns {Promise} - Returns {Number}. * @returns {Promise} - Returns {Number}.
*/ */
WorkerPool.prototype.mine = function mine(data, target, min, max) { WorkerPool.prototype.mine = co(function* mine(data, target, min, max) {
return this.execute('mine', [data, target, min, max], -1); var packet = new packets.MinePacket(data, target, min, max);
}; var result = yield this.execute(packet, -1);
return result.nonce;
});
/** /**
* Execute scrypt job (no timeout). * Execute scrypt job (no timeout).
@ -374,9 +385,11 @@ WorkerPool.prototype.mine = function mine(data, target, min, max) {
* @returns {Buffer} * @returns {Buffer}
*/ */
WorkerPool.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { WorkerPool.prototype.scrypt = co(function* scrypt(passwd, salt, N, r, p, len) {
return this.execute('scrypt', [passwd, salt, N, r, p, len], -1); var packet = new packets.ScryptPacket(passwd, salt, N, r, p, len);
}; var result = yield this.execute(packet, -1);
return result.key;
});
/** /**
* Represents a worker. * Represents a worker.
@ -394,9 +407,9 @@ function Worker(id) {
this.framer = new Framer(); this.framer = new Framer();
this.parser = new Parser(); this.parser = new Parser();
this.setMaxListeners(util.MAX_SAFE_INTEGER); this.setMaxListeners(util.MAX_SAFE_INTEGER);
this.uid = 0;
this.id = id != null ? id : -1; this.id = id != null ? id : -1;
this.child = null; this.child = null;
this.pending = {};
this._init(); this._init();
} }
@ -471,6 +484,14 @@ Worker.prototype._init = function _init() {
}); });
} }
this.on('exit', function() {
self.killJobs();
});
this.on('error', function() {
self.killJobs();
});
this.on('data', function(data) { this.on('data', function(data) {
self.parser.feed(data); self.parser.feed(data);
}); });
@ -494,44 +515,32 @@ Worker.prototype._init = function _init() {
Worker.prototype._bind = function _bind() { Worker.prototype._bind = function _bind() {
var self = this; var self = this;
this.on('worker error', function(err) {
self.emit('error', toError(err));
});
this.on('exit', function(code) { this.on('exit', function(code) {
var i = WorkerPool.children.indexOf(self); var i = WorkerPool.children.indexOf(self);
if (i !== -1) if (i !== -1)
WorkerPool.children.splice(i, 1); WorkerPool.children.splice(i, 1);
}); });
this.on('log', function(items) {
util.log('Worker %d:', self.id);
util.log.apply(util, items);
});
this.on('packet', function(packet) { this.on('packet', function(packet) {
var err, result; switch (packet.cmd) {
case packets.types.EVENT:
if (packet.cmd === 'event') { self.emit.apply(self, packet.items);
self.emit.apply(self, packet.items); self.emit('event', packet.items);
self.emit('event', packet.items); break;
return; case packets.types.LOG:
util.log('Worker %d:', self.id);
util.log.apply(util, packet.items);
break;
case packets.types.ERROR:
self.emit('error', packet.error);
break;
case packets.types.ERRORRESULT:
self.rejectJob(packet.id, packet.error);
break;
default:
self.resolveJob(packet.id, packet);
break;
} }
if (packet.cmd === 'response') {
err = packet.items[0];
result = packet.items[1];
if (err)
err = toError(err);
self.emit('response ' + packet.job, err, result);
return;
}
err = new Error('Unknown packet: ' + packet.cmd);
self.emit('error', err);
}); });
WorkerPool.children.push(this); WorkerPool.children.push(this);
@ -560,14 +569,12 @@ Worker.prototype.write = function write(data) {
/** /**
* Frame and send a packet. * Frame and send a packet.
* @param {String} job * @param {Packet} packet
* @param {String} cmd
* @param {Array} items
* @returns {Boolean} * @returns {Boolean}
*/ */
Worker.prototype.send = function send(job, cmd, items) { Worker.prototype.send = function send(packet) {
return this.write(this.framer.packet(job, cmd, items)); return this.write(this.framer.packet(packet));
}; };
/** /**
@ -584,7 +591,7 @@ Worker.prototype.sendEvent = function sendEvent() {
for (i = 0; i < items.length; i++) for (i = 0; i < items.length; i++)
items[i] = arguments[i]; items[i] = arguments[i];
return this.send(0, 'event', items); return this.send(new packets.EventPacket(items));
}; };
/** /**
@ -602,66 +609,121 @@ Worker.prototype.destroy = function destroy() {
/** /**
* Call a method for a worker to execute. * Call a method for a worker to execute.
* @private * @param {Packet} packet
* @param {Number} job - Job ID. * @param {Number} timeout
* @param {String} method - Method name.
* @param {Array} args - Arguments.
* @returns {Promise} * @returns {Promise}
* the worker method specifies.
*/ */
Worker.prototype._execute = function _execute(method, args, timeout, callback) { Worker.prototype.execute = function execute(packet, timeout) {
var self = this; var self = this;
var job = this.uid; return new Promise(function(resolve, reject) {
var event, timer; self._execute(packet, timeout, co.wrap(resolve, reject));
});
if (++this.uid === 0x100000000)
this.uid = 0;
event = 'response ' + job;
function listener(err, result) {
if (timer) {
clearTimeout(timer);
timer = null;
}
self.removeListener(event, listener);
self.removeListener('error', listener);
self.removeListener('exit', exitListener);
callback(err, result);
}
function exitListener(code) {
listener(new Error('Worker exited: ' + code));
}
this.once(event, listener);
this.once('error', listener);
this.once('exit', exitListener);
if (timeout !== -1) {
timer = setTimeout(function() {
listener(new Error('Worker timed out.'));
}, timeout);
}
this.send(job, method, args);
}; };
/** /**
* Call a method for a worker to execute. * Call a method for a worker to execute.
* @param {Number} job - Job ID. * @private
* @param {String} method - Method name. * @param {Packet} packet
* @param {Array} args - Arguments. * @param {Number} timeout
* @returns {Promise} * @param {Function} callback
* the worker method specifies. * the worker method specifies.
*/ */
Worker.prototype.execute = function execute(method, args, timeout) { Worker.prototype._execute = function _execute(packet, timeout, callback) {
var job = new PendingJob(this, packet.id, callback);
assert(!this.pending[packet.id]);
this.pending[packet.id] = job;
job.start(timeout);
this.send(packet);
};
/**
* Resolve a job.
* @param {Number} id
* @param {Packet} result
*/
Worker.prototype.resolveJob = function resolveJob(id, result) {
var job = this.pending[id];
assert(job);
job.finish(null, result);
};
/**
* Reject a job.
* @param {Number} id
* @param {Error} err
*/
Worker.prototype.rejectJob = function rejectJob(id, err) {
var job = this.pending[id];
assert(job);
job.finish(err);
};
/**
* Kill all jobs associated with worker.
*/
Worker.prototype.killJobs = function killJobs() {
var keys = Object.keys(this.pending);
var i, key, job;
for (i = 0; i < keys.length; i++) {
key = keys[i];
job = this.pending[key];
job.destroy();
}
};
/**
* PendingWorker
* @constructor
*/
function PendingJob(worker, id, callback) {
this.worker = worker;
this.id = id;
this.callback = callback;
this.timer = null;
}
PendingJob.prototype.start = function start(timeout) {
var self = this; var self = this;
return new Promise(function(resolve, reject) {
self._execute(method, args, timeout, co.wrap(resolve, reject)); if (timeout === -1)
}); return;
this.timer = setTimeout(function() {
self.finish(new Error('Worker timed out.'));
}, timeout);
};
PendingJob.prototype.destroy = function destroy() {
this.finish(new Error('Job was destroyed.'));
};
PendingJob.prototype.finish = function finish(err, result) {
var callback = this.callback;
assert(callback, 'Already finished.');
this.callback = null;
if (this.timer != null) {
clearTimeout(this.timer);
this.timer = null;
}
assert(this.worker.pending[this.id]);
delete this.worker.pending[this.id];
callback(err, result);
}; };
/* /*
@ -675,13 +737,6 @@ function getCores() {
return os.cpus().length; return os.cpus().length;
} }
function toError(values) {
var err = new Error(values[0]);
err.stack = values[1];
err.type = values[2];
return err;
}
/* /*
* Default * Default
*/ */

View File

@ -15,7 +15,7 @@ describe('Chain', function() {
var chain, wallet, node, miner, walletdb; var chain, wallet, node, miner, walletdb;
var tip1, tip2, cb1, cb2, mineBlock; var tip1, tip2, cb1, cb2, mineBlock;
this.timeout(8000); this.timeout(5000);
node = new bcoin.fullnode({ db: 'memory', apiKey: 'foo' }); node = new bcoin.fullnode({ db: 'memory', apiKey: 'foo' });
// node.walletdb.client = new Client({ apiKey: 'foo', network: 'regtest' }); // node.walletdb.client = new Client({ apiKey: 'foo', network: 'regtest' });