This commit is contained in:
Chris Kleeschulte 2017-07-13 19:15:21 -04:00
parent a488c3f083
commit 63cada9e8d
6 changed files with 265 additions and 305 deletions

View File

@ -13,7 +13,7 @@ function Logger(options) {
options = {};
}
this.formatting = _.isUndefined(options.formatting) ? Logger.DEFAULT_FORMATTING : options.formatting;
this.permitWrites = true;
this.level = options.logLevel || process.env['LOG_LEVEL'] || 0; // lower numbers means less logs
}
Logger.DEFAULT_FORMATTING = true;
@ -32,9 +32,7 @@ Logger.prototype.info = function() {
*/
Logger.prototype.error = function() {
var existingPermitWrites = this.permitWrites;
this.permitWrites = true;
this._log.apply(this, ['red', 'error'].concat(Array.prototype.slice.call(arguments)));
this.permitWrites = existingPermitWrites;
};
/**
@ -42,7 +40,29 @@ Logger.prototype.error = function() {
* #debug
*/
Logger.prototype.debug = function() {
this._log.apply(this, ['magenta', 'debug'].concat(Array.prototype.slice.call(arguments)));
if (this.level >= 1) {
this._log.apply(this, ['magenta', 'debug'].concat(Array.prototype.slice.call(arguments)));
}
};
/**
* Prints an debug2 message
* #debug2
*/
Logger.prototype.debug2 = function() {
if (this.level >= 2) {
this._log.apply(this, ['green', 'debug'].concat(Array.prototype.slice.call(arguments)));
}
};
/**
* Prints an debug3 message
* #debug3
*/
Logger.prototype.debug3 = function() {
if (this.level >= 3) {
this._log.apply(this, ['cyan', 'debug'].concat(Array.prototype.slice.call(arguments)));
}
};
/**
@ -58,9 +78,6 @@ Logger.prototype.warn = function() {
* #_log
*/
Logger.prototype._log = function(color) {
if (!this.permitWrites) {
return;
}
var args = Array.prototype.slice.call(arguments);
args = args.slice(1);
var level = args.shift();

View File

@ -89,99 +89,4 @@ Service.prototype.getRoutePrefix = function() {
return this.name;
};
Service.prototype._createConcurrencyCache = function(opts) {
this._concurrencyCache = LRU(opts || 500);
};
Service.prototype._createCache = function(opts) {
this._cache = LRU(opts || 500);
};
Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) {
var self = this;
var prev = self._concurrencyCache.get(prevKey);
if (prev && !prev.prevKey) {
if (fn) {
valueItem = fn.call(self, valueItem, prev.valueItem);
}
self._concurrencyCache.del(prevKey);
self._concurrencyCache.set(key, { valueItem: valueItem });
return [{ key: key, value: valueItem }];
}
self._concurrencyCache.set(key, { valueItem: valueItem, prevKey: prevKey });
var resolvedDeps = [];
var depKey = key;
self._concurrencyCache.rforEach(function(value, key) {
if (depKey === value.prevKey) {
valueItem = fn.call(self, value.valueItem, depKey.valueItem);
resolvedDeps.push({ key: key, value: valueItem });
depKey = key;
self._concurrencyCache.del(key);
}
});
return resolvedDeps;
};
Service.prototype._getGenesisBlock = function() {
this.genesis = {};
this.genesis.height = 0;
this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
return this.genesis;
};
Service.prototype._decodeTipData = function(tipBuf) {
assert(tipBuf && Buffer.isBuffer(tipBuf) && tipBuf.length === 36,
'tip data: ' + tipBuf.toString('hex') + ' if not in the proper format');
var hash = tipBuf.slice(0, 32).toString('hex');
var height = tipBuf.slice(32).toString('hex').readUInt32BE();
return {
hash: hash,
height: height
};
};
Service.prototype._loadTip = function(callback) {
var self = this;
self.db.get(new Buffer('00', 'hex') + self.name, function(err, tipBuf) {
if (err) {
return callback(err);
}
if (!tipBuf) {
self.tip = self._getGenesisBlock();
self._lastHeight = 0;
return callback();
}
self.tip = self._decodeTipData(tipData);
self._lastHeight = self.tip.height;
log.info('Loaded tip for: ' + self.name + ' service, hash: ' +
self.tip.hash + ' height: ' + self.tip.height);
callback();
});
};
module.exports = Service;

View File

@ -29,13 +29,13 @@ Encoding.prototype.decodeBlockValue = function(buffer) {
// ---- height --> hash, chainwork
Encoding.prototype.encodeMetaKey = function(height) {
var heigthtBuf = new Buffer(4);
var heightBuf = new Buffer(4);
heightBuf.writeUInt32BE(height);
return Buffer.concat([ this.blockPrefix, this.metaPrefix, heightBuf ]);
};
Encoding.prototype.decodeMetaKey = function(buffer) {
return heightBuf.readUInt32BE(3);
return buffer.readUInt32BE(3);
};
Encoding.prototype.encodeMetaValue = function(value) {

View File

@ -1,5 +1,6 @@
'use strict';
var async = require('async');
var BaseService = require('../../service');
var inherits = require('util').inherits;
var Encoding = require('./encoding');
@ -39,7 +40,7 @@ var BlockService = function(options) {
}); // hash -> block
// keep track of out-of-order blocks, this is a list of chains (which are lists themselves)
// e.g. [ [ block1, block5 ], [ block2, block6 ] ];
// e.g. [ [ block5, block4 ], [ block8, block7 ] ];
this._incompleteChains = [];
this._chainTips = []; // list of all chain tips, including main chain and any chains that were orphaned after a reorg
this._blockCount = 0;
@ -55,37 +56,6 @@ BlockService.MAX_CHAINWORK = new BN(1).ushln(256);
BlockService.MAX_BLOCKS = 500;
// --- public prototype functions
BlockService.prototype.start = function(callback) {
var self = this;
self._db.getPrefix(self.name, function(err, prefix) {
if(err) {
return callback(err);
}
self._db.getServiceTip('block', function(err, tip) {
if (err) {
return callback(err);
}
self.prefix = prefix;
self._encoding = new Encoding(self.prefix);
self._loadMeta();
self._setListeners();
self._onTipBlock(tip);
callback();
});
});
};
BlockService.prototype.stop = function(callback) {
this._saveMetaData();
callback();
};
BlockService.prototype.getAPIMethods = function() {
var methods = [
['getBlock', this, this.getBlock, 1],
@ -117,6 +87,39 @@ BlockService.prototype.getPublishEvents = function() {
};
BlockService.prototype.start = function(callback) {
var self = this;
self._setListeners();
async.waterfall([
function(next) {
self._db.getPrefix(self.name, next);
},
function(prefix, next) {
self._prefix = prefix;
self._encoding = new Encoding(self._prefix);
self._db.getServiceTip('block', next);
},
function(tip, next) {
self._tip = tip;
self._chainTips.push(self._tip.hash);
self._loadMeta(next);
}
], function(err) {
if(err) {
return callback(err);
}
self._startSubscriptions();
callback();
});
};
BlockService.prototype.stop = function(callback) {
callback();
};
BlockService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
@ -260,49 +263,12 @@ BlockService.prototype._getBlockOperations = function(block) {
};
BlockService.prototype._getBlocksFromHashes = function(hashes) {
var self = this;
var blocks = hashes.map(function(hash) {
var block = self._blockQueue.get(hash);
if (!block) {
log.error('block: ' + hash + ' was not found in our in-memory block cache.');
this.node.stop();
return;
}
return block;
});
return blocks;
};
BlockService.prototype._getChainwork = function(tipHash) {
var block = this._blockQueue.get(tipHash);
assert(block, 'expected to find a block in block queue for hash: ' + tipHash + ', but did not find it.');
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prevChainwork;
// if the previous block is the genesis block, then the chain work is the same for all know chains
if (prevHash === this.GENESIS_HASH) {
prevChainwork = new BN(new Buffer('0000000000000000000000000000000000000000000000000000000100010001', 'hex'));
} else {
var prevBlock = this._blockQueue.get(prevHash);
assert(prevBlock, 'expected to find a previous block in block queue for hash: ' +
prevHash + ', but did not find any.');
// whatevs the latest chainwork in meta, this is the cumulative chainwork
var lastChainwork = this._meta[this._meta.length - 1].chainwork;
prevChainwork = new BN(new Buffer(lastChainwork, 'hex'));
}
var lastChainwork = this._meta[this._meta.length - 1].chainwork;
var prevChainwork = new BN(new Buffer(lastChainwork, 'hex'));
return this._computeChainwork(block.header.bits, prevChainwork);
};
@ -318,10 +284,23 @@ BlockService.prototype._getDelta = function(tip) {
blocks.push(blk);
}
blocks.reverse();
return blocks;
};
BlockService.prototype._getIncompleteChainIndexes = function(block) {
var ret = [];
for(var i = 0; i < this._incompleteChains.length; i++) {
var chain = this._incompleteChains[i];
var lastEntry = chain[chain.length - 1];
if (utils.reverseBufferToString(lastEntry.header.prevHash) === block.hash) {
ret.push(i);
}
}
return ret;
};
BlockService.prototype._handleReorg = function(block) {
this._reorging = true;
@ -333,12 +312,12 @@ BlockService.prototype._handleReorg = function(block) {
if (!commonAncestor) {
log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' +
block.hash + ' (the forked block) could not be found. Bitcore-node must exit.');
log.permitWrites = false;
this.node.stop();
return;
}
log.warn('A common ancestor block was found to at hash: ' + commonAncestor + '.');
this._setTip(block);
this._broadcast(this.subscriptions.reorg, 'block/reorg', [block, commonAncestor]);
this._reorging = false;
@ -369,17 +348,31 @@ BlockService.prototype._isOutOfOrder = function(block) {
};
BlockService.prototype._loadMeta = function() {
this._meta = this._db.loadBlockMetaData();
if (!this._meta || this._meta.length < 1) {
this._rebuildMetaData();
}
BlockService.prototype._loadMeta = function(callback) {
var self = this;
var criteria = {
gte: self._encoding.encodeMetaKey(0),
lte: self._encoding.encodeMetaKey(0xffffffff)
};
var stream = this._db.createReadStream(criteria);
stream.on('error', self._onDbError.bind(self));
stream.on('end', function() {
if (self._meta.length < 1) {
self._meta.push({
chainwork: '0000000000000000000000000000000000000000000000000000000100010001',
hash: self.GENESIS_HASH
});
}
callback();
});
stream.on('data', function(data) {
self._meta.push(self._encoding.decodeMetaValue(data.value));
});
};
BlockService.prototype._onBestHeight = function(height) {
var self = this;
self._bestHeight = height;
this._bestHeight = height;
this._startSync();
};
BlockService.prototype._onBlock = function(block) {
@ -403,7 +396,7 @@ BlockService.prototype._onBlock = function(block) {
// 7. react to state of block
switch (blockState) {
case 'orphaned':
case 'outoforder':
// nothing to do, but wait until ancestor blocks come in
break;
case 'reorg':
@ -411,7 +404,7 @@ BlockService.prototype._onBlock = function(block) {
break;
default:
// send all unsent blocks now that we have a complete chain
this._updateMeta(block);
this._saveMetaData(block);
this._sendDelta();
break;
}
@ -424,49 +417,20 @@ BlockService.prototype._onDbError = function(err) {
};
BlockService.prototype._onTipBlock = function(tip) {
BlockService.prototype._saveMetaData = function(block) {
var item = {
chainwork: this._getChainwork(block.hash).toString(16, 64),
hash: block.hash
};
var self = this;
self._tip = tip;
self._chainTips.push(self._tip.hash);
self._startSubscriptions();
self._startSync();
};
BlockService.prototype._rebuildMetaData = function() {
// we have to go through all the blocks in the database and calculate chainwork
// TODO: implement this
this._meta = [{
hash: this.GENESIS_HASH,
chainwork: '0000000000000000000000000000000000000000000000000000000100010001'
}];
};
BlockService.prototype._reportBootStatus = function() {
var blockInfoString = utils.getBlockInfoString(this._tip.height, this._bestHeight);
log.info('Block Service tip is currently height: ' + this._tip.height + ' hash: ' +
this._tip.hash + ' P2P network best height: ' + this._bestHeight + '. Block Service is: ' +
blockInfoString);
this._meta.push(item);
// save meta position in db
this._db.put(this._encoding.encodeMetaKey(this._meta.length - 1),
this._encoding.encodeMetaValue(item));
};
BlockService.prototype._saveMetaData = function() {
try {
this._db.saveBlockMetaData(this._meta);
} catch(e) {
log.error('Block meta file failed to save, error: ' + e);
}
};
/*
Since blocks can arrive out of order from our trusted peer, we can't rely on the latest block
being the tip of the main/active chain. We should, instead, take the chain with the most work completed.
We need not concern ourselves whether or not the block is valid, we trust our peer to do this validation.
*/
BlockService.prototype._selectActiveChain = function() {
var chainTip;
@ -523,15 +487,9 @@ BlockService.prototype._setListeners = function() {
};
BlockService.prototype._setMetaData = function(block) {
this._meta.push({
chainwork: this._getChainwork(block.hash).toString(16, 64),
hash: block.hash
});
};
BlockService.prototype._setTip = function(tip) {
log.debug('Setting tip to height: ' + tip.height);
log.debug('Setting tip to hash: ' + tip.hash);
this._tip = tip;
this._db.setServiceTip('block', this._tip);
};
@ -580,44 +538,6 @@ BlockService.prototype._sync = function() {
};
BlockService.prototype._updateNormalStateChainInfo = function(block, prevHash) {
var index = this._chainTips.indexOf(prevHash);
assert(index > -1, 'Block state is normal, ' +
'yet the previous block hash is missing from the chain tips collection.');
this._chainTips.push(block.hash);
this._chainTips.splice(index, 1);
};
BlockService.prototype._updateReorgStateChainInfo = function(block) {
this._chainTips.push(block.hash);
};
BlockService.prototype._updateOutOfOrderStateChainInfo = function(block, prevHash) {
// add this block to the incomplete chains
// are we the parent of some other block in here?
// are we the child of some other block in here?
// block chains in here go newest to oldest block.
// When the oldest block's prevHash is one of the chainTips, the
// incomplete chain is now complete.
for(var i = 0; i < this._incompleteChains.length; i++) {
var chain = this._incompleteChains[i];
// does the last block in this chain have a prevHash that equal's our block's hash?
// if so, this block should be pushed on the end
if (chain.length > 0) {
if (utils.reverseBufferToString(chain[chain.length - 1].header.prevHash) === block.hash) {
chain.push(block);
}
// is our block's prevHash equal to the first block's hash?
if (chain[0].hash === prevHash) {
chain.unshift(block);
}
}
}
};
BlockService.prototype._updateChainInfo = function(block, state) {
var prevHash = utils.reverseBufferToString(block.header.prevHash);
@ -628,33 +548,134 @@ BlockService.prototype._updateChainInfo = function(block, state) {
}
// if this is a reorg state, then a new chain tip will be added to the list
if (state === 'reorg') {
this._updateReorgStateChainInfo(block);
return;
}
this._updateOutOfOrderStateChainInfo(block, prevHash);
this._updateOutOfOrderStateChainInfo(block);
};
BlockService.prototype._updateMeta = function(block) {
// should always have at least one item in here.
var self = this;
var latestMetaHash = self._meta[self._meta.length - 1].hash;
var prevHash = utils.reverseBufferToString(block.header.prevHash);
if (prevHash === latestMetaHash) {
self._setMetaData(block);
latestMetaHash = block.hash;
// check for past orphans that we can now stack on top
self._blockQueue.rforEach(function(v) {
if (latestMetaHash === utils.reverseBufferToString(v.header.prevHash)) {
self._setMetaData(v);
latestMetaHash = v.hash;
}
});
BlockService.prototype._updateNormalStateChainInfo = function(block, prevHash) {
var index = this._chainTips.indexOf(prevHash);
assert(index > -1, 'Block state is normal, ' +
'yet the previous block hash is missing from the chain tips collection.');
var incompleteChainIndexes = this._getIncompleteChainIndexes(block);
//retrieving more than one incomplete chain for a given block means there is a future reorg in the incomplete chain
if (incompleteChainIndexes.length < 1) {
this._chainTips.push(block.hash);
} else {
incompleteChainIndexes.sort().reverse();
for(var i = 0; i < incompleteChainIndexes.length; i++) {
var incompleteIndex = incompleteChainIndexes[i];
this._chainTips.push(this._incompleteChains[incompleteIndex][0].hash);
this._incompleteChains.splice(incompleteIndex, 1);
}
}
this._chainTips.splice(index, 1);
};
BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) {
/*
At this point, we know we have a block that arrived out of order
so we need to search through our existing incomplete chains to detect the following criteria:
1. one of more of the chains has us as the last block in the chain
(we are the parent to every other block in that chain)
2. one of more of the chains has us as the first block in the chain
(we are the youngest block in the chain)
3. there are no chains that reference us as prev hash and our prev hash does not exist in any chain.
(we create a new incomplete chain with us as the only enrtry)
*/
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var possibleJoins = { tip: [], genesis: [] };
var newChains = [];
var joinedChains = false;
for(var i = 0; i < this._incompleteChains.length; i++) {
// if we see that a block is the tip of one chain and the genesis of another,
// then we can join those chains together.
var chain = this._incompleteChains[i];
var firstEntry = chain[0];
var lastEntry = chain[chain.length - 1];
var chains;
// criteria 1
var lastEntryPrevHash = utils.reverseBufferToString(lastEntry.header.prevHash);
if (lastEntryPrevHash === block.hash) {
joinedChains = true;
if (possibleJoins.tip.length > 0) {
chains = utils.joinListsOnIndex(possibleJoins.tip, chain, this._incompleteChains);
newChains = utils.removeItemsByIndexList(possibleJoins.tip, newChains);
newChains = newChains.concat(chains);
} else {
// push the block on the end of chain
chain.push(block);
newChains.push(chain);
possibleJoins.genesis.push(i);
}
continue;
}
// criteria 2
if (firstEntry.hash === prevHash) {
joinedChains = true;
if (possibleJoins.genesis.length > 0) {
chains = utils.joinListsOnIndex(possibleJoins.genesis, chain, this._incompleteChains, 'reverse');
newChains = utils.removeItemsByIndexList(possibleJoins.genesis, newChains);
newChains = newChains.concat(chains);
} else {
// have we already put our block as the genesis of some other chain, if so, join the chains
// add the block as the first element on the chainf
chain.unshift(block);
newChains.push(chain);
possibleJoins.tip.push(i);
}
continue;
}
newChains.push(chain);
}
if (joinedChains) {
this._incompleteChains = newChains;
return;
}
// criteria 3
this._incompleteChains.push([block]);
};
BlockService.prototype._updateReorgStateChainInfo = function(block) {
this._chainTips.push(block.hash);
};
module.exports = BlockService;

View File

@ -10,6 +10,7 @@ var bitcore = require('bitcore-lib');
var Networks = bitcore.Networks;
var $ = bitcore.util.preconditions;
var Service = require('../../service');
var constants = require('../../constants');
function DB(options) {
@ -44,19 +45,6 @@ util.inherits(DB, Service);
DB.dependencies = [];
DB.prototype.saveBlockMetaData = function(data) {
fs.writeFileSync(this.node.datadir + '/block-meta-data.json', JSON.stringify(data));
};
DB.prototype.loadBlockMetaData = function() {
var json;
try {
json = require(this.node.datadir + '/block-meta-data.json');
} catch(e) {
}
return json;
};
DB.prototype._setDataPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
if (this.node.network === Networks.livenet) {
@ -182,7 +170,9 @@ DB.prototype.put = function(key, value, options, callback) {
});
} else {
setImmediate(cb);
if (typeof cb === 'function') {
cb();
}
}
};
@ -275,11 +265,9 @@ DB.prototype.getServiceTip = function(serviceName, callback) {
var tip;
if (tipBuf) {
var heightBuf = new Buffer(4);
tip = {
height: heightBuf.readUInt32BE(),
hash: tipBuf.slice(4)
height: tipBuf.readUInt32BE(0,4),
hash: tipBuf.slice(4).toString('hex')
};
} else {
@ -302,6 +290,7 @@ DB.prototype.setServiceTip = function(serviceName, tip) {
var keyBuf = Buffer.concat([ self.dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]);
var heightBuf = new Buffer(4);
heightBuf.writeUInt32BE(tip.height);
var valBuf = Buffer.concat([ heightBuf, new Buffer(tip.hash, 'hex') ]);
self.put(keyBuf, valBuf, function(err) {
if (err) {

View File

@ -188,5 +188,33 @@ utils.getBlockInfoString = function(tip, best) {
}
};
utils.joinListsOnIndex = function(indexList, list1, list2, direction) {
var newChains = [];
indexList.forEach(function(index) {
var otherList = list2[index];
if (direction && direction === 'reverse') {
newChains.push(otherList.concat(list1));
} else {
newChains.push(list1.concat(otherList));
}
});
return newChains;
};
utils.removeItemsByIndexList = function(indexList, list) {
var ret = [];
for(var i = 0; i < list.length; i++) {
var item = list[i];
if (indexList.indexOf(i) === -1) {
ret.push(item);
}
}
return ret;
};
module.exports = utils;