diff --git a/lib/net/bip150.js b/lib/net/bip150.js index 1a8c05f8..1b714743 100644 --- a/lib/net/bip150.js +++ b/lib/net/bip150.js @@ -65,7 +65,7 @@ function BIP150(bip151, hostname, outbound, db, identity) { this.challengeSent = false; this.auth = false; this.completed = false; - this.callback = null; + this.job = null; this.timeout = null; } @@ -235,44 +235,56 @@ BIP150.prototype.destroy = function destroy() { } }; -BIP150.prototype.complete = function complete(err) { +BIP150.prototype.cleanup = function cleanup(err) { + var job = this.job; + assert(!this.completed, 'Already completed.'); - assert(this.callback, 'No completion callback.'); + assert(job, 'No completion job.'); this.completed = true; + this.job = null; if (this.timeout != null) { clearTimeout(this.timeout); this.timeout = null; } - this.callback(err); - this.callback = null; + return job; +}; + +BIP150.prototype.resolve = function resolve(result) { + var job = this.cleanup(); + job.resolve(result); +}; + +BIP150.prototype.reject = function reject(err) { + var job = this.cleanup(); + job.reject(err); }; BIP150.prototype.wait = function wait(timeout) { var self = this; return new Promise(function(resolve, reject) { - self._wait(timeout, co.wrap(resolve, reject)); + self._wait(timeout, resolve, reject); }); }; -BIP150.prototype._wait = function wait(timeout, callback) { +BIP150.prototype._wait = function wait(timeout, resolve, reject) { var self = this; assert(!this.auth, 'Cannot wait for init after handshake.'); - this.callback = callback; + this.job = co.job(resolve, reject); if (this.outbound && !this.peerIdentity) - return this.complete(new Error('No identity for ' + this.hostname + '.')); + return this.reject(new Error('No identity for ' + this.hostname + '.')); this.timeout = setTimeout(function() { - self.complete(new Error('BIP150 handshake timed out.')); + self.reject(new Error('BIP150 handshake timed out.')); }, timeout); this.once('auth', function() { - self.complete(); + self.resolve(); }); }; diff --git a/lib/net/bip151.js b/lib/net/bip151.js index b55b0aa5..1e58e66d 100644 --- a/lib/net/bip151.js +++ b/lib/net/bip151.js @@ -292,7 +292,7 @@ BIP151Stream.prototype.verify = function verify(tag) { * @property {Boolean} initSent * @property {Boolean} ackSent * @property {Object} timeout - * @property {Function} callback + * @property {Job} job * @property {Boolean} completed * @property {Boolean} handshake */ @@ -311,7 +311,7 @@ function BIP151(cipher) { this.initSent = false; this.ackSent = false; this.timeout = null; - this.callback = null; + this.job = null; this.completed = false; this.handshake = false; @@ -435,22 +435,43 @@ BIP151.prototype.encack = function encack(publicKey) { }; /** - * Complete the timeout for handshake, - * possibly with an error. - * @param {Error?} err + * Cleanup handshake job. + * @returns {Job} */ -BIP151.prototype.complete = function complete(err) { +BIP151.prototype.cleanup = function cleanup() { + var job = this.job; + assert(!this.completed, 'Already completed.'); - assert(this.callback, 'No completion callback.'); + assert(job, 'No completion job.'); this.completed = true; + this.job = null; clearTimeout(this.timeout); this.timeout = null; - this.callback(err); - this.callback = null; + return job; +}; + +/** + * Complete the timeout for handshake. + * @param {Object} result + */ + +BIP151.prototype.resolve = function resolve(result) { + var job = this.cleanup(); + job.resolve(result); +}; + +/** + * Complete the timeout for handshake with error. + * @param {Error} err + */ + +BIP151.prototype.reject = function reject(err) { + var job = this.cleanup(); + job.reject(err); }; /** @@ -462,28 +483,31 @@ BIP151.prototype.complete = function complete(err) { BIP151.prototype.wait = function wait(timeout) { var self = this; return new Promise(function(resolve, reject) { - self._wait(timeout, co.wrap(resolve, reject)); + self._wait(timeout, resolve, reject); }); }; /** * Set a timeout and wait for handshake to complete. * @private + * @param {Number} timeout + * @param {Function} resolve + * @param {Function} reject */ -BIP151.prototype._wait = function wait(timeout, callback) { +BIP151.prototype._wait = function wait(timeout, resolve, reject) { var self = this; assert(!this.handshake, 'Cannot wait for init after handshake.'); - this.callback = callback; + this.job = co.job(resolve, reject); this.timeout = setTimeout(function() { - self.complete(new Error('BIP151 handshake timed out.')); + self.reject(new Error('BIP151 handshake timed out.')); }, timeout); this.once('handshake', function() { - self.complete(); + self.resolve(); }); }; diff --git a/lib/net/peer.js b/lib/net/peer.js index 7217f9d3..b296a653 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -2977,7 +2977,7 @@ function RequestEntry() { } RequestEntry.prototype.addJob = function addJob(resolve, reject) { - this.jobs.push(new Job(resolve, reject)); + this.jobs.push(co.job(resolve, reject)); }; RequestEntry.prototype.setTimeout = function setTimeout(timeout) { @@ -3006,16 +3006,6 @@ RequestEntry.prototype.resolve = function resolve(result) { this.jobs.length = 0; }; -/** - * Job - * @constructor - */ - -function Job(resolve, reject) { - this.resolve = resolve; - this.reject = reject; -} - /* * Expose */ diff --git a/lib/net/pool.js b/lib/net/pool.js index 93a5ed32..72bd01f8 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1158,7 +1158,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { * @returns {Promise} */ -Pool.prototype.handleBlockInv = co(function* handleBlockInv(hashes, peer) { +Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { var unlock = yield this.locker.lock(); try { return yield this._handleBlockInv(peer, hashes); @@ -2135,7 +2135,7 @@ util.inherits(BroadcastItem, EventEmitter); */ BroadcastItem.prototype.addJob = function addJob(resolve, reject) { - this.jobs.push(new Job(resolve, reject)); + this.jobs.push(co.job(resolve, reject)); }; /** @@ -2289,16 +2289,6 @@ BroadcastItem.prototype.inspect = function inspect() { + '>'; }; -/** - * Job - * @constructor - */ - -function Job(resolve, reject) { - this.resolve = resolve; - this.reject = reject; -} - /* * Expose */ diff --git a/lib/utils/co.js b/lib/utils/co.js index ed2f0072..9a2c28bf 100644 --- a/lib/utils/co.js +++ b/lib/utils/co.js @@ -293,6 +293,29 @@ every = co(function* every(jobs) { return true; }); +/** + * Create a job object. + * @returns {Job} + */ + +function job(resolve, reject) { + return new Job(resolve, reject); +} + +/** + * Job + * @constructor + * @param {Function} resolve + * @param {Function} reject + * @property {Function} resolve + * @property {Function} reject + */ + +function Job(resolve, reject) { + this.resolve = resolve; + this.reject = reject; +} + /* * This drives me nuts. */ @@ -324,5 +347,6 @@ exports.wrap = wrap; exports.call = call; exports.promisify = promisify; exports.every = every; +exports.job = job; module.exports = exports; diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index e5d1c421..595ffc32 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -662,7 +662,7 @@ Worker.prototype.destroy = function destroy() { Worker.prototype.execute = function execute(packet, timeout) { var self = this; return new Promise(function(resolve, reject) { - self._execute(packet, timeout, co.wrap(resolve, reject)); + self._execute(packet, timeout, resolve, reject); }); }; @@ -671,12 +671,13 @@ Worker.prototype.execute = function execute(packet, timeout) { * @private * @param {Packet} packet * @param {Number} timeout - * @param {Function} callback + * @param {Function} resolve + * @param {Function} reject * the worker method specifies. */ -Worker.prototype._execute = function _execute(packet, timeout, callback) { - var job = new PendingJob(this, packet.id, callback); +Worker.prototype._execute = function _execute(packet, timeout, resolve, reject) { + var job = new PendingJob(this, packet.id, resolve, reject); assert(!this.pending[packet.id], 'ID overflow.'); @@ -699,7 +700,7 @@ Worker.prototype.resolveJob = function resolveJob(id, result) { if (!job) throw new Error('Job ' + id + ' is not in progress.'); - job.finish(null, result); + job.resolve(result); }; /** @@ -714,7 +715,7 @@ Worker.prototype.rejectJob = function rejectJob(id, err) { if (!job) throw new Error('Job ' + id + ' is not in progress.'); - job.finish(err); + job.reject(err); }; /** @@ -737,13 +738,14 @@ Worker.prototype.killJobs = function killJobs() { * @constructor * @param {Worker} worker * @param {Number} id - * @param {Function} callback + * @param {Function} resolve + * @param {Function} reject */ -function PendingJob(worker, id, callback) { +function PendingJob(worker, id, resolve, reject) { this.worker = worker; this.id = id; - this.callback = callback; + this.job = co.job(resolve, reject); this.timer = null; } @@ -759,7 +761,7 @@ PendingJob.prototype.start = function start(timeout) { return; this.timer = setTimeout(function() { - self.finish(new Error('Worker timed out.')); + self.reject(new Error('Worker timed out.')); }, timeout); }; @@ -768,21 +770,20 @@ PendingJob.prototype.start = function start(timeout) { */ PendingJob.prototype.destroy = function destroy() { - this.finish(new Error('Job was destroyed.')); + this.reject(new Error('Job was destroyed.')); }; /** - * Complete job with error and result. - * @param {Error} err - * @param {Object} reject + * Cleanup job state. + * @returns {Job} */ -PendingJob.prototype.finish = function finish(err, result) { - var callback = this.callback; +PendingJob.prototype.cleanup = function cleanup() { + var job = this.job; - assert(callback, 'Already finished.'); + assert(job, 'Already finished.'); - this.callback = null; + this.job = null; if (this.timer != null) { clearTimeout(this.timer); @@ -792,7 +793,27 @@ PendingJob.prototype.finish = function finish(err, result) { assert(this.worker.pending[this.id]); delete this.worker.pending[this.id]; - callback(err, result); + return job; +}; + +/** + * Complete job with result. + * @param {Object} result + */ + +PendingJob.prototype.resolve = function resolve(result) { + var job = this.cleanup(); + job.resolve(result); +}; + +/** + * Complete job with error. + * @param {Error} err + */ + +PendingJob.prototype.reject = function reject(err) { + var job = this.cleanup(); + job.reject(err); }; /*