From 97a2f2e86ae038f47763e4b5c4de946a01f95af6 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 17 Aug 2016 23:01:46 -0700 Subject: [PATCH] locker: refactor. --- lib/bcoin/locker.js | 133 ++++++++++++++++++++++++++++++++++++------ lib/bcoin/peer.js | 31 +++++----- lib/bcoin/pool.js | 13 ++++- lib/bcoin/wallet.js | 15 ++++- lib/bcoin/walletdb.js | 63 +------------------- 5 files changed, 157 insertions(+), 98 deletions(-) diff --git a/lib/bcoin/locker.js b/lib/bcoin/locker.js index 49573ead..3605d18d 100644 --- a/lib/bcoin/locker.js +++ b/lib/bcoin/locker.js @@ -15,13 +15,13 @@ var assert = utils.assert; * Represents a mutex lock for locking asynchronous object methods. * @exports Locker * @constructor - * @param {Function} parent - Parent constructor. + * @param {Function} parent - Parent object. * @param {Function?} add - `add` method (whichever method is queuing data). */ function Locker(parent, add) { if (!(this instanceof Locker)) - return new Locker(parent, add); + return Locker.create(parent, add); EventEmitter.call(this); @@ -36,6 +36,20 @@ function Locker(parent, add) { utils.inherits(Locker, EventEmitter); +/** + * Create a closure scoped locker. + * @param {Function} parent - Parent object. + * @param {Function?} add + * @returns {Function} Lock method. + */ + +Locker.create = function create(parent, add) { + var locker = new Locker(parent, add); + return function lock(func, args, force) { + return locker.lock(func, args, force); + }; +}; + /** * Test whether the locker has a pending * object by key (usually a {@link Hash}). @@ -60,21 +74,18 @@ Locker.prototype.hasPending = function hasPending(key) { Locker.prototype.lock = function lock(func, args, force) { var self = this; - var obj, called, arg, callback; + var callback = args[args.length - 1]; + var obj, called, arg; - if (args.length > 0) { - arg = args[args.length - 1]; - if (typeof arg === 'function') - callback = arg; - } + if (typeof callback !== 'function') + throw new Error(func.name + ' requires a callback.'); if (force) { assert(this.busy); return function unlock(err, res1, res2) { - assert(!called); + assert(!called, 'Locked callback executed twice.'); called = true; - if (callback) - callback(err, res1, res2); + callback(err, res1, res2); }; } @@ -93,7 +104,7 @@ Locker.prototype.lock = function lock(func, args, force) { return function unlock(err, res1, res2) { var item, obj; - assert(!called); + assert(!called, 'Locked callback executed twice.'); called = true; self.busy = false; @@ -104,8 +115,7 @@ Locker.prototype.lock = function lock(func, args, force) { } if (self.jobs.length === 0) { - if (callback) - callback(err, res1, res2); + callback(err, res1, res2); return; } @@ -119,8 +129,7 @@ Locker.prototype.lock = function lock(func, args, force) { item[0].apply(self.parent, item[1]); - if (callback) - callback(err, res1, res2); + callback(err, res1, res2); }; }; @@ -146,8 +155,98 @@ Locker.prototype.onDrain = function onDrain(callback) { this.once('drain', callback); }; +/** + * Represents a mutex lock for locking asynchronous object methods. + * Locks methods according to passed-in key. + * @exports MappedLock + * @constructor + * @param {Function} parent - Parent object. + */ + +function MappedLock(parent) { + if (!(this instanceof MappedLock)) + return MappedLock.create(parent); + + this.parent = parent; + this.jobs = []; + this.busy = {}; +} + +/** + * Create a closure scoped locker. + * @param {Function} parent - Parent object. + * @returns {Function} Lock method. + */ + +MappedLock.create = function create(parent) { + var locker = new MappedLock(parent); + return function lock(key, func, args, force) { + return locker.lock(key, func, args, force); + }; +}; + +/** + * Lock the parent object and all its methods + * which use the locker with a specified key. + * Begin to queue calls. + * @param {String|Number} key + * @param {Function} func - The method being called. + * @param {Array} args - Arguments passed to the method. + * @param {Boolean?} force - Force a call. + * @returns {Function} Unlocker - must be + * called once the method finishes executing in order + * to resolve the queue. + */ + +MappedLock.prototype.lock = function lock(key, func, args, force) { + var self = this; + var callback = args[args.length - 1]; + var called; + + if (typeof callback !== 'function') + throw new Error(func.name + ' requires a callback.'); + + if (force || key == null) { + assert(key == null || this.busy[key]); + return function unlock(err, res1, res2) { + assert(!called, 'Locked callback executed twice.'); + called = true; + callback(err, res1, res2); + }; + } + + if (this.busy[key]) { + this.jobs.push([func, args]); + return; + } + + this.busy[key] = true; + + return function unlock(err, res1, res2) { + var item; + + assert(!called, 'Locked callback executed twice.'); + called = true; + + delete self.busy[key]; + + if (self.jobs.length === 0) { + callback(err, res1, res2); + return; + } + + item = self.jobs.shift(); + + item[0].apply(self.parent, item[1]); + + callback(err, res1, res2); + }; +}; + /* * Expose */ -module.exports = Locker; +exports = Locker; +exports.mapped = MappedLock; +module.exports = exports; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 58b7b20f..d01807d0 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -252,6 +252,15 @@ Peer.prototype._init = function init() { } }; +/** + * Invoke mutex lock. + * @private + */ + +Peer.prototype._lock = function _lock(func, args, force) { + return this.locker.lock(func, args, force); +}; + /** * Handle `connect` event (called immediately * if a socket was passed into peer). @@ -1101,8 +1110,8 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { var self = this; var coins = []; var hits = []; + var unlock = this._lock(_handleGetUTXOs, [payload, utils.nop]); - var unlock = this.locker.lock(_handleGetUTXOs, [payload]); if (!unlock) return; @@ -1184,8 +1193,8 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) { var self = this; var headers = []; + var unlock = this._lock(_handleGetHeaders, [payload, utils.nop]); - var unlock = this.locker.lock(_handleGetHeaders, [payload]); if (!unlock) return; @@ -1267,8 +1276,8 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) { Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) { var self = this; var blocks = []; + var unlock = this._lock(_handleGetBlocks, [payload, utils.nop]); - var unlock = this.locker.lock(_handleGetBlocks, [payload]); if (!unlock) return; @@ -1409,8 +1418,8 @@ Peer.prototype._handleMempool = function _handleMempool() { var self = this; var items = []; var i, hashes; + var unlock = this._lock(_handleMempool, [utils.nop]); - var unlock = this.locker.lock(_handleMempool, []); if (!unlock) return; @@ -1492,8 +1501,8 @@ Peer.prototype._getItem = function _getItem(item, callback) { Peer.prototype._handleGetData = function _handleGetData(items) { var self = this; var notFound = []; + var unlock = this._lock(_handleGetData, [items, utils.nop]); - var unlock = this.locker.lock(_handleGetData, [items]); if (!unlock) return; @@ -1920,14 +1929,6 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) { var hash = block.hash('hex'); var result; - function done(err) { - if (err) { - self.emit('error', err); - return; - } - self.fire('cmpctblock', block); - } - if (!this.pool.options.compact) { this.logger.info('Peer sent unsolicited cmpctblock (%s).', this.hostname); return; @@ -1935,14 +1936,14 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) { if (!this.mempool) { this.logger.warning('Requesting compact blocks without a mempool!'); - return done(); + return; } if (this.compactBlocks[hash]) { this.logger.debug( 'Peer sent us a duplicate compact block (%s).', this.hostname); - return done(); + return; } // Sort of a lock too. diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 5f037791..d9f7a857 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -294,6 +294,15 @@ Pool.prototype._init = function _init() { }); }; +/** + * Invoke mutex lock. + * @private + */ + +Pool.prototype._lock = function _lock(func, args, force) { + return this.locker.lock(func, args, force); +}; + /** * Open the pool, wait for the chain to load. * @alias Pool#open @@ -692,7 +701,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) var ret = {}; var last; - callback = this.locker.lock(_handleHeaders, [headers, peer, callback]); + callback = this._lock(_handleHeaders, [headers, peer, callback]); if (!callback) return; @@ -841,7 +850,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) { var self = this; - callback = this.locker.lock(_handleInv, [hashes, peer, callback]); + callback = this._lock(_handleInv, [hashes, peer, callback]); if (!callback) return; diff --git a/lib/bcoin/wallet.js b/lib/bcoin/wallet.js index 76fae68c..28d7c165 100644 --- a/lib/bcoin/wallet.js +++ b/lib/bcoin/wallet.js @@ -2883,6 +2883,15 @@ MasterKey.fromOptions = function fromOptions(options) { return new MasterKey().fromOptions(options); }; +/** + * Invoke mutex lock. + * @private + */ + +MasterKey.prototype._lock = function _lock(func, args, force) { + return this.locker.lock(func, args, force); +}; + /** * Decrypt the key and set a timeout to destroy decrypted data. * @param {Buffer|String} passphrase - Zero this yourself. @@ -2893,7 +2902,7 @@ MasterKey.fromOptions = function fromOptions(options) { MasterKey.prototype.unlock = function unlock(passphrase, timeout, callback) { var self = this; - callback = this.locker.lock(unlock, [passphrase, timeout, callback]); + callback = this._lock(unlock, [passphrase, timeout, callback]); if (!callback) return; @@ -2984,7 +2993,7 @@ MasterKey.prototype.destroy = function destroy() { MasterKey.prototype.decrypt = function decrypt(passphrase, callback) { var self = this; - callback = this.locker.lock(decrypt, [passphrase, callback]); + callback = this._lock(decrypt, [passphrase, callback]); if (!callback) return; @@ -3027,7 +3036,7 @@ MasterKey.prototype.encrypt = function encrypt(passphrase, callback) { var self = this; var data, iv; - callback = this.locker.lock(encrypt, [passphrase, callback]); + callback = this._lock(encrypt, [passphrase, callback]); if (!callback) return; diff --git a/lib/bcoin/walletdb.js b/lib/bcoin/walletdb.js index 1d06ac71..a5088af5 100644 --- a/lib/bcoin/walletdb.js +++ b/lib/bcoin/walletdb.js @@ -167,8 +167,8 @@ function WalletDB(options) { // We need one read lock for `get` and `create`. // It will hold locks specific to wallet ids. - this.readLock = new MappedLock(this); - this.writeLock = new MappedLock(this); + this.readLock = new bcoin.locker.mapped(this); + this.writeLock = new bcoin.locker.mapped(this); this.txLock = new bcoin.locker(this); this.walletCache = new bcoin.lru(10000, 1); @@ -2032,65 +2032,6 @@ WalletBlock.prototype.toJSON = function toJSON() { }; }; -function MappedLock(parent) { - if (!(this instanceof MappedLock)) - return new MappedLock(parent); - - this.parent = parent; - this.jobs = []; - this.busy = {}; -} - -MappedLock.prototype.lock = function lock(key, func, args, force) { - var self = this; - var called, arg, callback; - - if (args.length > 0) { - arg = args[args.length - 1]; - if (typeof arg === 'function') - callback = arg; - } - - if (force || key == null) { - assert(key == null || this.busy[key]); - return function unlock(err, res1, res2) { - assert(!called); - called = true; - if (callback) - callback(err, res1, res2); - }; - } - - if (this.busy[key]) { - this.jobs.push([func, args]); - return; - } - - this.busy[key] = true; - - return function unlock(err, res1, res2) { - var item; - - assert(!called); - called = true; - - delete self.busy[key]; - - if (self.jobs.length === 0) { - if (callback) - callback(err, res1, res2); - return; - } - - item = self.jobs.shift(); - - item[0].apply(self.parent, item[1]); - - if (callback) - callback(err, res1, res2); - }; -}; - /* * Expose */