This commit is contained in:
Chris Kleeschulte 2017-07-07 14:30:38 -04:00
parent ee9a3682b4
commit c837456761
3 changed files with 159 additions and 316 deletions

View File

@ -21,22 +21,21 @@ var BlockService = function(options) {
this._subscriptions = {};
this._subscriptions.block = [];
this._subscriptions.reorg = [];
this._latestHeaderHashReceived = null;
this._heightIndex = null; // tracks block height to block hash
this._blockHeaderQueue = LRU({
max: 50,
length: function(n) {
return n.length * (1 * 1024 * 1024); // 50 MB of block headers, that should be plenty to hold all the headers
}
}); // hash -> header
// meta is [{ chainwork: chainwork, hash: hash }]
this._meta = []; // properties that apply to blocks that are not already stored on the blocks, yet we still needed, i.e. chainwork, height
this._blockQueue = LRU({
max: 50,
max: 50 * (1 * 1024 * 1024), // 50 MB of blocks,
length: function(n) {
return n.length * (1 * 1024 * 1024); // 50 MB of blocks
return n.toBuffer().length;
}
}); // hash -> block
this._chainTips = []; // list of all chain tips, including main chain and any chains that were orphaned after a reorg
this._maxLookback = 100;
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
};
inherits(BlockService, BaseService);
@ -44,6 +43,7 @@ inherits(BlockService, BaseService);
BlockService.dependencies = [ 'p2p', 'db' ];
BlockService.MAX_CHAINWORK = new BN(1).ushln(256);
BlockService.MAX_BLOCKS = 500;
// --- public prototype functions
BlockService.prototype.start = function(callback) {
@ -58,6 +58,7 @@ BlockService.prototype.start = function(callback) {
self.prefix = prefix;
self._encoding = new Encoding(self.prefix);
self._loadMeta();
self._setListeners();
callback();
});
@ -65,6 +66,7 @@ BlockService.prototype.start = function(callback) {
};
BlockService.prototype.stop = function(callback) {
this._saveMetaData();
callback();
};
@ -113,23 +115,9 @@ BlockService.prototype.unsubscribe = function(name, emitter) {
// --- start private prototype functions
BlockService.prototype._applyHeaderHeights = function() {
var genesis = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
this._heightIndex = new Array(this._blockHeaderQueue.length + 1);
this._heightIndex.push(genesis);
var _tip = this._latestHeaderHashReceived;
while (_tip !== genesis) {
this._heightIndex.push(_tip);
_tip = this._blockHeaderQueue.get(_tip).prevHash;
}
};
BlockService.prototype._blockAlreadyProcessed = function(block) {
return this._blockHeaderQueue.get(block.hash) ? true : false;
return this._blockQueue.get(block.hash) ? true : false;
};
@ -162,58 +150,6 @@ BlockService.prototype._cacheBlock = function(block) {
};
BlockService.prototype._cacheHeader = function(header) {
// 1. put a height on the haeder
header.height = ++this._latestBlockHeight;
// 2. save to in-memory cache first
this._blockHeaderQueue.set(header.hash, header.toObject());
// 3. save to in-memory map block index
this._heightIndex.push(header);
// 4. get operations
return this._getHeaderOperations(header);
};
BlockService.prototype._cacheHeaders = function(headers) {
var self = this;
var operations = headers.map(self._cacheHeader.bind(self));
operations = _.flatten(operations);
log.debug('Saving: ' + headers.length + ' headers to the database.');
self._db.batch(operations, function(err) {
if (!err) {
self._setHeaderTip();
}
});
self._sync('headers');
};
BlockService.prototype._checkChain = function(tip) {
var _tip = tip;
for(var i = 0; i < this._maxLookback; i++) {
if (_tip === this._tip.hash) {
return true;
}
var prevHeader = this._blockHeaderQueue.get(_tip);
if (!prevHeader) {
return false;
}
_tip = prevHeader.prevHash;
}
return false;
};
BlockService.prototype._computeChainwork = function(bits, prev) {
var target = consensus.fromCompact(bits);
@ -234,35 +170,6 @@ BlockService.prototype._computeChainwork = function(bits, prev) {
BlockService.prototype._determineBlockState = function(block) {
/*
block could be in 1 of 3 possible states.
1. normal state: block's prev hssh points to our current tip
2. reorg state: block's prev hash points to block in our chain that is not the tip.
Another way to express this is: New block's prev block ALREADY has another block pointing to it.
This leads to 2 children, for 1 parent block, which is a fork and not a chain. This is a blockchain
and not a blockfork ;) Thus, this new block should be considered the rightful child of its parent.
Remember, this isn't a validating node. Blocks that are relayed to us are considered to be the authoritative.
New blocks always trump what came before.
3. orphaned state: block's prev hash is not in our chain at all. Possible reasons are:
* blocks were delivered out of order, parent of this block is yet to come
* blocks were, again, delivered out order, but the parent will never come
In point 1, waiting longer for blocks to arrive is the best action.
In point 2, waiting won't help. The block's parent block may be in an orphaned chain and this chain may
never become the main chain. Also, your peers may nor may not give you all the parent blocks for this orphan
chain. It is best to not assign this block a height until all of its parents are linked. We should, however,
call getBlocks with startHash of our tip and end hash of the list of orphaned blocks periodically.
*/
if (this._isOrphanBlock(block)) {
return 'orphaned';
}
@ -285,17 +192,18 @@ BlockService.prototype._findCommonAncestor = function(block) {
assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.');
for(var i = 0; i < this._maxLookback; i++) {
var len = this._blockQueue.itemCount;
for(var i = 0; i < len; i++) {
var oldHdr = this._blockHeaderQueue.get(_oldTip);
var newHdr = this._blockHeaderQueue.get(_newTip);
var oldBlk = this._blockQueue.get(_oldTip);
var newBlk = this._blockQueue.get(_newTip);
if (!oldHdr || !newHdr) {
if (!oldBlk || !newBlk) {
return;
}
_oldTip = oldHdr.prevHash;
_newTip = newHdr.prevHash;
_oldTip = oldBlk.prevHash;
_newTip = newBlk.prevHash;
if (_newTip === _oldTip) {
return _newTip;
@ -353,27 +261,27 @@ BlockService.prototype._getBlocksFromHashes = function(hashes) {
BlockService.prototype._getChainwork = function(tipHash) {
var header = this._blockHeaderQueue.get(tipHash);
assert(header, 'expected to find a header in block header queue, but did not find any.');
var block = this._blockQueue.get(tipHash);
assert(block, 'expected to find a block in block queue for hash: ' + tipHash + ', but did not find it.');
var prevHeader = this._blockHeaderQueue.get(header.prevHash);
assert(header, 'expected to find a previous header in block header queue, but did not find any.');
var prevHash = utils.reverseBufferToString(block.header.prevHash);
//we persist the chainWork to avoid recalculating it on boot
var chainwork = new BN(new Buffer(header.chainwork || '00', 'hex'));
var prevChainwork;
// if the previous block is the genesis block, then the chain work is the same for all know chains
if (prevHash === this.GENESIS_HASH) {
prevChainwork = new BN(new Buffer('0000000000000000000000000000000000000000000000000000000100010001', 'hex'));
} else {
var prevBlock = this._blockQueue.get(prevHash);
if (chainwork.gtn(0)) {
return chainwork;
assert(prevBlock, 'expected to find a previous block in block queue for hash: ' +
prevHash + ', but did not find any.');
// whatevs the latest chainwork in meta, this is the cumulative chainwork
var lastChainwork = this._meta[this._meta.length - 1].chainwork;
prevChainwork = new BN(new Buffer(lastChainwork, 'hex'));
}
var prevChainwork = new BN(new Buffer(prevHeader.chainwork || '00', 'hex'));
chainwork = this._computeChainwork(header.bits, prevChainwork);
header.chainwork = chainwork.toBuffer().toString('hex');
this._blockHeaderQueue.set(tipHash, header);
return chainwork;
return this._computeChainwork(block.header.bits, prevChainwork);
};
BlockService.prototype._getDelta = function(tip) {
@ -382,9 +290,8 @@ BlockService.prototype._getDelta = function(tip) {
var _tip = tip;
while (_tip !== this._tip.hash) {
var hdr = this._blockHeaderQueue.get(_tip);
var blk = this._blockQueue.get(_tip);
_tip = hdr.prevHash;
_tip = utils.reverseBufferToString(blk.header.prevHash);
blocks.push(blk);
}
@ -392,51 +299,6 @@ BlockService.prototype._getDelta = function(tip) {
};
BlockService.prototype._getHeader = function(block) {
return {
hash: block.hash,
version: 1,
prevHash: utils.reverseBufferToString(block.header.prevHash),
merkleRoot: utils.reverseBufferToString(block.header.merkleRoot),
time: block.header.time,
height: block.__height
};
};
BlockService.prototype._getHeaderOperations = function(header) {
var self = this;
// header or [header]
if (_.isArray(header)) {
var ops = [];
_.forEach(header, function(h) {
ops.push(self._getBlockOperations(h));
});
return _.flatten(ops);
}
var operations = [];
// hash
operations.push({
type: 'put',
key: self._encoding.encodeHashKey(header.hash),
value: self._encoding.encodeHeaderValue(header)
});
// height
operations.push({
type: 'put',
key: self._encoding.encodeHeightKey(header.height),
value: self._encoding.encodeHeaderValue(header)
});
return operations;
};
BlockService.prototype._handleReorg = function(block) {
this._reorging = true;
@ -459,7 +321,6 @@ BlockService.prototype._handleReorg = function(block) {
};
BlockService.prototype._isChainReorganizing = function(block) {
var prevHash = utils.reverseBufferToString(block.header.prevHash);
@ -470,66 +331,25 @@ BlockService.prototype._isChainReorganizing = function(block) {
BlockService.prototype._isOrphanBlock = function(block) {
// if any of our ancestors are missing from the queue, then this is an orphan block
var prevHash = utils.reverseBufferToString(block.header.prevHash);
// so this should fail - "is orphan"
var pastBlockCount = Math.min(this._maxLookback, this._blockHeaderQueue.length);
for(var i = 0; i < pastBlockCount; i++) {
var prevHeader = this._blockHeaderQueue.get(prevHash);
if (!prevHeader) {
return true;
// we'll consult our metadata
var i = this._meta.length - 1;
for(; i >= 0; --i) {
// if we find this block's prev hash in our meta collection, then we know all of its ancestors are there too
if (block.hash === this._meta[i].hash) {
return false;
}
prevHash = prevHeader.prevHash;
}
return false;
return true;
};
BlockService.prototype._loadHeaders = function(cb) {
// on initial sync of headers...we don't have block heights with the headers
// we just have a list of headers that have a prevhash pointer and that's about it
// we can't be sure that headers have arrived in order, so we can pick the last header
// that came in and work backwards to the genesis block. If we don't process exactly the
// number of total blocks, then we know we did not really have the last header and we should try again
var self = this;
var start = self._encoding.encodeHashKey(new Array(65).join('0'));
var end = Buffer.concat([ utils.getTerminalKey(start.slice(0, 3)), start.slice(3) ]);
var stream = self._db.createReadStream({ gte: start, lt: end });
stream.on('end', function() {
cb();
});
stream.on('data', function(data) {
self._blockHeaderQueue.set(self._encoding.decodeHashKey(data.key), self._encoding.decodeHeaderValue(data.value));
});
stream.on('error', function(err) {
cb(err);
});
};
BlockService.prototype._loadHeights = function(cb) {
var self = this;
var start = self._encoding.encodeHeightKey(0);
var end = self._encoding.encodeHeightKey(0xffffffff);
var stream = self._db.createReadStream({ gte: start, lte: end });
stream.on('end', function() {
cb();
});
stream.on('data', function(data) {
// I think this should stream in-order
self._heightIndex.push(self._encoding.decodeHeaderValue(data.value).hash);
});
stream.on('error', function(err) {
cb(err);
});
BlockService.prototype._loadMeta = function() {
this._meta = this._db.loadBlockMetaData();
if (!this._meta || this._meta.length < 1) {
this._rebuildMetaData();
}
};
BlockService.prototype._loadTip = function() {
@ -546,7 +366,6 @@ BlockService.prototype._onBestHeight = function(height) {
self._db.once('tip-block', self._onTipBlock.bind(self));
self._loadTip();
};
BlockService.prototype._onBlock = function(block) {
@ -562,13 +381,16 @@ BlockService.prototype._onBlock = function(block) {
// 3. store the block for safe keeping
this._cacheBlock(block);
// 4. determine block state, reorg, orphaned, normal
// 4. add block to meta
this._updateMeta(block);
// 5. determine block state, reorg, orphaned, normal
var blockState = this._determineBlockState(block);
// 5. add block hash to chain tips
this._updateChainTips(block);
// 6. add block to chainTips
this._updateChainTips(block, blockState);
// 6. react to state of block
// 7. react to state of block
switch (blockState) {
case 'orphaned':
// nothing to do, but wait until ancestor blocks come in
@ -585,21 +407,12 @@ BlockService.prototype._onBlock = function(block) {
BlockService.prototype._onDbError = function(err) {
log.error('Block Service: Error: ' + err.message + ' not recovering.');
log.error('Block Service: Error: ' + err + ' not recovering.');
this.node.stop();
};
BlockService.prototype._onHeaders = function(headers) {
log.debug('New headers received, count: ' + headers.length);
this._numCompleted += headers.length;
this._cacheHeaders(headers);
this._latestHeaderHash = headers[headers.length - 1].hash;
};
// --- mark 1
BlockService.prototype._onTipBlock = function(tip) {
var self = this;
@ -611,15 +424,26 @@ BlockService.prototype._onTipBlock = function(tip) {
self._tip = {
height: 0,
hash: constants.BITCOIN_GENESIS_HASH[self.node.getNetworkName()]
hash: self.GENESIS_HASH
};
}
self._startSync('header');
self._chainTips.push(self._tip.hash);
self._startSubscriptions();
self._startSync();
};
BlockService.prototype._rebuildMetaData = function() {
// we have to go through all the blocks in the database and calculate chainwork
// TODO: implement this
this._meta = [{
hash: this.GENESIS_HASH,
chainwork: '0000000000000000000000000000000000000000000000000000000100010001'
}];
};
BlockService.prototype._reportBootStatus = function() {
var blockInfoString = utils.getBlockInfoString(this._tip.height, this._bestHeight);
@ -631,6 +455,14 @@ BlockService.prototype._reportBootStatus = function() {
};
BlockService.prototype._saveMetaData = function() {
try {
this._db.saveBlockMetaData(this._meta);
} catch(e) {
log.error('Block meta file failed to save, error: ' + e);
}
};
/*
Since blocks can arrive out of order from our trusted peer, we can't rely on the latest block
being the tip of the main/active chain. We should, instead, take the chain with the most work completed.
@ -664,16 +496,21 @@ BlockService.prototype._sendDelta = function() {
// our task is to send all blocks between active chain's tip and our tip.
var activeChainTip = this._selectActiveChain();
// it is essential that the activeChainTip be in the same chain as our current tip
assert(this._checkChain(activeChainTip), 'The chain with the greatest work does not include our current tip.');
var blocks = this._getDelta(activeChainTip);
for(var i = 0; i < blocks.length; i++) {
this._broadcast(this._subscriptions.block, 'block/block', blocks[i]);
}
this._setTip(blocks[i-1]);
var len = this._meta.length - 1;
this._setTip({ height: len, hash: this._meta[len].hash });
if (++this._blockCount >= BlockService.MAX_BLOCKS) {
this._latestBlockHash = this._tip.hash;
this._numCompleted = this._tip.height;
this._blockCount = 0;
this._sync();
}
};
@ -687,93 +524,61 @@ BlockService.prototype._setListeners = function() {
};
BlockService.prototype._setHeaderTip = function() {
var index = this._heightIndex;
var length = index.length;
this._headerTip = {
hash: index[length - 1] || constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()],
height: length || 0
};
BlockService.prototype._setMetaData = function(block) {
this._meta.push({
chainwork: this._getChainwork(block.hash).toString(16, 64),
hash: block.hash
});
};
BlockService.prototype._setTip = function(block) {
this._tip.height = block.height;
this._tip.hash = block.hash;
BlockService.prototype._setTip = function(tip) {
log.debug('Setting tip to height: ' + tip.height);
this._tip = tip;
this._db.setServiceTip('block', this._tip);
};
BlockService.prototype._startSync = function(type) {
BlockService.prototype._startSync = function() {
var currentHeight = type === 'block' ? this._tip.height : 0; //we gather all headers on each boot
var currentHeight = this._tip.height;
this._numNeeded = this._bestHeight - currentHeight;
this._numCompleted = currentHeight;
if (this._numNeeded <= 0) {
return;
}
log.info('Gathering: ' + this._numNeeded + ' ' + type + '(s) from the peer-to-peer network.');
var hash = this._tip.hash || constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
if (type === 'block') {
this._p2pBlockCallsNeeded = Math.ceil(this._numNeeded / 500);
this._latestBlockHash = hash;
this._sync('block');
} else {
this._p2pHeaderCallsNeeded = Math.ceil(this._numNeeded / 2000);
this._latestHeaderHash = hash;
this._sync('header');
}
log.info('Gathering: ' + this._numNeeded + ' ' + 'block(s) from the peer-to-peer network.');
this._p2pBlockCallsNeeded = Math.ceil(this._numNeeded / 500);
this._latestBlockHash = this._tip.hash || this.GENESIS_HASH;
this._sync();
};
BlockService.prototype._startSubscriptions = function() {
var self = this;
if (self._subscribed) {
if (this._subscribed) {
return;
}
self._subscribed = true;
self._bus = self.node.openBus({remoteAddress: 'localhost'});
self._bus.on('p2p/block', self._onBlock.bind(self));
self._bus.on('p2p/headers', self._onHeaders.bind(self));
self._bus.subscribe('p2p/block');
self._bus.subscribe('p2p/headers');
this._subscribed = true;
if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost'});
}
this._bus.on('p2p/block', this._onBlock.bind(this));
this._bus.subscribe('p2p/block');
};
BlockService.prototype._sync = function(type) {
BlockService.prototype._sync = function() {
var counter, func, obj, name;
if (type === 'block') {
counter = --this._p2pBlockCallsNeeded;
obj = { startHash: this._latestBlockHash };
func = this._p2p.getBlocks;
name = 'Blocks';
} else {
counter = --this._p2pHeaderCallsNeeded;
obj = { startHash: this._latestHeaderHash };
func = this._p2p.getHeaders;
name = 'Headers';
}
if (--this._p2pBlockCallsNeeded > 0) {
if (counter > 0) {
log.debug(name + ' download progress: ' + this._numCompleted + '/' +
log.info('Blocks download progress: ' + this._numCompleted + '/' +
this._numNeeded + ' (' + (this._numCompleted/this._numNeeded*100).toFixed(2) + '%)');
func.call(this._p2p, obj);
this._p2p.getBlocks({ startHash: this._latestBlockHash });
return;
}
if ('header') {
// we are done loading headers, proceed to number them
this._applyHeaderHeights(this._startSync.bind(this, 'block'));
}
};
BlockService.prototype._updateChainTips = function(block, state) {
@ -803,4 +608,22 @@ BlockService.prototype._updateChainTips = function(block, state) {
};
BlockService.prototype._updateMeta = function(block) {
// should always have at least one item in here.
var self = this;
var latestMetaHash = self._meta[self._meta.length - 1].hash;
var prevHash = utils.reverseBufferToString(block.header.prevHash);
if (prevHash === latestMetaHash) {
self._setMetaData(block);
latestMetaHash = block.hash;
// check for past orphans that we can now stack on top
self._blockQueue.rforEach(function(v) {
if (latestMetaHash === utils.reverseBufferToString(v.header.prevHash)) {
self._setMetaData(v);
latestMetaHash = v.hash;
}
});
}
};
module.exports = BlockService;

View File

@ -43,6 +43,19 @@ util.inherits(DB, Service);
DB.dependencies = [];
DB.prototype.saveBlockMetaData = function(data) {
fs.writeFileSync(this.node.datadir + '/block-meta-data.json', JSON.stringify(data));
};
DB.prototype.loadBlockMetaData = function() {
var json;
try {
json = require(this.node.datadir + '/block-meta-data.json');
} catch(e) {
}
return json;
};
DB.prototype._setDataPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
if (this.node.network === Networks.livenet) {
@ -274,12 +287,15 @@ DB.prototype.getServiceTip = function(serviceName) {
DB.prototype.setServiceTip = function(serviceName, tip) {
var keyBuf = Buffer.concat([ this.dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]);
var self = this;
var keyBuf = Buffer.concat([ self.dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]);
var heightBuf = new Buffer(4);
var valBuf = Buffer.concat([ heightBuf.writeUInt32BE(tip.height), new Buffer(tip.hash, 'hex') ]);
this.put(keyBuf, valBuf, function(err) {
this.emit('error', err);
heightBuf.writeUInt32BE(tip.height);
var valBuf = Buffer.concat([ heightBuf, new Buffer(tip.hash, 'hex') ]);
self.put(keyBuf, valBuf, function(err) {
if (err) {
self.emit('error', err);
}
});
};

View File

@ -29,14 +29,18 @@ describe('Block Service', function() {
blockService._blockHeaderQueue.set(genesis, { prevHash: '00' });
blockService._latestHeaderHashReceived = '100';
blockService.node = { getNetworkName: function() { return 'regtest'; } };
for(var i = 0; i < 99; i++) {
for(var i = 0; i < 100; i++) {
var prevHash = i.toString();
if (i === 0) {
prevHash = genesis;
}
blockService._blockHeaderQueue.set((i+1).toString(), { prevHash: prevHash });
}
blockService._applyHeaderHeights();
for(var j = 0; j < blockService._applyHeaderHeights.length; j++) {
expect(blockService._applyHeaderHeights[j]).to.equal(j.toString());
}
});
});