Fixed issue where new blocks were not indexed by header service.

This commit is contained in:
Chris Kleeschulte 2017-09-01 15:56:42 -04:00
parent 99d8a6f7ae
commit 3dd9aea3dd
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
3 changed files with 146 additions and 122 deletions

View File

@ -616,7 +616,6 @@ BlockService.prototype._sync = function() {
var self = this;
if (self.node.stopping) {
return;
}
@ -639,6 +638,7 @@ BlockService.prototype._sync = function() {
return;
}
self._header.lastBlockQueried = self._tip.hash;
self._p2p.getBlocks({ startHash: self._tip.hash, endHash: hash });
});

View File

@ -28,6 +28,7 @@ var HeaderService = function(options) {
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._lastHeader = null;
this.blockServiceSyncing = true;
this.lastBlockQueried = null;
this._blockQueue = [];
};
@ -198,21 +199,32 @@ HeaderService.prototype.start = function(callback) {
},
], function(err) {
if (err) {
return callback(err);
}
if (err) {
return callback(err);
}
self._setListeners();
self._bus = self.node.openBus({remoteAddress: 'localhost-header'});
self._startHeaderSubscription();
callback();
self._setListeners();
self._bus = self.node.openBus({remoteAddress: 'localhost-header'});
self._startHeaderSubscription();
callback();
});
};
HeaderService.prototype.stop = function(callback) {
if (this._headerInterval) {
clearInterval(this._headerInterval);
}
if (this._blockProcessor) {
clearInterval(this._blockProcessor);
}
callback();
};
HeaderService.prototype._startHeaderSubscription = function() {
@ -235,134 +247,121 @@ HeaderService.prototype.getPublishEvents = function() {
};
// Called by the p2p network when a block arrives.
// If blocks arrive in rapid succession from the p2p network,
// then this handler could not be finished saving the previous block info
// this should be a rare case since blocks aren't usually mined milliseconds apart
// then this handler could be in the middle of saving a header leading to an erroroneous reorg
HeaderService.prototype._queueBlock = function(_block) {
var self = this;
// mode = syncing, this header already saved
if (self.blockServiceSyncing) {
return self._broadcast(_block);
}
// mode = fully synced, live blocks arriving
if (!self._blockProcessor) {
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
// we just need to queue the block, the interval processor will take it from there
self._blockQueue.push(_block);
};
// this gets fired on a timer to process new blocks from the queue
// under normal circumstances, the queue will only have 1-2 items in it, probably less
// we need this in case there is a deluge of blocks from a peer, asynchronously
// we will also just process one block per call to keep things simple
HeaderService.prototype._processBlocks = function() {
var self = this;
// if we have no blocks to process, exit
var block = self._blockQueue.shift();
if (!block) {
return;
}
// if the block service is busy, then exit and wait for the next call
if (self.blockServiceSyncing) {
return;
}
// clear the interval because this process might span intervals
// we don't want to turn this back on until we know the block header has been persisted
clearInterval(self._blockProcessor);
// let's see if this de-queued block is in order and not reorging the chain
var prevHash = bcoin.util.revHex(block.prevBlock);
var newBlock = prevHash === self._lastHeader.hash;
// common case, blocks arriving in order and not reorging
if (newBlock) {
return self._onSyncBlock(block);
}
// if we are here, we've got special things to do. This is almost always a reorg sitch
self._detectAndHandleReorg(block);
};
HeaderService.prototype._onSyncBlock = function(block) {
var self = this;
self._syncBlock(block, function(err) {
if (err) {
log.error(err);
self.node.stop();
}
self._broadcast(block);
// at this point we know the block header has been persisted, but
// not necessarily that the services are done syncing
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
});
};
HeaderService.prototype._detectAndHandleReorg = function(block) {
var self = this;
// this is the rare case that a block comes to us out of order or is a reorg'ed block
self._detectReorg(block, function(err, reorg) {
// in all cases, we want to ensure the header is saved
self._persistHeader(_block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
}
// if not reorg, then this block is out of order and we can't do anything with it
// TODO: does this ever happen? Can this ever happen?
if (!reorg) {
return log.warn('Block: ' + block.rhash() + ' arrived out of order, skipping.');
if (self.blockServiceSyncing) {
// if we are syncing, we can broadcast right away
return self._broadcast(_block);
}
var header = block.toHeaders().toJSON();
header.timestamp = header.ts;
header.prevHash = header.prevBlock;
// queue the block for _processBlock to process later
self._blockQueue.push(_block);
self._handleReorg(block, header, function(err) {
// if we don't already have a timer, set one here to get things going
if (!self._blockProcessor) {
if (err) {
log.error(err);
return self.node.stop();
}
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
self._onSyncBlock(block);
});
}
});
};
// under normal circumstances, the queue will only have 1-2 items in it, probably less
// we need this in case there is a deluge of blocks from a peer, asynchronously
HeaderService.prototype._processBlocks = function() {
// if we have no blocks to process, exit
var block = this._blockQueue.shift();
if (!block) {
return;
}
// if the block service is busy, then exit and wait for the next call
// we can leave this interval handler enabled because the block service should set the
// syncing flag while is it processing blocks
if (this.blockServiceSyncing) {
return;
}
this._broadcast(block);
};
HeaderService.prototype._persistHeader = function(block, callback) {
var self = this;
self._detectReorg(block, function(err, reorgStatus, historicalBlock) {
if (err) {
return callback(err);
}
// common case
if (historicalBlock) {
return callback();
}
// new block, not causing a reorg
if (!reorgStatus && !historicalBlock) {
return self._syncBlock(block, function(err) {
if (err) {
return callback(err);
}
if (!self.blockServiceSyncing && !self._blockProcessor) {
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
callback();
});
}
// new block causing a reorg
self._handleReorg(block, self._formatHeader(block), function(err) {
if (err) {
return callback(err);
}
});
});
};
HeaderService.prototype._formatHeader = function(block) {
var header = block.toHeaders().toJSON();
header.timestamp = header.ts;
header.prevHash = header.prevBlock;
return header;
};
HeaderService.prototype._syncBlock = function(block, callback) {
var self = this;
var header = block.toHeaders().toJSON();
header.timestamp = header.ts;
header.prevHash = header.prevBlock;
var header = self._formatHeader(block);
log.debug('Header Service: new block: ' + block.rhash());
@ -392,9 +391,11 @@ HeaderService.prototype._onHeader = function(header) {
header.height = this._lastHeader.height + 1;
header.chainwork = this._getChainwork(header, this._lastHeader).toString(16, 64);
if (!header.timestamp) {
header.timestamp = header.time;
}
this._lastHeader = header;
return [
@ -529,11 +530,11 @@ HeaderService.prototype._syncComplete = function() {
HeaderService.prototype._setBestHeader = function() {
var bestHeader = this._lastHeader;
this._tip.height = bestHeader.height;
this._tip.hash = bestHeader.hash;
var bestHeader = this._lastHeader;
this._tip.height = bestHeader.height;
this._tip.hash = bestHeader.hash;
log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.');
log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.');
};
HeaderService.prototype._getHeader = function(height, hash, callback) {
@ -572,8 +573,22 @@ HeaderService.prototype._getHeader = function(height, hash, callback) {
HeaderService.prototype._detectReorg = function(block, callback) {
assert(block, 'Block is needed to detect reorg.');
var self = this;
var prevHash = bcoin.util.revHex(block.prevBlock);
var nextBlock = prevHash === self._lastHeader.hash;
// this is the last block the block service asked for
if (prevHash === this.lastBlockQueried) {
return callback(null, false, true);
}
// new block, not a historical block
if (nextBlock) {
return callback(null, false, false);
}
// could really be a reorg block
var key = this._encoding.encodeHeaderHashKey(bcoin.util.revHex(block.prevBlock));
this._db.get(key, function(err, val) {
@ -584,10 +599,10 @@ HeaderService.prototype._detectReorg = function(block, callback) {
// is this block's prevHash already referenced in the database? If so, reorg
if (val) {
return callback(null, true);
return callback(null, true, false);
}
callback(null, false);
callback(null, false, false);
});
@ -626,7 +641,12 @@ HeaderService.prototype._handleReorg = function(block, header, callback) {
return callback(err || new Error('Missing headers'));
}
var hash = headers.getIndex(self._originalTip.height).hash;
var header = headers.getIndex(self._originalTip.height);
assert(header, 'Atempted to get reorg header for the original tip: ' + self._originalTip.hash +
' at height: ' + self._originalTip.height + ' but could not find in the database.');
var hash = header.hash;
if (block && header) {
hash = block.rhash();
@ -674,7 +694,9 @@ HeaderService.prototype._sync = function() {
var self = this;
var progress;
if (self._bestHeight === 0) {
var bestHeight = Math.max(self._bestHeight, self._lastHeader.height);
if (bestHeight === 0) {
progress = 0;
} else {
progress = (self._tip.height / self._bestHeight*100.00).toFixed(2);
@ -739,7 +761,7 @@ HeaderService.prototype.getNextHash = function(tip, callback) {
};
HeaderService.prototype.getLastHeader = function() {
assert(this._lastHeader, 'Last headers should be populated.');
assert(this._lastHeader, 'Last header should be populated.');
return this._lastHeader;
};

View File

@ -102,6 +102,7 @@ describe('Header Service', function() {
headerService._tip = { height: 120 };
var getHeaders = sandbox.stub();
headerService._p2p = { getHeaders: getHeaders };
headerService._lastHeader = { height: 124 };
headerService._startSync();
expect(getHeaders.calledOnce).to.be.true;
});
@ -115,6 +116,7 @@ describe('Header Service', function() {
headerService._tip = { height: 121, hash: 'a' };
var getHeaders = sandbox.stub();
headerService._p2p = { getHeaders: getHeaders };
headerService._lastHeader = { height: 124 };
headerService._sync();
expect(getHeaders.calledOnce).to.be.true;
});