Fixed reorg edge case scenario.

This commit is contained in:
Chris Kleeschulte 2017-09-13 11:02:48 -04:00
parent 6a18c1e46e
commit 7350fb3bf6
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
6 changed files with 285 additions and 229 deletions

View File

@ -28,6 +28,7 @@ var BlockService = function(options) {
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initialSync = true;
this._reorgQueue = [];
};
inherits(BlockService, BaseService);
@ -188,7 +189,13 @@ BlockService.prototype._checkTip = function(callback) {
return callback(err || new Error('All headers not found.'));
}
self._handleReorg(header.hash, headers, callback);
assert(self._reorgQueue.length === 0, 'Reorg Queue is not the expected length.');
if (self._reorgQueue.indexOf(hash) === -1) {
self._reorgQueue.push(hash);
}
self._handleReorg(headers, callback);
});
@ -217,7 +224,6 @@ BlockService.prototype.start = function(callback) {
assert(tip.height >= 0, 'tip is not initialized');
self._setTip(tip);
self._setListeners();
self._startSubscriptions();
callback();
});
@ -391,33 +397,43 @@ BlockService.prototype._getHash = function(blockArg, callback) {
};
BlockService.prototype._handleReorg = function(hash, allHeaders, callback) {
BlockService.prototype._handleReorg = function(allHeaders, callback) {
// hash is the hash of the new block that we are reorging to.
assert(hash, 'We were asked to reorg to a non-existent hash.');
var self = this;
self._reorging = true;
log.warn('Block Service: Chain reorganization detected! Our current block tip is: ' +
self._tip.hash + ' the current block: ' + hash + '.');
var reorgHash = self._reorgQueue.shift();
var reorgHeader = allHeaders.get(reorgHash);
assert(reorgHeader, 'We were asked to reorg to a non-existent hash.');
self._findCommonAncestor(hash, allHeaders, function(err, commonAncestorHash, oldBlocks) {
self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHash, oldBlocks) {
if (err) {
return callback(err);
}
var commonAncestorHeader = allHeaders.get(commonAncestorHash);
assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.');
// if we are syncing and we haven't sync'ed to the common ancestor hash, we can safely ignore this reorg
if (self._tip.height < commonAncestorHeader.height) {
return callback();
}
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash +
' at height: ' + commonAncestorHeader.height + '. Removing: ' + oldBlocks.length + ' block(s) from our indexes.');
self._processReorg(commonAncestorHeader, oldBlocks, callback);
self._processReorg(commonAncestorHeader, oldBlocks, function(err) {
if(err) {
return callback(err);
}
self._reorging = false;
callback();
});
});
@ -446,17 +462,7 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, c
});
});
self._db.batch(removalOps, function(err) {
if(err) {
return callback(err);
}
self._reorging = false;
self._p2p.clearInventoryCache();
callback();
});
self._db.batch(removalOps, callback);
};
@ -578,6 +584,7 @@ BlockService.prototype._processBlock = function(block) {
return;
}
self._syncing = false;
self._sync();
});
});
@ -603,17 +610,15 @@ BlockService.prototype._onBlock = function(block) {
return;
}
// this will prevent the block service from hammering us with blocks
// before we are ready
// this will be turned to false once all the blocks have been processed
// by the services
this._header.blockServiceSyncing = true;
// this service must receive blocks in order
var prevHash = bcoin.util.revHex(block.prevBlock);
if (this._tip.hash !== prevHash) {
return;
return log.warn('received a block that was not asked for and not linked directly to our tip.');
}
log.debug('Block Service: new block: ' + block.rhash());
block.__height = this._tip.height + 1;
this._processBlock(block);
@ -628,21 +633,18 @@ BlockService.prototype._setListeners = function() {
self._header.on('reorg', function(hash, headers) {
if (!self._reorging && !this._initialSync) {
log.debug('Block Service: detected a reorg from the header service.');
self._handleReorg(hash, headers, function(err) {
if(err) {
return callback(err);
}
self._startSync();
});
log.debug('Block Service: detected a reorg from the header service.');
if (self._reorgQueue.indexOf(hash) === -1) {
self._reorgQueue.push(hash);
}
async.whilst(function() {
return self._reorgQueue.length > 0;
}, function(next) {
self._handleReorg(headers, next);
}, self._startSync.bind(self));
});
};
@ -681,10 +683,12 @@ BlockService.prototype._sync = function() {
var self = this;
if (self.node.stopping) {
if (self.node.stopping || self._syncing) {
return;
}
self._syncing = true;
var lastHeaderIndex = self._header.getLastHeader().height;
if (self._tip.height < lastHeaderIndex) {
@ -695,7 +699,7 @@ BlockService.prototype._sync = function() {
' (' + self._syncPercentage() + '%)');
}
return self._header.getNextHash(self._tip, function(err, hash) {
return self._header.getNextHash(self._tip, function(err, targetHash, nextHash) {
if(err) {
log.error(err);
@ -703,17 +707,31 @@ BlockService.prototype._sync = function() {
return;
}
self._header.lastBlockQueried = self._tip.hash;
self._p2p.getBlocks({ startHash: self._tip.hash, endHash: hash });
self._header.lastBlockQueried = targetHash;
// to ensure that we can receive blocks that were previously delivered
// this will lead to duplicate transactions being sent
self._p2p.clearInventoryCache();
self._p2p.getP2PBlock({
filter: {
startHash: self._tip.hash,
endHash: nextHash
},
blockHash: targetHash
}, self._onBlock.bind(self));
});
}
this._syncing = false;
this._header.blockServiceSyncing = false;
this._initialSync = false;
log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
this._startSubscriptions();
};
module.exports = BlockService;

View File

@ -29,6 +29,7 @@ var HeaderService = function(options) {
this._lastHeader = null;
this.blockServiceSyncing = true;
this.lastBlockQueried = null;
this._initialSync = true;
this._blockQueue = [];
};
@ -124,6 +125,56 @@ HeaderService.prototype.getBestHeight = function() {
return this._tip.height;
};
HeaderService.prototype._adjustTip = function() {
if (this._checkpoint === -1 || this._tip.height < this._checkpoint) {
this._tip.height = 0;
this._tip.hash = this.GENESIS_HASH;
} else {
this._tip.height -= this._checkpoint;
}
};
HeaderService.prototype._setGenesisBlock = function(callback) {
assert(this._tip.hash === this.GENESIS_HASH, 'Expected tip hash to be genesis hash, but it was not.');
var genesisHeader = {
hash: this.GENESIS_HASH,
height: 0,
chainwork: HeaderService.STARTING_CHAINWORK,
version: 1,
prevHash: new Array(65).join('0'),
timestamp: 1231006505,
nonce: 2083236893,
bits: 0x1d00ffff,
merkleRoot: '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'
};
this._lastHeader = genesisHeader;
var dbOps = [
{
type: 'put',
key: this._encoding.encodeHeaderHeightKey(0),
value: this._encoding.encodeHeaderValue(genesisHeader)
},
{
type: 'put',
key: this._encoding.encodeHeaderHashKey(this.GENESIS_HASH),
value: this._encoding.encodeHeaderValue(genesisHeader)
}
];
this._db.batch(dbOps, callback);
};
HeaderService.prototype.start = function(callback) {
var self = this;
@ -140,41 +191,14 @@ HeaderService.prototype.start = function(callback) {
self._tip = tip;
self._adjustTip();
if (self._tip.height === 0) {
assert(self._tip.hash === self.GENESIS_HASH, 'Expected tip hash to be genesis hash, but it was not.');
var genesisHeader = {
hash: self.GENESIS_HASH,
height: 0,
chainwork: HeaderService.STARTING_CHAINWORK,
version: 1,
prevHash: new Array(65).join('0'),
timestamp: 1231006505,
nonce: 2083236893,
bits: 0x1d00ffff,
merkleRoot: '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'
};
self._lastHeader = genesisHeader;
var dbOps = [
{
type: 'put',
key: self._encoding.encodeHeaderHeightKey(0),
value: self._encoding.encodeHeaderValue(genesisHeader)
},
{
type: 'put',
key: self._encoding.encodeHeaderHashKey(self.GENESIS_HASH),
value: self._encoding.encodeHeaderValue(genesisHeader)
}
];
return self._db.batch(dbOps, next);
return self._setGenesisBlock(next);
}
self._getLastHeader(next);
},
], function(err) {
@ -228,103 +252,91 @@ HeaderService.prototype.getPublishEvents = function() {
};
// Called by the p2p network when a block arrives.
// If blocks arrive in rapid succession from the p2p network,
// then this handler could be in the middle of saving a header leading to an erroroneous reorg
HeaderService.prototype._queueBlock = function(_block) {
// block handler for blocks from the p2p network
HeaderService.prototype._queueBlock = function(block) {
// this block was queried by the block service and, thus, we already have it
if (block.rhash() === this.lastBlockQueried) {
return;
}
// queue the block for _processBlock to process later
// we won't start processing blocks until we have all of our headers
// so if loading headers takes a really long time, we could have a long
// list of blocks. Warning will be logged if this happens.
this._blockQueue.push(block);
};
// we need this in case there is a deluge of blocks from a peer, asynchronously
HeaderService.prototype._processBlocks = function() {
var self = this;
var block = self._blockQueue.shift();
// in all cases, we want to ensure the header is saved
self._persistHeader(_block, function(err) {
if (self._blockQueue.length > 2) {
// normally header sync is pretty quick, within a minute or two.
// under normal circumstances, we won't queue many blocks
log.warn('Header Service: Block queue has: ' + self._blockQueue.length + ' items.');
}
if (!block) {
return;
}
assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.');
clearInterval(self._blockProcessor);
self._blockProcessor = null;
self._persistHeader(block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
}
if (self.blockServiceSyncing) {
// if we are syncing, we can broadcast right away
return self._broadcast(_block);
if (!self.blockServiceSyncing) {
self._broadcast(block);
}
// queue the block for _processBlock to process later
self._blockQueue.push(_block);
// if we don't already have a timer, set one here to get things going
if (!self._blockProcessor) {
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
});
};
// under normal circumstances, the queue will only have 1-2 items in it, probably less
// we need this in case there is a deluge of blocks from a peer, asynchronously
HeaderService.prototype._processBlocks = function() {
// if we have no blocks to process, exit
var block = this._blockQueue.shift();
if (!block) {
return;
}
// if the block service is busy, then exit and wait for the next call
// we can leave this interval handler enabled because the block service should set the
// syncing flag while is it processing blocks
if (this.blockServiceSyncing) {
return;
}
this._broadcast(block);
};
HeaderService.prototype._persistHeader = function(block, callback) {
var self = this;
self._detectReorg(block, function(err, reorgStatus, historicalBlock) {
self._detectReorg(block, function(err, commonHeader) {
if (err) {
return callback(err);
}
// common case
if (historicalBlock) {
return callback();
if (!commonHeader) {
return self._syncBlock(block, callback);
}
// new block, not causing a reorg
if (!reorgStatus && !historicalBlock) {
log.warn('Header Service: Reorganization detected, current tip hash: ' +
self._tip.hash + ' new block causing the reorg: ' + block.rhash() +
' common ancestor hash: ' + commonHeader.hash);
return self._syncBlock(block, function(err) {
self._handleReorg(block, commonHeader, function(err) {
if (err) {
return callback(err);
}
if (!self.blockServiceSyncing && !self._blockProcessor) {
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
callback();
});
}
// new block causing a reorg
self._handleReorg(block, self._formatHeader(block), function(err) {
if (err) {
if(err) {
return callback(err);
}
self._syncBlock(block, callback);
});
});
};
@ -454,6 +466,8 @@ HeaderService.prototype._onHeadersSave = function(err) {
return;
}
self._logProgress();
if (!self._syncComplete()) {
self._sync();
@ -461,10 +475,17 @@ HeaderService.prototype._onHeadersSave = function(err) {
}
if (self._initialSync) {
// we'll turn the block processor on right after we've sync'ed headers
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
self._startBlockSubscription();
self._setBestHeader();
self._initialSync = false;
self.emit('headers');
};
@ -538,39 +559,33 @@ HeaderService.prototype._detectReorg = function(block, callback) {
var prevHash = bcoin.util.revHex(block.prevBlock);
var nextBlock = prevHash === self._lastHeader.hash;
// this is the last block the block service asked for
if (prevHash === this.lastBlockQueried && this.blockServiceSyncing) {
return callback(null, false, true);
}
// new block, not a historical block
// common case
if (nextBlock) {
return callback(null, false, false);
return callback(null, false);
}
// could really be a reorg block
var key = this._encoding.encodeHeaderHashKey(prevHash);
this._db.get(key, function(err, val) {
this.getBlockHeader(prevHash, function(err, header) {
if (err) {
return callback(err);
}
// is this block's prevHash already referenced in the database? If so, reorg
if (val) {
return callback(null, true, false);
if (header) {
return callback(null, header);
}
callback(null, false, false);
log.warn('Block: ' + block.rhash() + 'references: ' + prevHash + ' as its previous block, yet we have not stored this block in our data set, thus ignoring this block.');
callback(null, false);
});
};
HeaderService.prototype._handleReorg = function(block, header, callback) {
HeaderService.prototype._handleReorg = function(block, commonHeader, callback) {
var self = this;
var reorgHeader = self._formatHeader(block);
self.getAllHeaders(function(err, headers) {
@ -579,15 +594,18 @@ HeaderService.prototype._handleReorg = function(block, header, callback) {
}
var hash = block.rhash();
headers.set(hash, header); // appends to the end
headers.set(hash, reorgHeader); // appends to the end
// this will ensure our own headers collection is correct
self._onReorg(block, headers, function(err) {
self._onReorg(reorgHeader, headers, commonHeader, function(err) {
if (err) {
return callback(err);
}
// emit the fact that there is a reorg even though the block
// service may not have reached this point in its sync.
// Let the block service sort that our
self.emit('reorg', hash, headers);
return callback();
});
@ -596,39 +614,29 @@ HeaderService.prototype._handleReorg = function(block, header, callback) {
};
HeaderService.prototype._onReorg = function(block, headers, callback) {
// this will be called when a new block (not historical) arrives and
// has a common ancestor with a block that we already received.
var self = this;
// remove the current last header from the db
var ops = [
{
HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, callback) {
// remove all headers with a height greater than commonHeader
var ops = [];
var startingHeight = this._tip.height;
var hash = this._tip.hash;
while(hash !== commonHeader.hash) {
var header = headers.getIndex(startingHeight--);
assert(header, 'Expected to have a header at this height, but did not. Reorg failed.');
hash = header.prevHash;
ops.push({
type: 'del',
key: self._encoding.encodeHeaderHashKey(self._lastHeader.hash)
},
{
key: this._encoding.encodeHeaderHashKey(header.hash)
});
ops.push({
type: 'del',
key: self._encoding.encodeHeaderHeightKey(self._lastHeader.height)
}
];
self._db.batch(ops, function(err) {
if (err) {
return callback(err);
}
// set the last header to the common ancestor
self._lastHeader = headers.get(bcoin.util.revHex(block.prevBlock));
assert(self._lastHeader, 'Expected our reorg block to have a header entry, but it did not.');
// add the new block
self._syncBlock(block, callback);
});
key: this._encoding.encodeHeaderHeightKey(header.height)
});
}
// setting our tip to the common ancestor
this._tip.hash = commonHeader.hash;
this._tip.height = commonHeader.height;
this._db.batch(ops, callback);
};
HeaderService.prototype._setListeners = function() {
@ -655,21 +663,29 @@ HeaderService.prototype._startSync = function() {
};
HeaderService.prototype._sync = function() {
HeaderService.prototype._logProgress = function() {
var self = this;
if (!this._initialSync) {
return;
}
var progress;
var bestHeight = Math.max(self._bestHeight, self._lastHeader.height);
var bestHeight = Math.max(this._bestHeight, this._lastHeader.height);
if (bestHeight === 0) {
progress = 0;
} else {
progress = (self._tip.height / self._bestHeight*100.00).toFixed(2);
progress = (this._tip.height/bestHeight*100.00).toFixed(2);
}
log.info('Header Service: download progress: ' + self._tip.height + '/' +
self._bestHeight + ' (' + progress + '%)');
log.info('Header Service: download progress: ' + this._tip.height + '/' +
bestHeight + ' (' + progress + '%)');
};
HeaderService.prototype._sync = function() {
var self = this;
self._p2p.getHeaders({ startHash: self._tip.hash });
@ -686,16 +702,16 @@ HeaderService.prototype._sync = function() {
HeaderService.prototype.getNextHash = function(tip, callback) {
var self = this;
var numResultsNeeded = 2;
// if the tip being passed in is the second to last block, then return 0 because there isn't a block
// after the last block
if (tip.height + 1 === self._tip.height) {
return callback(null, 0);
numResultsNeeded = 1;
}
var start = self._encoding.encodeHeaderHeightKey(tip.height + 2);
var start = self._encoding.encodeHeaderHeightKey(tip.height + 1);
var end = self._encoding.encodeHeaderHeightKey(tip.height + 3);
var result = 0;
var results = [];
var criteria = {
gte: start,
@ -711,7 +727,7 @@ HeaderService.prototype.getNextHash = function(tip, callback) {
});
stream.on('data', function(data) {
result = self._encoding.decodeHeaderValue(data.value).hash;
results.push(self._encoding.decodeHeaderValue(data.value).hash);
});
stream.on('end', function() {
@ -720,7 +736,13 @@ HeaderService.prototype.getNextHash = function(tip, callback) {
return streamErr;
}
callback(null, result);
assert(results.length === numResultsNeeded, 'GetNextHash returned incorrect number of results.');
if (!results[1]) {
results[1] = 0;
}
callback(null, results[0], results[1]);
});
@ -735,15 +757,6 @@ HeaderService.prototype._getLastHeader = function(callback) {
var self = this;
// redo all headers
if (this._checkpoint === -1) {
this._checkpoint = this._tip.height;
}
if (self._tip.height >= self._checkpoint) {
self._tip.height -= self._checkpoint;
}
var removalOps = [];
var start = self._encoding.encodeHeaderHeightKey(self._tip.height);

View File

@ -37,7 +37,7 @@ P2P.prototype.clearInventoryCache = function() {
P2P.prototype.getAPIMethods = function() {
var methods = [
['clearInventoryCache', this, this.clearInventoryCache, 0],
['getBlocks', this, this.getBlocks, 1],
['getP2PBlock', this, this.getP2PBlock, 1],
['getHeaders', this, this.getHeaders, 1],
['getMempool', this, this.getMempool, 0],
['sendTransaction', this, this.sendTransaction, 1]
@ -49,18 +49,17 @@ P2P.prototype.getNumberOfPeers = function() {
return this._pool.numberConnected;
};
P2P.prototype.getBlocks = function(filter) {
// this if our peer goes away, but comes back later, we
// can restart the last query
this._currentRequest = filter;
P2P.prototype.getP2PBlock = function(opts, callback) {
// opts is { filter: {<start and end hashes>}, blockHash: block hash we want }
this._currentRequest = opts.filter;
var peer = this._getPeer();
var blockFilter = this._setResourceFilter(filter, 'blocks');
var blockFilter = this._setResourceFilter(opts.filter, 'blocks');
this.once(opts.blockHash, callback);
peer.sendMessage(this.messages.GetBlocks(blockFilter));
};
P2P.prototype.getHeaders = function(filter) {
@ -263,6 +262,7 @@ P2P.prototype._initPubSub = function() {
};
P2P.prototype._onPeerBlock = function(peer, message) {
this.emit(message.block.rhash(), message.block);
this._broadcast(this.subscriptions.block, 'p2p/block', message.block);
};

16
package-lock.json generated
View File

@ -4229,14 +4229,6 @@
"resolved": "https://registry.npmjs.org/statuses/-/statuses-1.3.1.tgz",
"integrity": "sha1-+vUbnrdKrvOzrPStX2Gr8ky3uT4="
},
"string_decoder": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz",
"integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==",
"requires": {
"safe-buffer": "5.1.1"
}
},
"string-length": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/string-length/-/string-length-1.0.1.tgz",
@ -4256,6 +4248,14 @@
"strip-ansi": "3.0.1"
}
},
"string_decoder": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz",
"integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==",
"requires": {
"safe-buffer": "5.1.1"
}
},
"stringstream": {
"version": "0.0.5",
"resolved": "https://registry.npmjs.org/stringstream/-/stringstream-0.0.5.tgz",

View File

@ -176,14 +176,12 @@ describe('Block Service', function() {
var getPrefix = sandbox.stub().callsArgWith(1, null, blockService._encoding);
var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 1, hash: 'aa' });
var setListeners = sandbox.stub(blockService, '_setListeners');
var startSub = sandbox.stub(blockService, '_startSubscriptions');
var setTip = sandbox.stub(blockService, '_setTip');
blockService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip };
blockService.start(function() {
expect(blockService._encoding).to.be.an.instanceof(Encoding);
expect(getServiceTip.calledOnce).to.be.true;
expect(getPrefix.calledOnce).to.be.true;
expect(startSub.calledOnce).to.be.true;
expect(setTip.calledOnce).to.be.true;
done();
});

View File

@ -42,6 +42,8 @@ describe('Header Service', function() {
var setListeners = sandbox.stub(headerService, '_setListeners');
var getPrefix = sandbox.stub().callsArgWith(1, null, new Buffer('ffee', 'hex'));
var getLastHeader = sandbox.stub(headerService, '_getLastHeader').callsArgWith(0, null);
var setGenesisBlock = sandbox.stub(headerService, '_setGenesisBlock').callsArgWith(0, null);
headerService.GENESIS_HASH = '00';
var openBus = sandbox.stub();
headerService.node = { openBus: openBus };
var _startHeaderSubscription = sandbox.stub(headerService, '_startHeaderSubscription');
@ -50,9 +52,10 @@ describe('Header Service', function() {
headerService.start(function() {
expect(_startHeaderSubscription.calledOnce).to.be.true;
expect(getLastHeader.calledOnce).to.be.true;
expect(setGenesisBlock.calledOnce).to.be.true;
expect(getLastHeader.calledOnce).to.be.false;
expect(setListeners.calledOnce).to.be.true;
expect(headerService._tip).to.be.deep.equal({ height: 123, hash: 'a' });
expect(headerService._tip).to.be.deep.equal({ height: 0, hash: '00' });
expect(headerService._encoding).to.be.instanceOf(Encoding);
done();
});
@ -161,4 +164,28 @@ describe('Header Service', function() {
});
describe('#_getLastHeader', function() {
it('should get the last header from which to start synchronizing more headers', function(done) {
var stream = new Emitter();
var header = Object.assign({ chainwork: '00', height: 2 }, prevHeader );
var headerBuf = headerService._encoding.encodeHeaderValue(header);
headerService._tip = { height: 2, hash: 'aa' };
headerService._db = {
createReadStream: sandbox.stub().returns(stream),
batch: sandbox.stub().callsArgWith(1, null)
};
headerService._getLastHeader(function(err) {
if(err) {
return done(err);
}
expect(headerService._tip.hash).to.equal(header.hash);
done();
});
stream.emit('data', { value: headerBuf });
stream.emit('end');
});
});
});