From 373226035014a4e0ace26746ac44ba645cda9184 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sun, 18 Dec 2016 20:43:47 -0800 Subject: [PATCH] util: refactor locker. --- lib/blockchain/chain.js | 25 +---- lib/mempool/mempool.js | 22 ++-- lib/net/pool.js | 3 +- lib/utils/locker.js | 224 ++++++++++++++++++++++++++++------------ 4 files changed, 172 insertions(+), 102 deletions(-) diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 6c041bfd..c1f9ef5d 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -78,7 +78,6 @@ function Chain(options) { this.logger = options.logger || Logger.global; this.db = new ChainDB(this); this.total = 0; - this.currentBlock = null; this.orphanLimit = options.orphanLimit || (20 << 20); this.locker = new Locker(true); this.invalid = new LRU(100); @@ -109,10 +108,6 @@ util.inherits(Chain, AsyncObject); Chain.prototype._init = function _init() { var self = this; - this.locker.on('purge', function(total, size) { - self.logger.warning('Warning: %dmb of pending objects. Purging.', util.mb(size)); - }); - this.on('competitor', function(block, entry) { self.logger.warning('Heads up: Competing chain at height %d:' + ' tip-height=%d competitor-height=%d' @@ -1157,7 +1152,7 @@ Chain.prototype._resetTime = co(function* resetTime(ts) { */ Chain.prototype.onDrain = function onDrain() { - return this.locker.onDrain(); + return this.locker.wait(); }; /** @@ -1166,9 +1161,7 @@ Chain.prototype.onDrain = function onDrain() { */ Chain.prototype.isBusy = function isBusy() { - if (this.currentBlock) - return true; - return this.locker.pending.length > 0; + return this.locker.busy; }; /** @@ -1178,16 +1171,11 @@ Chain.prototype.isBusy = function isBusy() { */ Chain.prototype.add = co(function* add(block) { - var unlock = yield this.locker.lock(block); - - assert(!this.currentBlock); - - this.currentBlock = block.hash('hex'); - + var hash = block.hash('hex'); + var unlock = yield this.locker.lock(hash); try { return yield this._add(block); } finally { - this.currentBlock = null; unlock(); } }); @@ -1642,10 +1630,7 @@ Chain.prototype.has = co(function* has(hash) { if (this.hasOrphan(hash)) return true; - if (this.hasPending(hash)) - return true; - - if (hash === this.currentBlock) + if (this.locker.has(hash)) return true; return yield this.hasEntry(hash); diff --git a/lib/mempool/mempool.js b/lib/mempool/mempool.js index 6453879e..1d7d8d27 100644 --- a/lib/mempool/mempool.js +++ b/lib/mempool/mempool.js @@ -87,7 +87,6 @@ function Mempool(options) { this.orphans = {}; this.map = {}; this.spents = {}; - this.currentTX = null; this.coinIndex = new CoinIndex(this); this.txIndex = new TXIndex(this); @@ -588,13 +587,13 @@ Mempool.prototype.hasTX = function hasTX(hash) { */ Mempool.prototype.has = function has(hash) { - if (this.locker.hasPending(hash)) + if (this.locker.has(hash)) return true; - if (hash === this.currentTX) + if (this.hasOrphan(hash)) return true; - return this.exists(hash); + return this.hasTX(hash); }; /** @@ -606,6 +605,9 @@ Mempool.prototype.has = function has(hash) { */ Mempool.prototype.exists = function exists(hash) { + if (this.locker.hasPending(hash)) + return true; + if (this.hasOrphan(hash)) return true; @@ -632,12 +634,8 @@ Mempool.prototype.hasReject = function hasReject(hash) { */ Mempool.prototype.addTX = co(function* addTX(tx) { - var unlock = yield this.locker.lock(tx); - - assert(!this.currentTX); - - this.currentTX = tx.hash('hex'); - + var hash = tx.hash('hex'); + var unlock = yield this.locker.lock(hash); try { return yield this._addTX(tx); } catch (err) { @@ -647,7 +645,6 @@ Mempool.prototype.addTX = co(function* addTX(tx) { } throw err; } finally { - this.currentTX = null; unlock(); } }); @@ -1711,8 +1708,7 @@ Mempool.prototype.indexEntry = function indexEntry(entry, view) { Mempool.prototype.unindexEntry = function unindexEntry(entry) { var tx = entry.tx; - var hash = tx.hash('hex'); - var i, input, prevout, prev, key; + var i, input, prevout, prev; this.txIndex.remove(tx); diff --git a/lib/net/pool.js b/lib/net/pool.js index 015c41d8..97e02009 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1071,7 +1071,7 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { + ' ts=%s height=%d highest=%d progress=%s' + ' blocks=%d orphans=%d active=%d' + ' queue=%d target=%s peers=%d' - + ' pending=%d jobs=%d', + + ' jobs=%d', util.date(block.ts), this.chain.height, this.chain.bestHeight, @@ -1082,7 +1082,6 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { peer.queueBlock.size, block.bits, this.peers.size(), - this.chain.locker.pending.length, this.chain.locker.jobs.length); } diff --git a/lib/utils/locker.js b/lib/utils/locker.js index 2bc77814..72e92c46 100644 --- a/lib/utils/locker.js +++ b/lib/utils/locker.js @@ -7,45 +7,41 @@ 'use strict'; -var EventEmitter = require('events').EventEmitter; -var util = require('../utils/util'); var assert = require('assert'); /** * Represents a mutex lock for locking asynchronous object methods. * @exports Locker * @constructor - * @param {Function?} add - `add` method (whichever method is queuing data). + * @param {Boolean?} named - Whether to + * maintain a map of queued jobs by job name. */ -function Locker(add) { +function Locker(named) { if (!(this instanceof Locker)) - return Locker.create(add); + return Locker.create(named); - EventEmitter.call(this); - - this.add = add === true; + this.named = named === true; this.jobs = []; + this.waiting = []; this.busy = false; this.destroyed = false; - this.pending = []; - this.pendingMap = {}; + this.map = {}; + this.current = null; this.unlocker = this.unlock.bind(this); } -util.inherits(Locker, EventEmitter); - /** * Create a closure scoped locker. - * @param {Function?} add + * @param {Boolean?} named * @returns {Function} Lock method. */ -Locker.create = function create(add) { - var locker = new Locker(add); +Locker.create = function create(named) { + var locker = new Locker(named); return function lock(arg1, arg2) { return locker.lock(arg1, arg2); }; @@ -53,32 +49,51 @@ Locker.create = function create(add) { /** * Test whether the locker has a pending - * object by key (usually a {@link Hash}). - * @param {Hash|String} key + * job or a job in progress (by name). + * @param {String} name * @returns {Boolean} */ -Locker.prototype.hasPending = function hasPending(key) { - return this.pendingMap[key] === true; +Locker.prototype.has = function has(name) { + assert(this.named, 'Must use named jobs.'); + + if (this.current === name) + return true; + + return this.map[name] > 0; +}; + +/** + * Test whether the locker has + * a pending job by name. + * @param {String} name + * @returns {Boolean} + */ + +Locker.prototype.hasPending = function hasPending(name) { + assert(this.named, 'Must use named jobs.'); + return this.map[name] > 0; }; /** * Lock the parent object and all its methods * which use the locker. Begin to queue calls. + * @param {String?} name - Job name. * @param {Boolean?} force - Bypass the lock. - * @returns {Promise} Returns {Function}, must be + * @returns {Promise} - Returns {Function}, must be * called once the method finishes executing in order * to resolve the queue. */ Locker.prototype.lock = function lock(arg1, arg2) { var self = this; - var force, item; + var name, force; - if (this.add) { - item = arg1; + if (this.named) { + name = arg1 || null; force = arg2; } else { + name = null; force = arg1; } @@ -87,21 +102,22 @@ Locker.prototype.lock = function lock(arg1, arg2) { if (force) { assert(this.busy); - return Promise.resolve(util.nop); + return Promise.resolve(nop); } if (this.busy) { - if (item) { - this.pending.push(item); - this.pendingMap[item.hash('hex')] = true; + if (name) { + if (!this.map[name]) + this.map[name] = 0; + this.map[name]++; } - return new Promise(function(resolve, reject) { - self.jobs.push(new Job(resolve, reject, item)); + self.jobs.push(new Job(resolve, reject, name)); }); } this.busy = true; + this.current = name; return Promise.resolve(this.unlocker); }; @@ -114,57 +130,103 @@ Locker.prototype.lock = function lock(arg1, arg2) { Locker.prototype.unlock = function unlock() { var job; + assert(this.busy); + this.busy = false; + this.current = null; - if (this.pending.length === 0) - this.emit('drain'); - - if (this.jobs.length === 0) + if (this.jobs.length === 0) { + this.drain(); return; + } + + assert(!this.destroyed); job = this.jobs.shift(); - if (this.destroyed) { - job.reject(new Error('Locker was destroyed.')); - return; - } - - if (job.item) { - assert(job.item === this.pending.shift()); - delete this.pendingMap[job.item.hash('hex')]; + if (job.name) { + assert(this.map[job.name] > 0); + if (--this.map[job.name] === 0) + delete this.map[job.name]; } this.busy = true; + this.current = job.name; job.resolve(this.unlocker); }; -/** - * Destroy the locker. Purge all pending calls. - */ - -Locker.prototype.destroy = function destroy() { - this.destroyed = true; -}; - /** * Wait for a drain (empty queue). * @returns {Promise} */ -Locker.prototype.onDrain = function onDrain() { +Locker.prototype.wait = function wait() { var self = this; - assert(this.add, 'Cannot wait for drain without add method.'); + if (this.destroyed) + return Promise.reject(new Error('Locker is destroyed.')); - if (this.pending.length === 0) + if (!this.busy) { + assert(this.waiting.length === 0); return Promise.resolve(); + } return new Promise(function(resolve, reject) { - self.once('drain', resolve); + self.waiting.push(new Job(resolve, reject, null)); }); }; +/** + * Notify drainers that the queue has emptied. + * @private + */ + +Locker.prototype.drain = function drain() { + var i, jobs, job; + + if (this.waiting.length === 0) + return; + + jobs = this.waiting.slice(); + + this.waiting.length = 0; + + for (i = 0; i < jobs.length; i++) { + job = jobs[i]; + job.resolve(); + } +}; + +/** + * Destroy the locker. Purge all pending calls. + */ + +Locker.prototype.destroy = function destroy() { + var err = new Error('Locker was destroyed.'); + var i, jobs, job; + + this.destroyed = true; + + jobs = this.jobs.slice(); + + this.jobs.length = 0; + + for (i = 0; i < jobs.length; i++) { + job = jobs[i]; + job.reject(err); + } + + jobs = this.waiting.slice(); + + this.waiting.length = 0; + + for (i = 0; i < jobs.length; i++) { + job = jobs[i]; + job.reject(err); + } +}; + /** * Represents a mutex lock for locking asynchronous object methods. * Locks methods according to passed-in key. @@ -193,15 +255,23 @@ MappedLock.create = function create() { }; }; +/** + * Test whether the locker has a pending job by name. + * @param {String} name + * @returns {Boolean} + */ + +MappedLock.prototype.has = function has(name) { + return this.busy[name] === true; +}; + /** * Lock the parent object and all its methods * which use the locker with a specified key. * Begin to queue calls. * @param {String|Number} key - * @param {Function} func - The method being called. - * @param {Array} args - Arguments passed to the method. * @param {Boolean?} force - Force a call. - * @returns {Function} Unlocker - must be + * @returns {Promise} - Returns {Function}, must be * called once the method finishes executing in order * to resolve the queue. */ @@ -213,18 +283,18 @@ MappedLock.prototype.lock = function lock(key, force) { return Promise.reject(new Error('Locker is destroyed.')); if (key == null) - return Promise.resolve(util.nop); + return Promise.resolve(nop); if (force) { assert(this.busy[key]); - return Promise.resolve(util.nop); + return Promise.resolve(nop); } if (this.busy[key]) { return new Promise(function(resolve, reject) { if (!self.jobs[key]) self.jobs[key] = []; - self.jobs[key].push(new Job(resolve, reject)); + self.jobs[key].push(new Job(resolve, reject, null)); }); } @@ -246,22 +316,20 @@ MappedLock.prototype.unlock = function unlock(key) { var jobs = self.jobs[key]; var job; + assert(self.busy[key]); delete self.busy[key]; if (!jobs) return; + assert(!self.destroyed); + job = jobs.shift(); assert(job); if (jobs.length === 0) delete self.jobs[key]; - if (self.destroyed) { - job.reject(new Error('Locker was destroyed.')); - return; - } - self.busy[key] = true; job.resolve(unlocker); @@ -273,7 +341,23 @@ MappedLock.prototype.unlock = function unlock(key) { */ MappedLock.prototype.destroy = function destroy() { + var err = new Error('Locker was destroyed.'); + var map = this.jobs; + var keys = Object.keys(map); + var i, j, key, jobs, job; + this.destroyed = true; + + this.jobs = {}; + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + jobs = map[key]; + for (j = 0; j < jobs.length; j++) { + job = jobs[j]; + job.reject(err); + } + } }; /** @@ -282,15 +366,21 @@ MappedLock.prototype.destroy = function destroy() { * @constructor * @param {Function} resolve * @param {Function} reject - * @param {Object?} item + * @param {String?} name */ -function Job(resolve, reject, item) { +function Job(resolve, reject, name) { this.resolve = resolve; this.reject = reject; - this.item = item || null; + this.name = name || null; } +/* + * Helpers + */ + +function nop() {} + /* * Expose */