diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index aa8a9d42..49fe3e2f 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -781,7 +781,7 @@ Chain.prototype.reorganizeSPV = co(function* reorganizeSPV(competitor, block) { entry = disconnect[i]; headers = entry.toHeaders(); view = new CoinView(); - this.emit('disconnect', entry, headers, view); + yield this.fire('disconnect', entry, headers, view); } this.logger.warning( @@ -825,7 +825,7 @@ Chain.prototype.disconnect = co(function* disconnect(entry) { this.height = prev.height; this.emit('tip', prev); - this.emit('disconnect', entry, block, view); + yield this.fire('disconnect', entry, block, view); }); /** @@ -874,7 +874,7 @@ Chain.prototype.reconnect = co(function* reconnect(entry) { this.emit('tip', entry); this.emit('reconnect', entry, block); - this.emit('connect', entry, block, result.view); + yield this.fire('connect', entry, block, result.view); }); /** @@ -941,7 +941,7 @@ Chain.prototype.setBestChain = co(function* setBestChain(entry, block, prev, fla this.emit('tip', entry); this.emit('block', block, entry); - this.emit('connect', entry, block, result.view); + yield this.fire('connect', entry, block, result.view); }); /** @@ -1040,7 +1040,7 @@ Chain.prototype._reset = co(function* reset(block, silent) { this.emit('tip', tip); if (!silent) - this.emit('reset', tip); + yield this.fire('reset', tip); // Reset the orphan map completely. There may // have been some orphans on a forked chain we diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index 8e805487..32ba4bfb 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -187,9 +187,9 @@ FullNode.prototype._init = function _init() { self.emit('tx', tx); }); - this.chain.on('connect', co(function* (entry, block) { + this.chain.hook('connect', co(function* (entry, block) { try { - yield self.mempool.addBlock(entry, block.txs); + yield self.mempool._addBlock(entry, block.txs); } catch (e) { self.error(e); } @@ -197,18 +197,18 @@ FullNode.prototype._init = function _init() { self.emit('connect', entry, block); })); - this.chain.on('disconnect', co(function* (entry, block) { + this.chain.hook('disconnect', co(function* (entry, block) { try { - yield self.mempool.removeBlock(entry, block.txs); + yield self.mempool._removeBlock(entry, block.txs); } catch (e) { self.error(e); } self.emit('disconnect', entry, block); })); - this.chain.on('reset', co(function* (tip) { + this.chain.hook('reset', co(function* (tip) { try { - yield self.mempool.reset(); + yield self.mempool._reset(); } catch (e) { self.error(e); } diff --git a/lib/utils/asyncobject.js b/lib/utils/asyncobject.js index b37a1c24..e8c1b0d7 100644 --- a/lib/utils/asyncobject.js +++ b/lib/utils/asyncobject.js @@ -6,11 +6,11 @@ 'use strict'; +var assert = require('assert'); +var EventEmitter = require('events').EventEmitter; var util = require('./util'); var co = require('./co'); var Lock = require('./lock'); -var assert = require('assert'); -var EventEmitter = require('events').EventEmitter; /** * An abstract object that handles state and @@ -28,6 +28,7 @@ function AsyncObject() { EventEmitter.call(this); this._asyncLock = new Lock(); + this._hooks = Object.create(null); this.loading = false; this.closing = false; @@ -152,6 +153,81 @@ AsyncObject.prototype._close = function _close(callback) { throw new Error('Abstract method.'); }; +/** + * Add a hook listener. + * @param {String} type + * @param {Function} handler + */ + +AsyncObject.prototype.hook = function hook(type, handler) { + assert(typeof type === 'string', '`type` must be a string.'); + + if (!this._hooks[type]) + this._hooks[type] = []; + + this._hooks[type].push(handler); +}; + +/** + * Emit events and hooks for type. + * @method + * @param {String} type + * @param {...Object} args + * @returns {Promise} + */ + +AsyncObject.prototype.fire = function fire() { + this.emit.apply(this, arguments); + return this.fireHook.apply(this, arguments); +}; + +/** + * Emit an asynchronous event (hook). + * Wait for promises to resolve. + * @method + * @param {String} type + * @param {...Object} args + * @returns {Promise} + */ + +AsyncObject.prototype.fireHook = co(function* fireHook(type) { + var i, j, listeners, args, handler; + + assert(typeof type === 'string', '`type` must be a string.'); + + listeners = this._hooks[type]; + + if (!listeners || listeners.length === 0) + return; + + for (i = 0; i < listeners.length; i++) { + handler = listeners[i]; + + switch (arguments.length) { + case 1: + yield handler(); + break; + case 2: + yield handler(arguments[1]); + break; + case 3: + yield handler(arguments[1], arguments[2]); + break; + case 4: + yield handler(arguments[1], arguments[2], arguments[3]); + break; + default: + if (!args) { + args = new Array(arguments.length - 1); + for (j = 1; j < arguments.length; j++) + args[j - 1] = arguments[j]; + } + yield handler.apply(null, args); + break; + } + } +}); + /* * Expose */