locker: refactor.

This commit is contained in:
Christopher Jeffrey 2016-08-17 23:01:46 -07:00
parent cdbf7c1e8c
commit 97a2f2e86a
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
5 changed files with 157 additions and 98 deletions

View File

@ -15,13 +15,13 @@ var assert = utils.assert;
* Represents a mutex lock for locking asynchronous object methods.
* @exports Locker
* @constructor
* @param {Function} parent - Parent constructor.
* @param {Function} parent - Parent object.
* @param {Function?} add - `add` method (whichever method is queuing data).
*/
function Locker(parent, add) {
if (!(this instanceof Locker))
return new Locker(parent, add);
return Locker.create(parent, add);
EventEmitter.call(this);
@ -36,6 +36,20 @@ function Locker(parent, add) {
utils.inherits(Locker, EventEmitter);
/**
* Create a closure scoped locker.
* @param {Function} parent - Parent object.
* @param {Function?} add
* @returns {Function} Lock method.
*/
Locker.create = function create(parent, add) {
var locker = new Locker(parent, add);
return function lock(func, args, force) {
return locker.lock(func, args, force);
};
};
/**
* Test whether the locker has a pending
* object by key (usually a {@link Hash}).
@ -60,21 +74,18 @@ Locker.prototype.hasPending = function hasPending(key) {
Locker.prototype.lock = function lock(func, args, force) {
var self = this;
var obj, called, arg, callback;
var callback = args[args.length - 1];
var obj, called, arg;
if (args.length > 0) {
arg = args[args.length - 1];
if (typeof arg === 'function')
callback = arg;
}
if (typeof callback !== 'function')
throw new Error(func.name + ' requires a callback.');
if (force) {
assert(this.busy);
return function unlock(err, res1, res2) {
assert(!called);
assert(!called, 'Locked callback executed twice.');
called = true;
if (callback)
callback(err, res1, res2);
callback(err, res1, res2);
};
}
@ -93,7 +104,7 @@ Locker.prototype.lock = function lock(func, args, force) {
return function unlock(err, res1, res2) {
var item, obj;
assert(!called);
assert(!called, 'Locked callback executed twice.');
called = true;
self.busy = false;
@ -104,8 +115,7 @@ Locker.prototype.lock = function lock(func, args, force) {
}
if (self.jobs.length === 0) {
if (callback)
callback(err, res1, res2);
callback(err, res1, res2);
return;
}
@ -119,8 +129,7 @@ Locker.prototype.lock = function lock(func, args, force) {
item[0].apply(self.parent, item[1]);
if (callback)
callback(err, res1, res2);
callback(err, res1, res2);
};
};
@ -146,8 +155,98 @@ Locker.prototype.onDrain = function onDrain(callback) {
this.once('drain', callback);
};
/**
* Represents a mutex lock for locking asynchronous object methods.
* Locks methods according to passed-in key.
* @exports MappedLock
* @constructor
* @param {Function} parent - Parent object.
*/
function MappedLock(parent) {
if (!(this instanceof MappedLock))
return MappedLock.create(parent);
this.parent = parent;
this.jobs = [];
this.busy = {};
}
/**
* Create a closure scoped locker.
* @param {Function} parent - Parent object.
* @returns {Function} Lock method.
*/
MappedLock.create = function create(parent) {
var locker = new MappedLock(parent);
return function lock(key, func, args, force) {
return locker.lock(key, func, args, force);
};
};
/**
* Lock the parent object and all its methods
* which use the locker with a specified key.
* Begin to queue calls.
* @param {String|Number} key
* @param {Function} func - The method being called.
* @param {Array} args - Arguments passed to the method.
* @param {Boolean?} force - Force a call.
* @returns {Function} Unlocker - must be
* called once the method finishes executing in order
* to resolve the queue.
*/
MappedLock.prototype.lock = function lock(key, func, args, force) {
var self = this;
var callback = args[args.length - 1];
var called;
if (typeof callback !== 'function')
throw new Error(func.name + ' requires a callback.');
if (force || key == null) {
assert(key == null || this.busy[key]);
return function unlock(err, res1, res2) {
assert(!called, 'Locked callback executed twice.');
called = true;
callback(err, res1, res2);
};
}
if (this.busy[key]) {
this.jobs.push([func, args]);
return;
}
this.busy[key] = true;
return function unlock(err, res1, res2) {
var item;
assert(!called, 'Locked callback executed twice.');
called = true;
delete self.busy[key];
if (self.jobs.length === 0) {
callback(err, res1, res2);
return;
}
item = self.jobs.shift();
item[0].apply(self.parent, item[1]);
callback(err, res1, res2);
};
};
/*
* Expose
*/
module.exports = Locker;
exports = Locker;
exports.mapped = MappedLock;
module.exports = exports;

View File

@ -252,6 +252,15 @@ Peer.prototype._init = function init() {
}
};
/**
* Invoke mutex lock.
* @private
*/
Peer.prototype._lock = function _lock(func, args, force) {
return this.locker.lock(func, args, force);
};
/**
* Handle `connect` event (called immediately
* if a socket was passed into peer).
@ -1101,8 +1110,8 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
var self = this;
var coins = [];
var hits = [];
var unlock = this._lock(_handleGetUTXOs, [payload, utils.nop]);
var unlock = this.locker.lock(_handleGetUTXOs, [payload]);
if (!unlock)
return;
@ -1184,8 +1193,8 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) {
var self = this;
var headers = [];
var unlock = this._lock(_handleGetHeaders, [payload, utils.nop]);
var unlock = this.locker.lock(_handleGetHeaders, [payload]);
if (!unlock)
return;
@ -1267,8 +1276,8 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) {
Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) {
var self = this;
var blocks = [];
var unlock = this._lock(_handleGetBlocks, [payload, utils.nop]);
var unlock = this.locker.lock(_handleGetBlocks, [payload]);
if (!unlock)
return;
@ -1409,8 +1418,8 @@ Peer.prototype._handleMempool = function _handleMempool() {
var self = this;
var items = [];
var i, hashes;
var unlock = this._lock(_handleMempool, [utils.nop]);
var unlock = this.locker.lock(_handleMempool, []);
if (!unlock)
return;
@ -1492,8 +1501,8 @@ Peer.prototype._getItem = function _getItem(item, callback) {
Peer.prototype._handleGetData = function _handleGetData(items) {
var self = this;
var notFound = [];
var unlock = this._lock(_handleGetData, [items, utils.nop]);
var unlock = this.locker.lock(_handleGetData, [items]);
if (!unlock)
return;
@ -1920,14 +1929,6 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) {
var hash = block.hash('hex');
var result;
function done(err) {
if (err) {
self.emit('error', err);
return;
}
self.fire('cmpctblock', block);
}
if (!this.pool.options.compact) {
this.logger.info('Peer sent unsolicited cmpctblock (%s).', this.hostname);
return;
@ -1935,14 +1936,14 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) {
if (!this.mempool) {
this.logger.warning('Requesting compact blocks without a mempool!');
return done();
return;
}
if (this.compactBlocks[hash]) {
this.logger.debug(
'Peer sent us a duplicate compact block (%s).',
this.hostname);
return done();
return;
}
// Sort of a lock too.

View File

@ -294,6 +294,15 @@ Pool.prototype._init = function _init() {
});
};
/**
* Invoke mutex lock.
* @private
*/
Pool.prototype._lock = function _lock(func, args, force) {
return this.locker.lock(func, args, force);
};
/**
* Open the pool, wait for the chain to load.
* @alias Pool#open
@ -692,7 +701,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
var ret = {};
var last;
callback = this.locker.lock(_handleHeaders, [headers, peer, callback]);
callback = this._lock(_handleHeaders, [headers, peer, callback]);
if (!callback)
return;
@ -841,7 +850,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) {
var self = this;
callback = this.locker.lock(_handleInv, [hashes, peer, callback]);
callback = this._lock(_handleInv, [hashes, peer, callback]);
if (!callback)
return;

View File

@ -2883,6 +2883,15 @@ MasterKey.fromOptions = function fromOptions(options) {
return new MasterKey().fromOptions(options);
};
/**
* Invoke mutex lock.
* @private
*/
MasterKey.prototype._lock = function _lock(func, args, force) {
return this.locker.lock(func, args, force);
};
/**
* Decrypt the key and set a timeout to destroy decrypted data.
* @param {Buffer|String} passphrase - Zero this yourself.
@ -2893,7 +2902,7 @@ MasterKey.fromOptions = function fromOptions(options) {
MasterKey.prototype.unlock = function unlock(passphrase, timeout, callback) {
var self = this;
callback = this.locker.lock(unlock, [passphrase, timeout, callback]);
callback = this._lock(unlock, [passphrase, timeout, callback]);
if (!callback)
return;
@ -2984,7 +2993,7 @@ MasterKey.prototype.destroy = function destroy() {
MasterKey.prototype.decrypt = function decrypt(passphrase, callback) {
var self = this;
callback = this.locker.lock(decrypt, [passphrase, callback]);
callback = this._lock(decrypt, [passphrase, callback]);
if (!callback)
return;
@ -3027,7 +3036,7 @@ MasterKey.prototype.encrypt = function encrypt(passphrase, callback) {
var self = this;
var data, iv;
callback = this.locker.lock(encrypt, [passphrase, callback]);
callback = this._lock(encrypt, [passphrase, callback]);
if (!callback)
return;

View File

@ -167,8 +167,8 @@ function WalletDB(options) {
// We need one read lock for `get` and `create`.
// It will hold locks specific to wallet ids.
this.readLock = new MappedLock(this);
this.writeLock = new MappedLock(this);
this.readLock = new bcoin.locker.mapped(this);
this.writeLock = new bcoin.locker.mapped(this);
this.txLock = new bcoin.locker(this);
this.walletCache = new bcoin.lru(10000, 1);
@ -2032,65 +2032,6 @@ WalletBlock.prototype.toJSON = function toJSON() {
};
};
function MappedLock(parent) {
if (!(this instanceof MappedLock))
return new MappedLock(parent);
this.parent = parent;
this.jobs = [];
this.busy = {};
}
MappedLock.prototype.lock = function lock(key, func, args, force) {
var self = this;
var called, arg, callback;
if (args.length > 0) {
arg = args[args.length - 1];
if (typeof arg === 'function')
callback = arg;
}
if (force || key == null) {
assert(key == null || this.busy[key]);
return function unlock(err, res1, res2) {
assert(!called);
called = true;
if (callback)
callback(err, res1, res2);
};
}
if (this.busy[key]) {
this.jobs.push([func, args]);
return;
}
this.busy[key] = true;
return function unlock(err, res1, res2) {
var item;
assert(!called);
called = true;
delete self.busy[key];
if (self.jobs.length === 0) {
if (callback)
callback(err, res1, res2);
return;
}
item = self.jobs.shift();
item[0].apply(self.parent, item[1]);
if (callback)
callback(err, res1, res2);
};
};
/*
* Expose
*/