diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 20ffc810..12b60c41 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -26,7 +26,8 @@ var BlockService = function(options) { this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._initialSync = false; - this._serviceIniting = false; + this._processingBlock = false; + this._blocksInQueue = 0; this._recentBlockHashesCount = options.recentBlockHashesCount || 1000; // block service won't reorg past this point this._recentBlockHashes = new LRU(this._recentBlockHashesCount); @@ -522,17 +523,19 @@ BlockService.prototype.onHeaders = function(callback) { var self = this; - // at this point, we need to ensure the block header is in a very specific state - // 1. we should have no listeners of any kind that could cause side effects - // 2. we should have no pending events, yet to be fired, after this routine yields - + // if this service is waiting on block-related callbacks to be fired in the event loop, + // then we need to wait for the _processingBlock flag to be set to false. + // when this flag is false, we know we aren't waiting on any new blocks or historical blocks + // that we asked for, but not yet received self._initialSync = true; - self._serviceIniting = true; - self._removeAllSubscriptions(); - - // this should ensure that any handlers, yet to be fired, will fire - setImmediate(function() { + // a heavy block could take a really long time to index + async.retry({ interval: 1000, times: 100 }, function(next) { + return next(self._processingBlock); + }, function(err) { + if (err) { + return callback(err); + } self._onHeaders(callback); }); @@ -542,8 +545,6 @@ BlockService.prototype._onHeaders = function(callback) { var self = this; - self._serviceIniting = false; - // check whether or not we need build a new tip (unlikely) self._resetTip(function(err) { @@ -560,6 +561,8 @@ BlockService.prototype._onHeaders = function(callback) { }, function() { + self._removeAllSubscriptions(); + // check to see if our current tip matches what the header service has. // if this is a reorg during our initial block sync, it is especially important // to see if we've synced past where the reorg/fork took place. @@ -571,6 +574,7 @@ BlockService.prototype._onHeaders = function(callback) { // we've checked the tip, handled any reorg for ourselves, and we are ready to // sync any new blocks that might exist after our tip + self._reorging = false; self._startSync(); // once we start syncing, we can call back to the header service, so that it can @@ -832,19 +836,24 @@ BlockService.prototype._onBlock = function(block, callback) { var self = this; - if (self._reorging || self._serviceIniting) { + if (self._reorging) { + self._processingBlock = false; return callback(); } + self._processingBlock = true; + self._getBlock(block.rhash(), function(err, _block) { if(err) { + self._processingBlock = false; return self._handleError(err); } if (_block) { + self._processingBlock = false; log.debug('Block Service: not syncing, block already in database.'); - return setImmediate(callback); + return callback(); } self._processBlock(block, callback); @@ -857,6 +866,7 @@ BlockService.prototype._processBlock = function(block, callback) { var self = this; if (self.node.stopping) { + self._processingBlock = false; return callback(); } @@ -873,6 +883,7 @@ BlockService.prototype._processBlock = function(block, callback) { // we will have a chance to clear out our block queue, check our tip, // discover where to reorg to, reorg all the services that rely on // blocks and sync from there. + self._processingBlock = false; return callback(); }; @@ -895,17 +906,26 @@ BlockService.prototype._saveBlock = function(block, callback) { }, function(err, ops) { if (err) { + self._processingBlock = false; return callback(err); } self._db.batch(_.compact(_.flattenDeep(ops)), function(err) { if (err) { + self._processingBlock = false; return callback(err); } self._recentBlockHashes.set(block.rhash(), bcoin.util.revHex(block.prevBlock)); - self._setTip({ hash: block.rhash(), height: self._tip.height + 1 }, callback); + self._setTip({ hash: block.rhash(), height: block.__height }, function(err) { + if (err) { + self._processingBlock = false; + return callback(err); + } + self._processingBlock = false; + callback(); + }); }); }); @@ -924,6 +944,7 @@ BlockService.prototype._syncBlock = function(block) { clearTimeout(self._getBlocksTimer); if (self._lastBlockSaved === block.rhash()) { + self._processingBlock = false; return; } @@ -991,10 +1012,6 @@ BlockService.prototype._onSynced = function() { clearInterval(this._reportInterval); } - if (this._serviceIniting) { - return; - } - self._logProgress(); self._initialSync = false; self._startBlockSubscription(); @@ -1024,19 +1041,23 @@ BlockService.prototype._sync = function() { var self = this; - if (self.node.stopping) { + if (self.node.stopping || self._reorging) { return; } + self._processingBlock = true; + log.debug('Block Service: querying header service for next block using tip: ' + self._tip.hash); self._header.getEndHash(self._tip, self._readAheadBlockCount, function(err, targetHash, endHash) { if(err) { + self._processingBlock = false; return self._handleError(err); } if (!targetHash && !endHash) { + self._processingBlock = false; return self.emit('synced'); } @@ -1048,7 +1069,11 @@ BlockService.prototype._sync = function() { // then we must assume we've reorg'ed very shortly after // we made this call and we should re-compute where we are self._getBlocksTimer = setTimeout(function() { - self.emit('next block'); + log.debug('Block Service: block timeout, emitting for next block'); + self._processingBlock = false; + if (!self._reorging) { + self.emit('next block'); + } }, 5000); self._getBlocksTimer.unref();