util: refactor locker.

This commit is contained in:
Christopher Jeffrey 2016-12-18 20:43:47 -08:00
parent 37a3f83334
commit 3732260350
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 172 additions and 102 deletions

View File

@ -78,7 +78,6 @@ function Chain(options) {
this.logger = options.logger || Logger.global;
this.db = new ChainDB(this);
this.total = 0;
this.currentBlock = null;
this.orphanLimit = options.orphanLimit || (20 << 20);
this.locker = new Locker(true);
this.invalid = new LRU(100);
@ -109,10 +108,6 @@ util.inherits(Chain, AsyncObject);
Chain.prototype._init = function _init() {
var self = this;
this.locker.on('purge', function(total, size) {
self.logger.warning('Warning: %dmb of pending objects. Purging.', util.mb(size));
});
this.on('competitor', function(block, entry) {
self.logger.warning('Heads up: Competing chain at height %d:'
+ ' tip-height=%d competitor-height=%d'
@ -1157,7 +1152,7 @@ Chain.prototype._resetTime = co(function* resetTime(ts) {
*/
Chain.prototype.onDrain = function onDrain() {
return this.locker.onDrain();
return this.locker.wait();
};
/**
@ -1166,9 +1161,7 @@ Chain.prototype.onDrain = function onDrain() {
*/
Chain.prototype.isBusy = function isBusy() {
if (this.currentBlock)
return true;
return this.locker.pending.length > 0;
return this.locker.busy;
};
/**
@ -1178,16 +1171,11 @@ Chain.prototype.isBusy = function isBusy() {
*/
Chain.prototype.add = co(function* add(block) {
var unlock = yield this.locker.lock(block);
assert(!this.currentBlock);
this.currentBlock = block.hash('hex');
var hash = block.hash('hex');
var unlock = yield this.locker.lock(hash);
try {
return yield this._add(block);
} finally {
this.currentBlock = null;
unlock();
}
});
@ -1642,10 +1630,7 @@ Chain.prototype.has = co(function* has(hash) {
if (this.hasOrphan(hash))
return true;
if (this.hasPending(hash))
return true;
if (hash === this.currentBlock)
if (this.locker.has(hash))
return true;
return yield this.hasEntry(hash);

View File

@ -87,7 +87,6 @@ function Mempool(options) {
this.orphans = {};
this.map = {};
this.spents = {};
this.currentTX = null;
this.coinIndex = new CoinIndex(this);
this.txIndex = new TXIndex(this);
@ -588,13 +587,13 @@ Mempool.prototype.hasTX = function hasTX(hash) {
*/
Mempool.prototype.has = function has(hash) {
if (this.locker.hasPending(hash))
if (this.locker.has(hash))
return true;
if (hash === this.currentTX)
if (this.hasOrphan(hash))
return true;
return this.exists(hash);
return this.hasTX(hash);
};
/**
@ -606,6 +605,9 @@ Mempool.prototype.has = function has(hash) {
*/
Mempool.prototype.exists = function exists(hash) {
if (this.locker.hasPending(hash))
return true;
if (this.hasOrphan(hash))
return true;
@ -632,12 +634,8 @@ Mempool.prototype.hasReject = function hasReject(hash) {
*/
Mempool.prototype.addTX = co(function* addTX(tx) {
var unlock = yield this.locker.lock(tx);
assert(!this.currentTX);
this.currentTX = tx.hash('hex');
var hash = tx.hash('hex');
var unlock = yield this.locker.lock(hash);
try {
return yield this._addTX(tx);
} catch (err) {
@ -647,7 +645,6 @@ Mempool.prototype.addTX = co(function* addTX(tx) {
}
throw err;
} finally {
this.currentTX = null;
unlock();
}
});
@ -1711,8 +1708,7 @@ Mempool.prototype.indexEntry = function indexEntry(entry, view) {
Mempool.prototype.unindexEntry = function unindexEntry(entry) {
var tx = entry.tx;
var hash = tx.hash('hex');
var i, input, prevout, prev, key;
var i, input, prevout, prev;
this.txIndex.remove(tx);

View File

@ -1071,7 +1071,7 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) {
+ ' ts=%s height=%d highest=%d progress=%s'
+ ' blocks=%d orphans=%d active=%d'
+ ' queue=%d target=%s peers=%d'
+ ' pending=%d jobs=%d',
+ ' jobs=%d',
util.date(block.ts),
this.chain.height,
this.chain.bestHeight,
@ -1082,7 +1082,6 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) {
peer.queueBlock.size,
block.bits,
this.peers.size(),
this.chain.locker.pending.length,
this.chain.locker.jobs.length);
}

View File

@ -7,45 +7,41 @@
'use strict';
var EventEmitter = require('events').EventEmitter;
var util = require('../utils/util');
var assert = require('assert');
/**
* Represents a mutex lock for locking asynchronous object methods.
* @exports Locker
* @constructor
* @param {Function?} add - `add` method (whichever method is queuing data).
* @param {Boolean?} named - Whether to
* maintain a map of queued jobs by job name.
*/
function Locker(add) {
function Locker(named) {
if (!(this instanceof Locker))
return Locker.create(add);
return Locker.create(named);
EventEmitter.call(this);
this.add = add === true;
this.named = named === true;
this.jobs = [];
this.waiting = [];
this.busy = false;
this.destroyed = false;
this.pending = [];
this.pendingMap = {};
this.map = {};
this.current = null;
this.unlocker = this.unlock.bind(this);
}
util.inherits(Locker, EventEmitter);
/**
* Create a closure scoped locker.
* @param {Function?} add
* @param {Boolean?} named
* @returns {Function} Lock method.
*/
Locker.create = function create(add) {
var locker = new Locker(add);
Locker.create = function create(named) {
var locker = new Locker(named);
return function lock(arg1, arg2) {
return locker.lock(arg1, arg2);
};
@ -53,32 +49,51 @@ Locker.create = function create(add) {
/**
* Test whether the locker has a pending
* object by key (usually a {@link Hash}).
* @param {Hash|String} key
* job or a job in progress (by name).
* @param {String} name
* @returns {Boolean}
*/
Locker.prototype.hasPending = function hasPending(key) {
return this.pendingMap[key] === true;
Locker.prototype.has = function has(name) {
assert(this.named, 'Must use named jobs.');
if (this.current === name)
return true;
return this.map[name] > 0;
};
/**
* Test whether the locker has
* a pending job by name.
* @param {String} name
* @returns {Boolean}
*/
Locker.prototype.hasPending = function hasPending(name) {
assert(this.named, 'Must use named jobs.');
return this.map[name] > 0;
};
/**
* Lock the parent object and all its methods
* which use the locker. Begin to queue calls.
* @param {String?} name - Job name.
* @param {Boolean?} force - Bypass the lock.
* @returns {Promise} Returns {Function}, must be
* @returns {Promise} - Returns {Function}, must be
* called once the method finishes executing in order
* to resolve the queue.
*/
Locker.prototype.lock = function lock(arg1, arg2) {
var self = this;
var force, item;
var name, force;
if (this.add) {
item = arg1;
if (this.named) {
name = arg1 || null;
force = arg2;
} else {
name = null;
force = arg1;
}
@ -87,21 +102,22 @@ Locker.prototype.lock = function lock(arg1, arg2) {
if (force) {
assert(this.busy);
return Promise.resolve(util.nop);
return Promise.resolve(nop);
}
if (this.busy) {
if (item) {
this.pending.push(item);
this.pendingMap[item.hash('hex')] = true;
if (name) {
if (!this.map[name])
this.map[name] = 0;
this.map[name]++;
}
return new Promise(function(resolve, reject) {
self.jobs.push(new Job(resolve, reject, item));
self.jobs.push(new Job(resolve, reject, name));
});
}
this.busy = true;
this.current = name;
return Promise.resolve(this.unlocker);
};
@ -114,57 +130,103 @@ Locker.prototype.lock = function lock(arg1, arg2) {
Locker.prototype.unlock = function unlock() {
var job;
assert(this.busy);
this.busy = false;
this.current = null;
if (this.pending.length === 0)
this.emit('drain');
if (this.jobs.length === 0)
if (this.jobs.length === 0) {
this.drain();
return;
}
assert(!this.destroyed);
job = this.jobs.shift();
if (this.destroyed) {
job.reject(new Error('Locker was destroyed.'));
return;
}
if (job.item) {
assert(job.item === this.pending.shift());
delete this.pendingMap[job.item.hash('hex')];
if (job.name) {
assert(this.map[job.name] > 0);
if (--this.map[job.name] === 0)
delete this.map[job.name];
}
this.busy = true;
this.current = job.name;
job.resolve(this.unlocker);
};
/**
* Destroy the locker. Purge all pending calls.
*/
Locker.prototype.destroy = function destroy() {
this.destroyed = true;
};
/**
* Wait for a drain (empty queue).
* @returns {Promise}
*/
Locker.prototype.onDrain = function onDrain() {
Locker.prototype.wait = function wait() {
var self = this;
assert(this.add, 'Cannot wait for drain without add method.');
if (this.destroyed)
return Promise.reject(new Error('Locker is destroyed.'));
if (this.pending.length === 0)
if (!this.busy) {
assert(this.waiting.length === 0);
return Promise.resolve();
}
return new Promise(function(resolve, reject) {
self.once('drain', resolve);
self.waiting.push(new Job(resolve, reject, null));
});
};
/**
* Notify drainers that the queue has emptied.
* @private
*/
Locker.prototype.drain = function drain() {
var i, jobs, job;
if (this.waiting.length === 0)
return;
jobs = this.waiting.slice();
this.waiting.length = 0;
for (i = 0; i < jobs.length; i++) {
job = jobs[i];
job.resolve();
}
};
/**
* Destroy the locker. Purge all pending calls.
*/
Locker.prototype.destroy = function destroy() {
var err = new Error('Locker was destroyed.');
var i, jobs, job;
this.destroyed = true;
jobs = this.jobs.slice();
this.jobs.length = 0;
for (i = 0; i < jobs.length; i++) {
job = jobs[i];
job.reject(err);
}
jobs = this.waiting.slice();
this.waiting.length = 0;
for (i = 0; i < jobs.length; i++) {
job = jobs[i];
job.reject(err);
}
};
/**
* Represents a mutex lock for locking asynchronous object methods.
* Locks methods according to passed-in key.
@ -193,15 +255,23 @@ MappedLock.create = function create() {
};
};
/**
* Test whether the locker has a pending job by name.
* @param {String} name
* @returns {Boolean}
*/
MappedLock.prototype.has = function has(name) {
return this.busy[name] === true;
};
/**
* Lock the parent object and all its methods
* which use the locker with a specified key.
* Begin to queue calls.
* @param {String|Number} key
* @param {Function} func - The method being called.
* @param {Array} args - Arguments passed to the method.
* @param {Boolean?} force - Force a call.
* @returns {Function} Unlocker - must be
* @returns {Promise} - Returns {Function}, must be
* called once the method finishes executing in order
* to resolve the queue.
*/
@ -213,18 +283,18 @@ MappedLock.prototype.lock = function lock(key, force) {
return Promise.reject(new Error('Locker is destroyed.'));
if (key == null)
return Promise.resolve(util.nop);
return Promise.resolve(nop);
if (force) {
assert(this.busy[key]);
return Promise.resolve(util.nop);
return Promise.resolve(nop);
}
if (this.busy[key]) {
return new Promise(function(resolve, reject) {
if (!self.jobs[key])
self.jobs[key] = [];
self.jobs[key].push(new Job(resolve, reject));
self.jobs[key].push(new Job(resolve, reject, null));
});
}
@ -246,22 +316,20 @@ MappedLock.prototype.unlock = function unlock(key) {
var jobs = self.jobs[key];
var job;
assert(self.busy[key]);
delete self.busy[key];
if (!jobs)
return;
assert(!self.destroyed);
job = jobs.shift();
assert(job);
if (jobs.length === 0)
delete self.jobs[key];
if (self.destroyed) {
job.reject(new Error('Locker was destroyed.'));
return;
}
self.busy[key] = true;
job.resolve(unlocker);
@ -273,7 +341,23 @@ MappedLock.prototype.unlock = function unlock(key) {
*/
MappedLock.prototype.destroy = function destroy() {
var err = new Error('Locker was destroyed.');
var map = this.jobs;
var keys = Object.keys(map);
var i, j, key, jobs, job;
this.destroyed = true;
this.jobs = {};
for (i = 0; i < keys.length; i++) {
key = keys[i];
jobs = map[key];
for (j = 0; j < jobs.length; j++) {
job = jobs[j];
job.reject(err);
}
}
};
/**
@ -282,15 +366,21 @@ MappedLock.prototype.destroy = function destroy() {
* @constructor
* @param {Function} resolve
* @param {Function} reject
* @param {Object?} item
* @param {String?} name
*/
function Job(resolve, reject, item) {
function Job(resolve, reject, name) {
this.resolve = resolve;
this.reject = reject;
this.item = item || null;
this.name = name || null;
}
/*
* Helpers
*/
function nop() {}
/*
* Expose
*/