diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 18b5282d..b828b3a6 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -40,6 +40,8 @@ function Chain(options) { this.mempool = options.mempool; this.blockdb = options.blockdb; this.locked = false; + this.handling = false; + this.busy = false; this.pending = []; this.pendingBlocks = {}; this.pendingSize = 0; @@ -900,6 +902,7 @@ Chain.prototype.resetTime = function resetTime(ts) { Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback) { var self = this; + this.byTimeAsync(ts, function(err, entry) { if (err) return callback(err); @@ -911,6 +914,50 @@ Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback) { }); }; +Chain.prototype._onFlush = function _onFlush(callback) { + if (this.pending.length === 0) + return callback(); + + this.once('flush', callback); +}; + +Chain.prototype._onUnlock = function _onUnlock(callback) { + if (!this.handling) + return callback(); + + this.once('unlock', callback); +}; + +Chain.prototype._onBored = function _onBored(callback) { + if (!this.busy) + return callback(); + + this.once('bored', callback); +}; + +// REALLY? Did it have to be this fucking complicated? +Chain.prototype._onReady = function _onReady(internal, callback) { + var self = this; + if (internal) + return callback(); + this._onUnlock(function() { + if (self.locked) + return self._onReady(internal, callback); + self.locked = true; + self._onBored(function() { + if (self.busy) + return self._onReady(internal, callback); + self.busy = true; + callback(function unlock() { + self.busy = false; + self.locked = false; + self.emit('unlock'); + self.emit('bored'); + }); + }); + }); +}; + Chain.prototype.add = function add(initial, peer, callback) { var self = this; var host = peer ? peer.host : 'unknown'; @@ -929,7 +976,10 @@ Chain.prototype.add = function add(initial, peer, callback) { return; } + assert(!this.handling); + this.locked = true; + this.handling = true; (function next(block) { var hash = block.hash('hex'); @@ -1240,6 +1290,9 @@ Chain.prototype.add = function add(initial, peer, callback) { self.total += total; self.locked = false; + self.handling = false; + + self.emit('unlock'); // Start resolving the queue // (I love asynchronous IO). diff --git a/lib/bcoin/chaindb.js b/lib/bcoin/chaindb.js index c57bec5e..44d080ef 100644 --- a/lib/bcoin/chaindb.js +++ b/lib/bcoin/chaindb.js @@ -39,6 +39,7 @@ function ChainDB(chain, options) { this.heightLookup = {}; this.queue = {}; + this.queueSize = 0; this.cache = {}; this.bufferPool = { used: {} }; this.highest = -1; @@ -260,24 +261,26 @@ ChainDB.prototype._populate = function _populate(entry) { } }; -ChainDB.prototype.getSync = function getSync(height) { +ChainDB.prototype.getSync = function getSync(height, force) { var data, entry; if (typeof height === 'string') height = this.heightLookup[height]; + if (height < 0 || height == null) + return; + + if (!force) { + if ((height + 1) * BLOCK_SIZE > this.size) + return; + } + if (this.cache[height]) return this.cache[height]; if (this.queue[height]) return this.queue[height]; - if (height < 0 || height == null) - return; - - if ((height + 1) * BLOCK_SIZE > this.size) - return; - data = this._readSync(BLOCK_SIZE, height * BLOCK_SIZE); if (!data) @@ -303,12 +306,6 @@ ChainDB.prototype.getAsync = function getAsync(height, callback, force) { if (typeof height === 'string') height = this.heightLookup[height]; - if (this.cache[height]) - return callback(null, this.cache[height]); - - if (this.queue[height]) - return callback(null, this.queue[height]); - if (height < 0 || height == null) return callback(); @@ -317,6 +314,12 @@ ChainDB.prototype.getAsync = function getAsync(height, callback, force) { return callback(); } + if (this.cache[height]) + return callback(null, this.cache[height]); + + if (this.queue[height]) + return callback(null, this.queue[height]); + return this._readAsync(BLOCK_SIZE, height * BLOCK_SIZE, function(err, data) { var entry; @@ -382,17 +385,13 @@ ChainDB.prototype.saveAsync = function saveAsync(entry, callback) { // Populate the entry. this._populate(entry); - // Something is already writing. Cancel it - // and synchronously write the data after - // it cancels. - if (this.queue[entry.height]) { - this.queue[entry.height] = entry; - return callback(); - } + // Something is already writing. + assert(!this.queue[entry.height]); // Speed up writes by doing them asynchronously // and keeping the data to be written in memory. this.queue[entry.height] = entry; + this.queueSize++; // Write asynchronously to the db. raw = entry.toRaw(); @@ -402,18 +401,11 @@ ChainDB.prototype.saveAsync = function saveAsync(entry, callback) { if (err) return callback(err); - var item = self.queue[entry.height]; - - // Something tried to write here but couldn't. - // Synchronously write it and get it over with. - try { - if (item && item !== entry) - success = self._writeSync(item.toRaw(), offset); - } catch (e) { - err = e; - } - + assert(self.queue[entry.height]); delete self.queue[entry.height]; + self.queueSize--; + if (self.queueSize === 0) + self.emit('flush'); return callback(null, success); }); @@ -426,13 +418,14 @@ ChainDB.prototype._drop = function _drop(height) { // to handle this. if (this.queue[height]) { utils.debug('Warning: write job in progress.'); - delete this.queue[height]; + // delete this.queue[height]; } delete this.cache[height]; }; ChainDB.prototype.resetHeightSync = function resetHeightSync(height) { + var self = this; var size, count, existing; if (typeof height === 'string') @@ -450,23 +443,26 @@ ChainDB.prototype.resetHeightSync = function resetHeightSync(height) { assert(this.tip); for (i = height + 1; i < count; i++) { - existing = this.get(i); + existing = this.getSync(i); assert(existing); this._drop(i); delete this.heightLookup[existing.hash]; } - if (!bcoin.fs) - this.ramdisk.truncate(size); - else - fs.ftruncateSync(this.fd, size); - this.size = size; this.highest = height; - this.tip = this.get(height); + this.tip = this.getSync(height); assert(this.tip); this.height = this.tip.height; this.emit('tip', this.tip); + + // This will be synchronous 99% of the time. + this._onFlush(function() { + if (!bcoin.fs) + self.ramdisk.truncate(size); + else + fs.ftruncateSync(self.fd, size); + }); }; ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback) { @@ -492,7 +488,7 @@ ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback) this.size = size; this.highest = height; - this.tip = this.get(height); + this.tip = this.getSync(height); assert(this.tip); this.height = this.tip.height; this.emit('tip', this.tip); @@ -522,20 +518,28 @@ ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback) if (err) return callback(err); - if (!bcoin.fs) { - self.ramdisk.truncate(size); - return callback(); - } + self._onFlush(function() { + if (!bcoin.fs) { + self.ramdisk.truncate(size); + return callback(); + } - fs.ftruncate(self.fd, size, function(err) { - if (err) - return callback(err); + fs.ftruncate(self.fd, size, function(err) { + if (err) + return callback(err); - return callback(); + return callback(); + }); }); } }; +ChainDB.prototype._onFlush = function _onFlush(callback) { + if (this.queueSize === 0) + return callback(); + this.once('flush', callback); +}; + ChainDB.prototype.has = function has(height) { if (typeof height === 'string') height = this.heightLookup[height]; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index e07a1a46..9f31db36 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -249,7 +249,7 @@ Pool.prototype._init = function _init() { Pool.prototype.loadBlocks = function loadBlocks(peer, top, stop) { var self = this; - this._onFlush(function() { + this.chain._onFlush(function() { peer.loadBlocks(self.chain.getLocator(top), stop); }); }; @@ -257,7 +257,7 @@ Pool.prototype.loadBlocks = function loadBlocks(peer, top, stop) { Pool.prototype.loadOrphan = function loadOrphan(peer, top, orphan) { var self = this; assert(orphan); - this._onFlush(function() { + this.chain._onFlush(function() { peer.loadBlocks( self.chain.getLocator(top), self.chain.getOrphanRoot(orphan) @@ -267,7 +267,7 @@ Pool.prototype.loadOrphan = function loadOrphan(peer, top, orphan) { Pool.prototype.loadHeaders = function loadHeaders(peer, top, stop) { var self = this; - this._onFlush(function() { + this.chain._onFlush(function() { peer.loadHeaders(self.chain.getLocator(top), stop); }); }; @@ -570,7 +570,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this.emit('blocks', hashes); - this._onFlush(function() { + this.chain._onFlush(function() { for (i = 0; i < hashes.length; i++) { hash = hashes[i]; @@ -612,13 +612,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this._startTimer(); }; -Pool.prototype._onFlush = function _onFlush(callback) { - if (this.chain.pending.length === 0) - return callback(); - - this.chain.once('flush', callback); -}; - Pool.prototype._handleInv = function _handleInv(hashes, peer) { var i, hash; @@ -1491,7 +1484,7 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { } if (peer._blockQueue.length === 0) { - this._onFlush(function() { + this.chain._onFlush(function() { utils.nextTick(function() { utils.debug( 'Requesting %d/%d blocks from %s with getdata',