diff --git a/lib/workers/framer.js b/lib/workers/framer.js index e2a4bb00..2aef1903 100644 --- a/lib/workers/framer.js +++ b/lib/workers/framer.js @@ -8,13 +8,8 @@ 'use strict'; var EventEmitter = require('events').EventEmitter; -var BN = require('bn.js'); var util = require('../utils/util'); -var assert = require('assert'); var BufferWriter = require('../utils/writer'); -var KeyRing = require('../primitives/keyring'); -var Script = require('../script/script'); -var Witness = require('../script/witness'); /** * Framer @@ -30,114 +25,24 @@ function Framer() { util.inherits(Framer, EventEmitter); -Framer.prototype.packet = function packet(job, cmd, items) { - var payload = this.body(items); - var packet = new Buffer(25 + payload.length + 1); +Framer.prototype.packet = function _packet(packet) { + var bw = new BufferWriter(); + var data; - assert(cmd.length < 12); - assert(payload.length <= 0xffffffff); + bw.writeU32(packet.id); + bw.writeU8(packet.cmd); + bw.seek(4); - packet.writeUInt32LE(0xdeadbeef, 0, true); - packet.writeUInt32LE(job, 4, true); - packet.writeUInt8(cmd.length, 8, true); - packet.write(cmd, 9, 'ascii'); - packet.writeUInt32LE(payload.length, 21, true); - payload.copy(packet, 25); - packet[packet.length - 1] = 0x0a; + packet.toRaw(bw); - return packet; + bw.writeU8(0x0a); + + data = bw.render(); + data.writeUInt32LE(data.length - 10, 5, true); + + return data; }; -Framer.prototype.body = function body(items) { - return Framer.item(items); -}; - -Framer.item = function _item(item, writer) { - var bw = BufferWriter(writer); - var i, keys; - - switch (typeof item) { - case 'string': - bw.writeU8(1); - bw.writeVarString(item, 'utf8'); - break; - case 'number': - if (item > 0x7fffffff) { - bw.writeU8(3); - bw.writeU32(item); - } else { - bw.writeU8(2); - bw.write32(item); - } - break; - case 'boolean': - bw.writeU8(4); - bw.writeU8(item ? 1 : 0); - break; - case 'object': - case 'undefined': - if (item == null) { - bw.writeU8(0); - break; - } - if (item instanceof Script) { - bw.writeU8(40); - bw.writeVarBytes(item.toRaw()); - } else if (item instanceof Witness) { - bw.writeU8(41); - item.toRaw(bw); - } else if (isMTX(item)) { - bw.writeU8(42); - item.toExtended(true, bw); - } else if (isTX(item)) { - bw.writeU8(43); - item.toExtended(true, bw); - } else if (item instanceof KeyRing) { - bw.writeU8(44); - item.toRaw(bw); - } else if (BN.isBN(item)) { - bw.writeU8(10); - bw.writeVarBytes(item.toArrayLike(Buffer)); - } else if (Buffer.isBuffer(item)) { - bw.writeU8(5); - bw.writeVarBytes(item); - } else if (Array.isArray(item)) { - bw.writeU8(6); - bw.writeVarint(item.length); - for (i = 0; i < item.length; i++) - Framer.item(item[i], bw); - } else { - keys = Object.keys(item); - bw.writeU8(7); - bw.writeVarint(keys.length); - for (i = 0; i < keys.length; i++) { - bw.writeVarString(keys[i], 'utf8'); - Framer.item(item[keys[i]], bw); - } - } - break; - default: - throw new Error('Bad type.'); - } - - if (!writer) - bw = bw.render(); - - return bw; -}; - -/* - * Helpers - */ - -function isTX(tx) { - return tx && tx.witnessHash && tx.mutable === false; -} - -function isMTX(tx) { - return tx && tx.witnessHash && tx.mutable === true; -} - /* * Expose */ diff --git a/lib/workers/jobs.js b/lib/workers/jobs.js index c03efa3b..e8d79891 100644 --- a/lib/workers/jobs.js +++ b/lib/workers/jobs.js @@ -9,6 +9,7 @@ var ec = require('../crypto/ec'); var scrypt = require('../crypto/scrypt'); var mine = require('../mining/mine'); +var packets = require('./packets'); /** * Jobs to execute within the worker. @@ -26,26 +27,42 @@ var jobs = exports; * @throws on unknown command */ -jobs.execute = function execute(cmd, args) { - switch (cmd) { - case 'verify': - return jobs.verify(args[0], args[1]); - case 'verifyInput': - return jobs.verifyInput(args[0], args[1], args[2]); - case 'sign': - return jobs.sign(args[0], args[1], args[2]); - case 'signInput': - return jobs.signInput(args[0], args[1], args[2], args[3]); - case 'ecVerify': - return jobs.ecVerify(args[0], args[1], args[2]); - case 'ecSign': - return jobs.ecSign(args[0], args[1]); - case 'mine': - return jobs.mine(args[0], args[1], args[2], args[3]); - case 'scrypt': - return jobs.scrypt(args[0], args[1], args[2], args[3], args[4], args[5]); +jobs.execute = function execute(p) { + try { + return jobs._execute(p); + } catch (e) { + return new packets.ErrorResultPacket(e); + } +}; + +/** + * Execute a job on the worker. + * @param {String} cmd + * @param {Array} args + * @returns {Object} + * @throws on unknown command + */ + +jobs._execute = function execute(p) { + switch (p.cmd) { + case packets.types.VERIFY: + return jobs.verify(p.tx, p.flags); + case packets.types.VERIFYINPUT: + return jobs.verifyInput(p.tx, p.index, p.flags); + case packets.types.SIGN: + return jobs.sign(p.tx, p.rings, p.type); + case packets.types.SIGNINPUT: + return jobs.signInput(p.tx, p.index, p.rings, p.type); + case packets.types.ECVERIFY: + return jobs.ecVerify(p.msg, p.sig, p.key); + case packets.types.ECSIGN: + return jobs.ecSign(p.msg, p.key); + case packets.types.MINE: + return jobs.mine(p.data, p.target, p.min, p.max); + case packets.types.SCRYPT: + return jobs.scrypt(p.passwd, p.salt, p.N, p.r, p.p, p.len); default: - throw new Error('Unknown command: "' + cmd + '".'); + throw new Error('Unknown command: "' + p.cmd + '".'); } }; @@ -58,7 +75,8 @@ jobs.execute = function execute(cmd, args) { */ jobs.verify = function verify(tx, flags) { - return tx.verify(flags); + var result = tx.verify(flags); + return new packets.VerifyResultPacket(result); }; /** @@ -71,7 +89,8 @@ jobs.verify = function verify(tx, flags) { */ jobs.verifyInput = function verifyInput(tx, index, flags) { - return tx.verifyInput(index, flags); + var result = tx.verifyInput(index, flags); + return new packets.VerifyInputResultPacket(result); }; /** @@ -84,15 +103,7 @@ jobs.verifyInput = function verifyInput(tx, index, flags) { jobs.sign = function sign(tx, ring, type) { var total = tx.sign(ring, type); - var sigs = []; - var i, input; - - for (i = 0; i < tx.inputs.length; i++) { - input = tx.inputs[i]; - sigs.push([input.script, input.witness]); - } - - return [sigs, total]; + return packets.SignResultPacket.fromTX(tx, total); }; /** @@ -106,12 +117,7 @@ jobs.sign = function sign(tx, ring, type) { jobs.signInput = function signInput(tx, index, ring, type) { var result = tx.signInput(tx, index, ring, type); - var input = tx.inputs[index]; - - if (!result) - return null; - - return [input.script, input.witness]; + return packets.SignInputResultPacket.fromTX(tx, index, result); }; /** @@ -123,7 +129,8 @@ jobs.signInput = function signInput(tx, index, ring, type) { */ jobs.ecVerify = function ecVerify(msg, sig, key) { - return ec.verify(msg, sig, key); + var result = ec.verify(msg, sig, key); + return new packets.ECVerifyResultPacket(result); }; /** @@ -136,7 +143,8 @@ jobs.ecVerify = function ecVerify(msg, sig, key) { */ jobs.ecSign = function ecSign(msg, key) { - return ec.sign(msg, key); + var sig = ec.sign(msg, key); + return new packets.ECSignResultPacket(sig); }; /** @@ -149,7 +157,8 @@ jobs.ecSign = function ecSign(msg, key) { */ jobs.mine = function _mine(data, target, min, max) { - return mine(data, target, min, max); + var nonce = mine(data, target, min, max); + return new packets.MineResultPacket(nonce); }; /** @@ -165,5 +174,6 @@ jobs.mine = function _mine(data, target, min, max) { */ jobs.scrypt = function _scrypt(passwd, salt, N, r, p, len) { - return scrypt(passwd, salt, N, r, p, len); + var key = scrypt(passwd, salt, N, r, p, len); + return new packets.ScryptResultPacket(key); }; diff --git a/lib/workers/master.js b/lib/workers/master.js index ebf019f1..1afe806e 100644 --- a/lib/workers/master.js +++ b/lib/workers/master.js @@ -14,6 +14,7 @@ var Network = require('../protocol/network'); var jobs = require('./jobs'); var Parser = require('./parser-client'); var Framer = require('./framer'); +var packets = require('./packets'); var global = util.global; var server; @@ -112,14 +113,12 @@ Master.prototype.write = function write(data) { /** * Frame and send a packet. - * @param {String} job - * @param {String} cmd - * @param {Array} items + * @param {Packet} packet * @returns {Boolean} */ -Master.prototype.send = function send(job, cmd, items) { - return this.write(this.framer.packet(job, cmd, items)); +Master.prototype.send = function send(packet) { + return this.write(this.framer.packet(packet)); }; /** @@ -136,7 +135,7 @@ Master.prototype.sendEvent = function sendEvent() { for (i = 0; i < items.length; i++) items[i] = arguments[i]; - return this.send(0, 'event', items); + return this.send(new packets.EventPacket(items)); }; /** @@ -162,7 +161,7 @@ Master.prototype.log = function log() { for (i = 0; i < items.length; i++) items[i] = arguments[i]; - this.sendEvent('log', items); + this.send(new packets.LogPacket(items)); }; /** @@ -180,37 +179,29 @@ Master.prototype.listen = function listen(env) { util.error = util.log; this.on('error', function(err) { - self.sendEvent('worker error', fromError(err)); + self.send(new packets.ErrorPacket(err)); }); this.on('packet', function(packet) { var result; - if (packet.cmd === 'event') { - self.emit('event', packet.items); - self.emit.apply(self, packet.items); - return; + switch (packet.cmd) { + case packets.types.EVENT: + self.emit('event', packet.items); + self.emit.apply(self, packet.items); + break; + case packets.types.ERROR: + self.emit('error', packet.error); + break; + default: + result = jobs.execute(packet); + result.id = packet.id; + self.send(result); + break; } - - try { - result = jobs.execute(packet.cmd, packet.items); - } catch (e) { - self.send(packet.job, 'response', [fromError(e)]); - return; - } - - self.send(packet.job, 'response', [null, result]); }); }; -/* - * Helpers - */ - -function fromError(err) { - return [err.message, err.stack + '', err.type]; -} - /* * Expose */ diff --git a/lib/workers/packets.js b/lib/workers/packets.js new file mode 100644 index 00000000..053bd769 --- /dev/null +++ b/lib/workers/packets.js @@ -0,0 +1,823 @@ +/*! + * packets.js - worker packets for bcoin + * Copyright (c) 2014-2016, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +var assert = require('assert'); +var util = require('../utils/util'); +var BufferReader = require('../utils/reader'); +var Script = require('../script/script'); +var Witness = require('../script/witness'); +var Coin = require('../primitives/coin'); + +/* + * Constants + */ + +var packetTypes = { + EVENT: 0, + LOG: 1, + ERROR: 2, + ERRORRESULT: 3, + VERIFY: 4, + VERIFYRESULT: 5, + SIGN: 6, + SIGNRESULT: 7, + VERIFYINPUT: 8, + VERIFYINPUTRESULT: 9, + SIGNINPUT: 10, + SIGNINPUTRESULT: 11, + ECVERIFY: 12, + ECVERIFYRESULT: 13, + ECSIGN: 14, + ECSIGNRESULT: 15, + MINE: 16, + MINERESULT: 17, + SCRYPT: 18, + SCRYPTRESULT: 19 +}; + +/** + * Packet + * @constructor + */ + +function Packet() { + this.id = ++Packet.id >>> 0; + this.error = null; +} + +Packet.id = 0; + +Packet.prototype.cmd = -1; + +Packet.prototype.toRaw = function toRaw() { + throw new Error('Abstract method.'); +}; + +/** + * EventPacket + * @constructor + */ + +function EventPacket(items) { + Packet.call(this); + this.items = items || []; +} + +util.inherits(EventPacket, Packet); + +EventPacket.prototype.cmd = packetTypes.EVENT; + +EventPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarString(JSON.stringify(this.items), 'utf8'); +}; + +EventPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new EventPacket(); + packet.items = JSON.parse(br.readVarString('utf8')); + return packet; +}; + +/** + * LogPacket + * @constructor + */ + +function LogPacket(items) { + Packet.call(this); + this.items = items || []; +} + +util.inherits(LogPacket, Packet); + +LogPacket.prototype.cmd = packetTypes.LOG; + +LogPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarString(JSON.stringify(this.items), 'utf8'); +}; + +LogPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new LogPacket(); + packet.items = JSON.parse(br.readVarString('utf8')); + return packet; +}; + +/** + * ErrorPacket + * @constructor + */ + +function ErrorPacket(error) { + Packet.call(this); + this.error = error || new Error(); +} + +util.inherits(ErrorPacket, Packet); + +ErrorPacket.prototype.cmd = packetTypes.ERROR; + +ErrorPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarString(this.error.message + '', 'utf8'); + bw.writeVarString(this.error.stack + '', 'utf8'); + bw.writeVarString((this.error.type || ''), 'utf8'); +}; + +ErrorPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ErrorPacket(); + packet.error.message = br.readVarString('utf8'); + packet.error.stack = br.readVarString('utf8'); + packet.error.type = br.readVarString('utf8'); + return packet; +}; + +/** + * ErrorResultPacket + * @constructor + */ + +function ErrorResultPacket(error) { + Packet.call(this); + this.error = error || new Error(); +} + +util.inherits(ErrorResultPacket, Packet); + +ErrorResultPacket.prototype.cmd = packetTypes.ERRORRESULT; + +ErrorResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarString(this.error.message + '', 'utf8'); + bw.writeVarString(this.error.stack + '', 'utf8'); + bw.writeVarString((this.error.type || ''), 'utf8'); +}; + +ErrorResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ErrorResultPacket(); + packet.error.message = br.readVarString('utf8'); + packet.error.stack = br.readVarString('utf8'); + packet.error.type = br.readVarString('utf8'); + return packet; +}; + +/** + * VerifyPacket + * @constructor + */ + +function VerifyPacket(tx, flags) { + Packet.call(this); + this.tx = tx || null; + this.flags = flags != null ? flags : null; +} + +util.inherits(VerifyPacket, Packet); + +VerifyPacket.prototype.cmd = packetTypes.VERIFY; + +VerifyPacket.prototype.toRaw = function(bw) { + frameTX(this.tx, bw); + bw.writeU32(this.flags); +}; + +VerifyPacket.fromRaw = function fromRaw(TX, data) { + var br = new BufferReader(data, true); + var packet = new VerifyPacket(); + packet.tx = parseTX(TX, br); + packet.flags = br.readU32(); + return packet; +}; + +/** + * VerifyResultPacket + * @constructor + */ + +function VerifyResultPacket(value) { + Packet.call(this); + this.value = value; +} + +util.inherits(VerifyResultPacket, Packet); + +VerifyResultPacket.prototype.cmd = packetTypes.VERIFYRESULT; + +VerifyResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeU8(this.value ? 1 : 0); +}; + +VerifyResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new VerifyResultPacket(); + packet.value = br.readU8() === 1; + return packet; +}; + +/** + * SignPacket + * @constructor + */ + +function SignPacket(tx, rings, type) { + Packet.call(this); + this.tx = tx || null; + this.rings = rings || []; + this.type = type != null ? type : null; +} + +util.inherits(SignPacket, Packet); + +SignPacket.prototype.cmd = packetTypes.SIGN; + +SignPacket.prototype.toRaw = function toRaw(bw) { + var i, ring; + + frameTX(this.tx, bw); + + bw.writeU32(this.rings.length); + + for (i = 0; i < this.rings.length; i++) { + ring = this.rings[i]; + bw.writeBytes(ring.toRaw()); + } + + bw.write8(this.type != null ? this.type : -1); +}; + +SignPacket.fromRaw = function fromRaw(MTX, KeyRing, data) { + var br = new BufferReader(data, true); + var packet = new SignPacket(); + var i, count, ring; + + packet.tx = parseTX(MTX, br); + + count = br.readU32(); + + for (i = 0; i < count; i++) { + ring = KeyRing.fromRaw(br); + packet.rings.push(ring); + } + + packet.type = br.read8(); + + if (packet.type === -1) + packet.type = null; + + return packet; +}; + +/** + * SignResultPacket + * @constructor + */ + +function SignResultPacket(total, witness, script) { + Packet.call(this); + this.total = total || 0; + this.script = script || []; + this.witness = witness || []; +} + +util.inherits(SignResultPacket, Packet); + +SignResultPacket.prototype.cmd = packetTypes.SIGNRESULT; + +SignResultPacket.fromTX = function fromTX(tx, total) { + var packet = new SignResultPacket(total); + var i, input; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + packet.script.push(input.script); + packet.witness.push(input.witness); + } + + return packet; +}; + +SignResultPacket.prototype.toRaw = function toRaw(bw) { + var i; + + assert(this.script.length === this.witness.length); + + bw.writeVarint(this.total); + bw.writeVarint(this.script.length); + + for (i = 0; i < this.script.length; i++) { + this.script[i].toRaw(bw); + this.witness[i].toRaw(bw); + } +}; + +SignResultPacket.prototype.inject = function inject(tx) { + var i, input; + + assert(this.script.length === tx.inputs.length); + assert(this.witness.length === tx.inputs.length); + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + input.script = this.script[i]; + input.witness = this.witness[i]; + } +}; + +SignResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new SignResultPacket(); + var i, count; + + packet.total = br.readVarint(); + + count = br.readVarint(); + + for (i = 0; i < count; i++) { + packet.script.push(Script.fromRaw(br)); + packet.witness.push(Witness.fromRaw(br)); + } + + return packet; +}; + +/** + * VerifyInputPacket + * @constructor + */ + +function VerifyInputPacket(tx, index, flags) { + Packet.call(this); + this.tx = tx || null; + this.index = index; + this.flags = flags != null ? flags : null; +} + +util.inherits(VerifyInputPacket, Packet); + +VerifyInputPacket.prototype.cmd = packetTypes.VERIFYINPUT; + +VerifyInputPacket.prototype.toRaw = function toRaw(bw) { + frameTX(this.tx, bw); + bw.writeU32(this.index); + bw.write32(this.flags != null ? this.flags : -1); +}; + +VerifyInputPacket.fromRaw = function fromRaw(TX, data) { + var br = new BufferReader(data, true); + var packet = new VerifyInputPacket(); + + packet.tx = parseTX(TX, br); + packet.index = br.readU32(); + packet.flags = br.read32(); + + if (packet.flags === -1) + packet.flags = null; + + return packet; +}; + +/** + * VerifyInputResultPacket + * @constructor + */ + +function VerifyInputResultPacket(value) { + Packet.call(this); + this.value = value; +} + +util.inherits(VerifyInputResultPacket, Packet); + +VerifyInputResultPacket.prototype.cmd = packetTypes.VERIFYINPUTRESULT; + +VerifyInputResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeU8(this.value ? 1 : 0); +}; + +VerifyInputResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new VerifyInputResultPacket(); + packet.value = br.readU8() === 1; + return packet; +}; + +/** + * SignInputPacket + * @constructor + */ + +function SignInputPacket(tx, index, rings, type) { + Packet.call(this); + this.tx = tx || null; + this.index = index; + this.rings = rings || []; + this.type = type != null ? type : null; +} + +util.inherits(SignInputPacket, Packet); + +SignInputPacket.prototype.cmd = packetTypes.SIGNINPUT; + +SignInputPacket.prototype.toRaw = function toRaw(bw) { + var i, ring; + + frameTX(this.tx, bw); + bw.writeU32(this.index); + + bw.writeU32(this.rings.length); + + for (i = 0; i < this.rings.length; i++) { + ring = this.rings[i]; + bw.writeBytes(ring.toRaw()); + } + + bw.write8(this.type != null ? this.type : -1); +}; + +SignInputPacket.fromRaw = function fromRaw(MTX, KeyRing, data) { + var br = new BufferReader(data, true); + var packet = new SignInputPacket(); + var i, count, ring; + + packet.tx = parseTX(MTX, br); + packet.index = br.readU32(); + + count = br.readU32(); + + for (i = 0; i < count; i++) { + ring = KeyRing.fromRaw(br); + packet.rings.push(ring); + } + + packet.type = br.read8(); + + if (packet.type === -1) + packet.type = null; + + return packet; +}; + + +/** + * SignInputResultPacket + * @constructor + */ + +function SignInputResultPacket(value, witness, script) { + Packet.call(this); + this.value = value || false; + this.script = script || null; + this.witness = witness || null; +} + +util.inherits(SignInputResultPacket, Packet); + +SignInputResultPacket.prototype.cmd = packetTypes.SIGNINPUTRESULT; + +SignInputResultPacket.fromTX = function fromTX(tx, i, value) { + var packet = new SignInputResultPacket(value); + var input = tx.inputs[i]; + + assert(input); + + packet.script = input.script; + packet.witness = input.witness; + + return packet; +}; + +SignInputResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeU8(this.value ? 1 : 0); + this.script.toRaw(bw); + this.witness.toRaw(bw); +}; + +SignInputResultPacket.prototype.inject = function inject(tx, i) { + var input = tx.inputs[i]; + assert(input); + input.script = this.script; + input.witness = this.witness; +}; + +SignInputResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new SignInputResultPacket(); + packet.value = br.readU8() === 1; + packet.script = Script.fromRaw(br); + packet.witness = Witness.fromRaw(br); + return packet; +}; + + +/** + * ECVerifyPacket + * @constructor + */ + +function ECVerifyPacket(msg, sig, key) { + Packet.call(this); + this.msg = msg || null; + this.sig = sig || null; + this.key = key || null; +} + +util.inherits(ECVerifyPacket, Packet); + +ECVerifyPacket.prototype.cmd = packetTypes.ECVERIFY; + +ECVerifyPacket.prototype.toRaw = function(bw) { + bw.writeVarBytes(this.msg); + bw.writeVarBytes(this.sig); + bw.writeVarBytes(this.key); +}; + +ECVerifyPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ECVerifyPacket(); + packet.msg = br.readVarBytes(); + packet.sig = br.readVarBytes(); + packet.key = br.readVarBytes(); + return packet; +}; + +/** + * ECVerifyResultPacket + * @constructor + */ + +function ECVerifyResultPacket(value) { + Packet.call(this); + this.value = value; +} + +util.inherits(ECVerifyResultPacket, Packet); + +ECVerifyResultPacket.prototype.cmd = packetTypes.ECVERIFYRESULT; + +ECVerifyResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeU8(this.value ? 1 : 0); +}; + +ECVerifyResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ECVerifyResultPacket(); + packet.value = br.readU8() === 1; + return packet; +}; + +/** + * ECSignPacket + * @constructor + */ + +function ECSignPacket(msg, key) { + Packet.call(this); + this.msg = msg || null; + this.key = key || null; +} + +util.inherits(ECSignPacket, Packet); + +ECSignPacket.prototype.cmd = packetTypes.ECSIGN; + +ECSignPacket.prototype.toRaw = function(bw) { + bw.writeVarBytes(this.msg); + bw.writeVarBytes(this.key); +}; + +ECSignPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ECSignPacket(); + packet.msg = br.readVarBytes(); + packet.key = br.readVarBytes(); + return packet; +}; + +/** + * ECSignResultPacket + * @constructor + */ + +function ECSignResultPacket(sig) { + Packet.call(this); + this.sig = sig; +} + +util.inherits(ECSignResultPacket, Packet); + +ECSignResultPacket.prototype.cmd = packetTypes.ECSIGNRESULT; + +ECSignResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarBytes(this.sig); +}; + +ECSignResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ECSignResultPacket(); + packet.sig = br.readVarBytes(); + return packet; +}; + +/** + * MinePacket + * @constructor + */ + +function MinePacket(data, target, min, max) { + Packet.call(this); + this.data = data || null; + this.target = target || null; + this.min = min != null ? min : -1; + this.max = max != null ? max : -1; +} + +util.inherits(MinePacket, Packet); + +MinePacket.prototype.cmd = packetTypes.MINE; + +MinePacket.prototype.toRaw = function(bw) { + bw.writeBytes(this.data); + bw.writeBytes(this.target); + bw.writeU32(this.min); + bw.writeU32(this.max); +}; + +MinePacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new MinePacket(); + packet.data = br.readBytes(80); + packet.target = br.readBytes(32); + packet.min = br.readU32(); + packet.max = br.readU32(); + return packet; +}; + +/** + * MineResultPacket + * @constructor + */ + +function MineResultPacket(nonce) { + Packet.call(this); + this.nonce = nonce != null ? nonce : -1; +} + +util.inherits(MineResultPacket, Packet); + +MineResultPacket.prototype.cmd = packetTypes.MINERESULT; + +MineResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeU32(this.nonce); +}; + +MineResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new MineResultPacket(); + packet.nonce = br.readU32(); + if ((packet.nonce >> 0) === -1) + packet.nonce = -1; + return packet; +}; + +/** + * ScryptPacket + * @constructor + */ + +function ScryptPacket(passwd, salt, N, r, p, len) { + Packet.call(this); + this.passwd = passwd || null; + this.salt = salt || null; + this.N = N != null ? N : -1; + this.r = r != null ? r : -1; + this.p = p != null ? p : -1; + this.len = len != null ? len : -1; +} + +util.inherits(ScryptPacket, Packet); + +ScryptPacket.prototype.cmd = packetTypes.SCRYPT; + +ScryptPacket.prototype.toRaw = function(bw) { + bw.writeVarBytes(this.passwd); + bw.writeVarBytes(this.salt); + bw.writeU32(this.N); + bw.writeU32(this.r); + bw.writeU32(this.p); + bw.writeU32(this.len); +}; + +ScryptPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ScryptPacket(); + packet.passwd = br.readVarBytes(); + packet.salt = br.readVarBytes(); + packet.N = br.readU32(); + packet.r = br.readU32(); + packet.p = br.readU32(); + packet.len = br.readU32(); + return packet; +}; + +/** + * ScryptResultPacket + * @constructor + */ + +function ScryptResultPacket(key) { + Packet.call(this); + this.key = key || null; +} + +util.inherits(ScryptResultPacket, Packet); + +ScryptResultPacket.prototype.cmd = packetTypes.SCRYPTRESULT; + +ScryptResultPacket.prototype.toRaw = function toRaw(bw) { + bw.writeVarBytes(this.key); +}; + +ScryptResultPacket.fromRaw = function fromRaw(data) { + var br = new BufferReader(data, true); + var packet = new ScryptResultPacket(); + packet.key = br.readVarBytes(); + return packet; +}; + +/* + * Helpers + */ + +function frameTX(tx, bw) { + var i, input; + + tx.toRaw(bw); + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + + if (!input.coin) { + bw.writeU8(0); + continue; + } + + bw.writeU8(1); + bw.writeVarint(input.coin.value); + input.coin.script.toRaw(bw); + } +} + +function parseTX(TX, br) { + var tx = TX.fromRaw(br); + var i, input, prevout, coin; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + prevout = input.prevout; + + if (br.readU8() === 0) + continue; + + coin = new Coin(); + coin.value = br.readVarint(); + coin.script.fromRaw(br); + + coin.hash = prevout.hash; + coin.index = prevout.index; + + input.coin = coin; + } + + return tx; +} + +/* + * Expose + */ + +exports.types = packetTypes; +exports.EventPacket = EventPacket; +exports.LogPacket = LogPacket; +exports.ErrorPacket = ErrorPacket; +exports.ErrorResultPacket = ErrorResultPacket; +exports.VerifyPacket = VerifyPacket; +exports.VerifyResultPacket = VerifyResultPacket; +exports.SignPacket = SignPacket; +exports.SignResultPacket = SignResultPacket; +exports.VerifyInputPacket = VerifyInputPacket; +exports.VerifyInputResultPacket = VerifyInputResultPacket; +exports.SignInputPacket = SignInputPacket; +exports.SignInputResultPacket = SignInputResultPacket; +exports.ECVerifyPacket = ECVerifyPacket; +exports.ECVerifyResultPacket = ECVerifyResultPacket; +exports.ECSignPacket = ECSignPacket; +exports.ECSignResultPacket = ECSignResultPacket; +exports.MinePacket = MinePacket; +exports.MineResultPacket = MineResultPacket; +exports.ScryptPacket = ScryptPacket; +exports.ScryptResultPacket = ScryptResultPacket; diff --git a/lib/workers/parser-client.js b/lib/workers/parser-client.js index 9b296095..0afec599 100644 --- a/lib/workers/parser-client.js +++ b/lib/workers/parser-client.js @@ -23,22 +23,14 @@ function Parser() { return new Parser(); ServerParser.call(this); + + this.TX = TX; + this.MTX = MTX; + this.KeyRing = KeyRing; } util.inherits(Parser, ServerParser); -Parser.prototype.parseKeyRing = function parseKeyRing(data) { - return KeyRing.fromRaw(data); -}; - -Parser.prototype.parseMTX = function parseMTX(data) { - return MTX.fromExtended(data, true); -}; - -Parser.prototype.parseTX = function parseTX(data) { - return TX.fromExtended(data, true); -}; - /* * Expose */ diff --git a/lib/workers/parser.js b/lib/workers/parser.js index fcf21115..ea44f841 100644 --- a/lib/workers/parser.js +++ b/lib/workers/parser.js @@ -7,13 +7,10 @@ 'use strict'; -var EventEmitter = require('events').EventEmitter; -var BN = require('bn.js'); -var util = require('../utils/util'); var assert = require('assert'); -var BufferReader = require('../utils/reader'); -var Script = require('../script/script'); -var Witness = require('../script/witness'); +var EventEmitter = require('events').EventEmitter; +var util = require('../utils/util'); +var packets = require('./packets'); /** * Parser @@ -26,10 +23,14 @@ function Parser() { EventEmitter.call(this); - this.waiting = 25; - this.packet = null; + this.waiting = 9; + this.header = null; this.pending = []; this.total = 0; + + this.TX = null; + this.MTX = null; + this.KeyRing = null; } util.inherits(Parser, EventEmitter); @@ -91,27 +92,28 @@ Parser.prototype.read = function read(size) { }; Parser.prototype.parse = function parse(data) { - var packet = this.packet; + var header = this.header; + var packet; - if (!packet) { + if (!header) { try { - packet = this.parseHeader(data); + header = this.parseHeader(data); } catch (e) { this.emit('error', e); return; } - this.packet = packet; - this.waiting = packet.size + 1; + this.header = header; + this.waiting = header.size + 1; return; } - this.waiting = 25; - this.packet = null; + this.waiting = 9; + this.header = null; try { - packet.items = this.parseBody(data); + packet = this.parsePacket(header, data); } catch (e) { this.emit('error', e); return; @@ -122,95 +124,72 @@ Parser.prototype.parse = function parse(data) { return; } + packet.id = header.id; + this.emit('packet', packet); }; Parser.prototype.parseHeader = function parseHeader(data) { - var magic, job, len, cmd, size; - - magic = data.readUInt32LE(0, true); - - if (magic !== 0xdeadbeef) - throw new Error('Bad magic number: ' + magic.toString(16)); - - job = data.readUInt32LE(4, true); - - len = data[8]; - cmd = data.toString('ascii', 9, 9 + len); - - size = data.readUInt32LE(21, true); - - return new Packet(job, cmd, size); + var id = data.readUInt32LE(0, true); + var cmd = data.readUInt8(4, true); + var size = data.readUInt32LE(5, true); + return new Header(id, cmd, size); }; -Parser.prototype.parseBody = function parseBody(data) { - var br = BufferReader(data); - var i, count, items; - - switch (br.readU8()) { - case 0: - return null; - case 1: - return br.readVarString('utf8'); - case 2: - return br.read32(); - case 3: - return br.readU32(); - case 4: - return br.readU8() === 1; - case 5: - return br.readVarBytes(); - case 6: - items = []; - count = br.readVarint(); - for (i = 0; i < count; i++) - items.push(this.parseBody(br)); - return items; - case 7: - items = {}; - count = br.readVarint(); - for (i = 0; i < count; i++) - items[br.readVarString('utf8')] = this.parseBody(br); - return items; - case 10: - return new BN(br.readVarBytes()); - case 40: - return Script.fromRaw(br.readVarBytes()); - case 41: - return Witness.fromRaw(br); - case 42: - return this.parseMTX(br); - case 43: - return this.parseTX(br); - case 44: - return this.parseKeyRing(br); +Parser.prototype.parsePacket = function parsePacket(header, data) { + switch (header.cmd) { + case packets.types.EVENT: + return packets.EventPacket.fromRaw(data); + case packets.types.LOG: + return packets.LogPacket.fromRaw(data); + case packets.types.ERROR: + return packets.ErrorPacket.fromRaw(data); + case packets.types.VERIFY: + return packets.VerifyPacket.fromRaw(this.TX, data); + case packets.types.VERIFYRESULT: + return packets.VerifyResultPacket.fromRaw(data); + case packets.types.SIGN: + return packets.SignPacket.fromRaw(this.MTX, this.KeyRing, data); + case packets.types.SIGNRESULT: + return packets.SignResultPacket.fromRaw(data); + case packets.types.VERIFYINPUT: + return packets.VerifyInputPacket.fromRaw(this.TX, data); + case packets.types.VERIFYINPUTRESULT: + return packets.VerifyInputResultPacket.fromRaw(data); + case packets.types.SIGNINPUT: + return packets.SignInputPacket.fromRaw(this.MTX, this.KeyRing, data); + case packets.types.SIGNINPUTRESULT: + return packets.SignInputResultPacket.fromRaw(data); + case packets.types.ECVERIFY: + return packets.ECVerifyPacket.fromRaw(data); + case packets.types.ECVERIFYRESULT: + return packets.ECVerifyResultPacket.fromRaw(data); + case packets.types.ECSIGN: + return packets.ECSignPacket.fromRaw(data); + case packets.types.ECSIGNRESULT: + return packets.ECSignResultPacket.fromRaw(data); + case packets.types.MINE: + return packets.MinePacket.fromRaw(data); + case packets.types.MINERESULT: + return packets.MineResultPacket.fromRaw(data); + case packets.types.SCRYPT: + return packets.ScryptPacket.fromRaw(data); + case packets.types.SCRYPTRESULT: + return packets.ScryptResultPacket.fromRaw(data); default: - throw new Error('Bad type.'); + throw new Error('Unknown packet.'); } }; -Parser.prototype.parseKeyRing = function parseKeyRing(data) { - throw new Error('KeyRing in client output.'); -}; - -Parser.prototype.parseMTX = function parseMTX(data) { - throw new Error('MTX in client output.'); -}; - -Parser.prototype.parseTX = function parseTX(data) { - throw new Error('TX in client output.'); -}; - /** - * Packet + * Header * @constructor */ -function Packet(job, cmd, size) { - this.job = job; +function Header(id, cmd, size) { + this.id = id; this.cmd = cmd; this.size = size; - this.items = null; } /* diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index 29557f8f..69470b06 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -7,6 +7,7 @@ 'use strict'; +var assert = require('assert'); var EventEmitter = require('events').EventEmitter; var util = require('../utils/util'); var co = require('../utils/co'); @@ -15,6 +16,7 @@ var Network = require('../protocol/network'); var jobs = require('./jobs'); var Parser = require('./parser'); var Framer = require('./framer'); +var packets = require('./packets'); var os = require('os'); var cp = require('child_process'); @@ -224,20 +226,20 @@ WorkerPool.prototype.destroy = function destroy() { /** * Call a method for a worker to execute. - * @param {String} method - Method name. - * @param {Array} args - Arguments. + * @param {Packet} packet + * @param {Number} timeout * @returns {Promise} * the worker method specifies. */ -WorkerPool.prototype.execute = function execute(method, args, timeout) { +WorkerPool.prototype.execute = function execute(packet, timeout) { var result, child; if (!this.enabled || !WorkerPool.support) { return new Promise(function(resolve, reject) { util.nextTick(function() { try { - result = jobs.execute(method, args); + result = jobs._execute(packet); } catch (e) { reject(e); return; @@ -252,7 +254,7 @@ WorkerPool.prototype.execute = function execute(method, args, timeout) { child = this.alloc(); - return child.execute(method, args, timeout); + return child.execute(packet, timeout); }; /** @@ -262,21 +264,11 @@ WorkerPool.prototype.execute = function execute(method, args, timeout) { * @returns {Promise} - Returns Boolean. */ -WorkerPool.prototype.verify = function verify(tx, flags) { - return this.execute('verify', [tx, flags], -1); -}; - -/** - * Execute the tx input verification job (default timeout). - * @param {TX} tx - * @param {Number} index - * @param {VerifyFlags} flags - * @returns {Promise} - Returns Boolean. - */ - -WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) { - return this.execute('verifyInput', [tx, index, flags], -1); -}; +WorkerPool.prototype.verify = co(function* verify(tx, flags) { + var packet = new packets.VerifyPacket(tx, flags); + var result = yield this.execute(packet, -1); + return result.value; +}); /** * Execute the tx signing job (default timeout). @@ -287,43 +279,56 @@ WorkerPool.prototype.verifyInput = function verifyInput(tx, index, flags) { */ WorkerPool.prototype.sign = co(function* sign(tx, ring, type) { - var i, result, input, sig, sigs, total; + var rings = ring; + var packet, result; - result = yield this.execute('sign', [tx, ring, type], -1); + if (!Array.isArray(rings)) + rings = [rings]; - sigs = result[0]; - total = result[1]; + packet = new packets.SignPacket(tx, rings, type); + result = yield this.execute(packet, -1); - for (i = 0; i < sigs.length; i++) { - sig = sigs[i]; - input = tx.inputs[i]; - input.script = sig[0]; - input.witness = sig[1]; - } + result.inject(tx); - return total; + return result.total; +}); + +/** + * Execute the tx input verification job (default timeout). + * @param {TX} tx + * @param {Number} index + * @param {VerifyFlags} flags + * @returns {Promise} - Returns Boolean. + */ + +WorkerPool.prototype.verifyInput = co(function* verifyInput(tx, index, flags) { + var packet = new packets.VerifyInputPacket(tx, index, flags); + var result = yield this.execute(packet, -1); + return result.value; }); /** * Execute the tx input signing job (default timeout). * @param {MTX} tx * @param {Number} index - * @param {Buffer} key + * @param {KeyRing} ring * @param {SighashType} type * @returns {Promise} */ -WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) { - var sig = yield this.execute('signInput', [tx, index, key, type], -1); - var input = tx.inputs[index]; +WorkerPool.prototype.signInput = co(function* signInput(tx, index, ring, type) { + var rings = ring; + var packet, result; - if (!sig) - return false; + if (!Array.isArray(rings)) + rings = [rings]; - input.script = sig[0]; - input.witness = sig[1]; + packet = new packets.SignInputPacket(tx, index, rings, type); + result = yield this.execute(packet, -1); - return true; + result.inject(tx); + + return result.value; }); /** @@ -334,9 +339,11 @@ WorkerPool.prototype.signInput = co(function* signInput(tx, index, key, type) { * @returns {Promise} */ -WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) { - return this.execute('ecVerify', [msg, sig, key], -1); -}; +WorkerPool.prototype.ecVerify = co(function* ecVerify(msg, sig, key) { + var packet = new packets.ECVerifyPacket(msg, sig, key); + var result = yield this.execute(packet, -1); + return result.value; +}); /** * Execute the ec signing job (no timeout). @@ -345,9 +352,11 @@ WorkerPool.prototype.ecVerify = function ecVerify(msg, sig, key) { * @returns {Promise} */ -WorkerPool.prototype.ecSign = function ecSign(msg, key) { - return this.execute('ecSign', [msg, key], -1); -}; +WorkerPool.prototype.ecSign = co(function* ecSign(msg, key) { + var packet = new packets.SignPacket(msg, key); + var result = yield this.execute(packet, -1); + return result.sig; +}); /** * Execute the mining job (no timeout). @@ -358,9 +367,11 @@ WorkerPool.prototype.ecSign = function ecSign(msg, key) { * @returns {Promise} - Returns {Number}. */ -WorkerPool.prototype.mine = function mine(data, target, min, max) { - return this.execute('mine', [data, target, min, max], -1); -}; +WorkerPool.prototype.mine = co(function* mine(data, target, min, max) { + var packet = new packets.MinePacket(data, target, min, max); + var result = yield this.execute(packet, -1); + return result.nonce; +}); /** * Execute scrypt job (no timeout). @@ -374,9 +385,11 @@ WorkerPool.prototype.mine = function mine(data, target, min, max) { * @returns {Buffer} */ -WorkerPool.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { - return this.execute('scrypt', [passwd, salt, N, r, p, len], -1); -}; +WorkerPool.prototype.scrypt = co(function* scrypt(passwd, salt, N, r, p, len) { + var packet = new packets.ScryptPacket(passwd, salt, N, r, p, len); + var result = yield this.execute(packet, -1); + return result.key; +}); /** * Represents a worker. @@ -394,9 +407,9 @@ function Worker(id) { this.framer = new Framer(); this.parser = new Parser(); this.setMaxListeners(util.MAX_SAFE_INTEGER); - this.uid = 0; this.id = id != null ? id : -1; this.child = null; + this.pending = {}; this._init(); } @@ -471,6 +484,14 @@ Worker.prototype._init = function _init() { }); } + this.on('exit', function() { + self.killJobs(); + }); + + this.on('error', function() { + self.killJobs(); + }); + this.on('data', function(data) { self.parser.feed(data); }); @@ -494,44 +515,32 @@ Worker.prototype._init = function _init() { Worker.prototype._bind = function _bind() { var self = this; - this.on('worker error', function(err) { - self.emit('error', toError(err)); - }); - this.on('exit', function(code) { var i = WorkerPool.children.indexOf(self); if (i !== -1) WorkerPool.children.splice(i, 1); }); - this.on('log', function(items) { - util.log('Worker %d:', self.id); - util.log.apply(util, items); - }); - this.on('packet', function(packet) { - var err, result; - - if (packet.cmd === 'event') { - self.emit.apply(self, packet.items); - self.emit('event', packet.items); - return; + switch (packet.cmd) { + case packets.types.EVENT: + self.emit.apply(self, packet.items); + self.emit('event', packet.items); + break; + case packets.types.LOG: + util.log('Worker %d:', self.id); + util.log.apply(util, packet.items); + break; + case packets.types.ERROR: + self.emit('error', packet.error); + break; + case packets.types.ERRORRESULT: + self.rejectJob(packet.id, packet.error); + break; + default: + self.resolveJob(packet.id, packet); + break; } - - if (packet.cmd === 'response') { - err = packet.items[0]; - result = packet.items[1]; - - if (err) - err = toError(err); - - self.emit('response ' + packet.job, err, result); - - return; - } - - err = new Error('Unknown packet: ' + packet.cmd); - self.emit('error', err); }); WorkerPool.children.push(this); @@ -560,14 +569,12 @@ Worker.prototype.write = function write(data) { /** * Frame and send a packet. - * @param {String} job - * @param {String} cmd - * @param {Array} items + * @param {Packet} packet * @returns {Boolean} */ -Worker.prototype.send = function send(job, cmd, items) { - return this.write(this.framer.packet(job, cmd, items)); +Worker.prototype.send = function send(packet) { + return this.write(this.framer.packet(packet)); }; /** @@ -584,7 +591,7 @@ Worker.prototype.sendEvent = function sendEvent() { for (i = 0; i < items.length; i++) items[i] = arguments[i]; - return this.send(0, 'event', items); + return this.send(new packets.EventPacket(items)); }; /** @@ -602,66 +609,121 @@ Worker.prototype.destroy = function destroy() { /** * Call a method for a worker to execute. - * @private - * @param {Number} job - Job ID. - * @param {String} method - Method name. - * @param {Array} args - Arguments. + * @param {Packet} packet + * @param {Number} timeout * @returns {Promise} - * the worker method specifies. */ -Worker.prototype._execute = function _execute(method, args, timeout, callback) { +Worker.prototype.execute = function execute(packet, timeout) { var self = this; - var job = this.uid; - var event, timer; - - if (++this.uid === 0x100000000) - this.uid = 0; - - event = 'response ' + job; - - function listener(err, result) { - if (timer) { - clearTimeout(timer); - timer = null; - } - self.removeListener(event, listener); - self.removeListener('error', listener); - self.removeListener('exit', exitListener); - callback(err, result); - } - - function exitListener(code) { - listener(new Error('Worker exited: ' + code)); - } - - this.once(event, listener); - this.once('error', listener); - this.once('exit', exitListener); - - if (timeout !== -1) { - timer = setTimeout(function() { - listener(new Error('Worker timed out.')); - }, timeout); - } - - this.send(job, method, args); + return new Promise(function(resolve, reject) { + self._execute(packet, timeout, co.wrap(resolve, reject)); + }); }; /** * Call a method for a worker to execute. - * @param {Number} job - Job ID. - * @param {String} method - Method name. - * @param {Array} args - Arguments. - * @returns {Promise} + * @private + * @param {Packet} packet + * @param {Number} timeout + * @param {Function} callback * the worker method specifies. */ -Worker.prototype.execute = function execute(method, args, timeout) { +Worker.prototype._execute = function _execute(packet, timeout, callback) { + var job = new PendingJob(this, packet.id, callback); + + assert(!this.pending[packet.id]); + + this.pending[packet.id] = job; + + job.start(timeout); + + this.send(packet); +}; + +/** + * Resolve a job. + * @param {Number} id + * @param {Packet} result + */ + +Worker.prototype.resolveJob = function resolveJob(id, result) { + var job = this.pending[id]; + assert(job); + job.finish(null, result); +}; + +/** + * Reject a job. + * @param {Number} id + * @param {Error} err + */ + +Worker.prototype.rejectJob = function rejectJob(id, err) { + var job = this.pending[id]; + assert(job); + job.finish(err); +}; + +/** + * Kill all jobs associated with worker. + */ + +Worker.prototype.killJobs = function killJobs() { + var keys = Object.keys(this.pending); + var i, key, job; + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + job = this.pending[key]; + job.destroy(); + } +}; + +/** + * PendingWorker + * @constructor + */ + +function PendingJob(worker, id, callback) { + this.worker = worker; + this.id = id; + this.callback = callback; + this.timer = null; +} + +PendingJob.prototype.start = function start(timeout) { var self = this; - return new Promise(function(resolve, reject) { - self._execute(method, args, timeout, co.wrap(resolve, reject)); - }); + + if (timeout === -1) + return; + + this.timer = setTimeout(function() { + self.finish(new Error('Worker timed out.')); + }, timeout); +}; + +PendingJob.prototype.destroy = function destroy() { + this.finish(new Error('Job was destroyed.')); +}; + +PendingJob.prototype.finish = function finish(err, result) { + var callback = this.callback; + + assert(callback, 'Already finished.'); + + this.callback = null; + + if (this.timer != null) { + clearTimeout(this.timer); + this.timer = null; + } + + assert(this.worker.pending[this.id]); + delete this.worker.pending[this.id]; + + callback(err, result); }; /* @@ -675,13 +737,6 @@ function getCores() { return os.cpus().length; } -function toError(values) { - var err = new Error(values[0]); - err.stack = values[1]; - err.type = values[2]; - return err; -} - /* * Default */ diff --git a/test/chain-test.js b/test/chain-test.js index 1cbc6d0f..90e39058 100644 --- a/test/chain-test.js +++ b/test/chain-test.js @@ -15,7 +15,7 @@ describe('Chain', function() { var chain, wallet, node, miner, walletdb; var tip1, tip2, cb1, cb2, mineBlock; - this.timeout(8000); + this.timeout(5000); node = new bcoin.fullnode({ db: 'memory', apiKey: 'foo' }); // node.walletdb.client = new Client({ apiKey: 'foo', network: 'regtest' });