This commit is contained in:
Chris Kleeschulte 2017-07-20 15:55:02 -04:00
parent 6b45ef27cd
commit 759087bfc2
7 changed files with 204 additions and 129 deletions

View File

@ -13,7 +13,7 @@ function Logger(options) {
options = {};
}
this.formatting = _.isUndefined(options.formatting) ? Logger.DEFAULT_FORMATTING : options.formatting;
this.level = options.logLevel || process.env['LOG_LEVEL'] || 0; // lower numbers means less logs
this.level = options.logLevel || process.env.LOG_LEVEL || 0; // lower numbers means less logs
}
Logger.DEFAULT_FORMATTING = true;
@ -31,7 +31,6 @@ Logger.prototype.info = function() {
* #error
*/
Logger.prototype.error = function() {
var existingPermitWrites = this.permitWrites;
this._log.apply(this, ['red', 'error'].concat(Array.prototype.slice.call(arguments)));
};

View File

@ -18,11 +18,11 @@ Encoding.prototype.decodeBlockKey = function(buffer) {
};
Encoding.prototype.encodeBlockValue = function(block) {
return block.toBuffer();
return block.toRaw();
};
Encoding.prototype.decodeBlockValue = function(buffer) {
return Block.fromBuffer(buffer);
return Block.fromRaw(buffer);
};
module.exports = Encoding;

View File

@ -30,7 +30,7 @@ var BlockService = function(options) {
this._blockQueue = LRU({
max: 50 * (1 * 1024 * 1024), // 50 MB of blocks,
length: function(n) {
return n.toBuffer().length;
return n.size;
}
}); // hash -> block
@ -55,14 +55,13 @@ BlockService.prototype.getAPIMethods = function() {
var methods = [
['getBlock', this, this.getBlock, 1],
['getRawBlock', this, this.getRawBlock, 1],
['getBlockHeader', this, this.getBlockHeader, 1],
['getBlockOverview', this, this.getBlockOverview, 1],
['getBestBlockHash', this, this.getBestBlockHash, 0]
];
return methods;
};
BlockService.prototype.getBestBlockHash = function(callback) {
BlockService.prototype.getBestBlockHash = function() {
var headers = this._header.getAllHeaders();
return headers[headers.length - 1].hash;
};
@ -79,32 +78,6 @@ BlockService.prototype.getBlock = function(arg, callback) {
};
BlockService.prototype.getBlockHeader = function(blockArg, callback) {
blockArg = this._getHash(blockArg);
if (!blockArg) {
return callback();
}
var headers = this._header.getAllHeaders();
this._getBlock(blockArg, function(err, block) {
if(err) {
return callback(err);
}
if (!block) {
return callback();
}
callback(null, block.header.toJSON());
});
};
BlockService.prototype.getBlockOverview = function(hash, callback) {
this._getBlock(hash, function(err, block) {
@ -113,22 +86,25 @@ BlockService.prototype.getBlockOverview = function(hash, callback) {
return callback(err);
}
var header = block.toHeaders();
var blockOverview = {
hash: block.hash,
version: block.header.version,
version: header.version,
confirmations: null,
height: null,
chainWork: null,
prevHash: utils.reverseBufferToString(block.header.prevHash),
prevHash: header.prevBlock,
nextHash: null,
merkleRoot: block.header.merkleroot,
merkleRoot: block.merkleroot,
time: null,
medianTime: null,
nonce: block.header.nonce,
bits: block.header.bits,
nonce: block.nonce,
bits: block.bits,
difficulty: null,
txids: null
};
callback(null, blockOverview);
});
@ -180,8 +156,9 @@ BlockService.prototype.start = function(callback) {
self._db.getServiceTip('block', next);
},
function(tip, next) {
self._tip = tip;
self._setTip(tip);
self._chainTips.push(self._tip.hash);
self._primeBlockQueue(next);
}
], function(err) {
@ -197,6 +174,50 @@ BlockService.prototype.start = function(callback) {
};
BlockService.prototype._primeBlockQueue = function(callback) {
// this will load the last 50 blocks into the block queue to prime the cache
var self = this;
var hash = this._tip.hash;
async.timesSeries(50, function(index, next) {
self._db.get(self._encoding.encodeBlockKey(hash), function(err, block) {
if(err) {
return next(err);
}
if (!block) {
return next();
}
hash = block.toHeaders().prevBlock;
self._blockQueue.set(block.hash, block);
next();
});
}, callback);
};
// upon startup, has the chain reorg'ed from where we were when we shutdown?
BlockService.prototype._detectInitialChainState = function(headers) {
if (this._tip.height === 0) {
return;
}
var index = this._tip.height - 1;
var record = Array.from(headers)[index];
if (record[0] !== this._tip.hash) {
// reorg! we don't yet have the blocks to reorg to, so we'll rewind the chain back to
// to common ancestor, set the tip to the common ancestor and start the sync
this._chainTips.push(record[0]);
this._handleReorg(record[0]);
}
};
BlockService.prototype.stop = function(callback) {
if (this._deferTimeout) {
this._deferTimeout.unref();
@ -245,10 +266,10 @@ BlockService.prototype._broadcast = function(subscribers, name, entity) {
BlockService.prototype._cacheBlock = function(block) {
log.debug('Setting block: "' + block.hash + '" in the block cache.');
log.debug('Setting block: "' + block.rhash() + '" in the block cache.');
// 1. set the block queue, which holds full blocks in memory
this._blockQueue.set(block.hash, block);
this._blockQueue.set(block.rhash(), block);
// 2. store the block in the database
var operations = this._getBlockOperations(block);
@ -280,33 +301,54 @@ BlockService.prototype._determineBlockState = function(block) {
};
BlockService.prototype._findCommonAncestor = function(block) {
BlockService.prototype._findCommonAncestor = function(hash) {
assert(this._chainTips.length > 1,
'chain tips collection should have at least 2 chains in order to find a common ancestor.');
var self = this;
var headers = this._header.getAllHeaders();
var count = 0;
var _oldTip = this._tip.hash;
var _newTip = block.hash;
var _newTip = hash;
assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.');
var len = this._blockQueue.itemCount;
for(var i = 0; i < len; i++) {
async.whilst(
// test case
function() {
var oldBlk = this._blockQueue.get(_oldTip);
var newBlk = this._blockQueue.get(_newTip);
return _oldTip !== _newTip || ++count <= headers.size;
if (!oldBlk || !newBlk) {
return;
}
},
// get block
function(next) {
_oldTip = oldBlk.prevHash;
_newTip = newBlk.prevHash;
// old tip has to be in database
self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, block) {
if (_newTip === _oldTip) {
return this._blockQueue.get(_newTip);
}
}
if (err || !block) {
return next(err || new Error('missing block'));
}
_oldTip = block.toHeaders().prevBlock;
var header = headers.get(_newTip);
if (!header) {
return next(new Error('Header missing from list of headers'));
}
_newTip = header.prevHash;
next();
});
},
function() {
this.emit('common ancestor', hash, _newTip === _oldTip ? _newTip : null);
});
};
BlockService.prototype._getBlock = function(hash, callback) {
@ -335,7 +377,7 @@ BlockService.prototype._getBlockOperations = function(block) {
// block
operations.push({
type: 'put',
key: self._encoding.encodeBlockKey(block.hash),
key: self._encoding.encodeBlockKey(block.rhash()),
value: self._encoding.encodeBlockValue(block)
});
@ -350,7 +392,7 @@ BlockService.prototype._getDelta = function(tip) {
while (_tip !== this._tip.hash) {
var blk = this._blockQueue.get(_tip);
_tip = utils.reverseBufferToString(blk.header.prevHash);
_tip = blk.toHeaders().prevBlock;
blocks.push(blk);
}
@ -362,8 +404,10 @@ BlockService.prototype._getDelta = function(tip) {
BlockService.prototype._getHash = function(blockArg) {
var headers = this._header.getAllHeaders();
return (_.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))) &&
headers[blockArg] ? headers[blockArg] : null;
if (utils.isHeight(blockArg)) {
return Array.from(headers)[blockArg];
}
};
@ -371,8 +415,8 @@ BlockService.prototype._getIncompleteChainIndexes = function(block) {
var ret = [];
for(var i = 0; i < this._incompleteChains.length; i++) {
var chain = this._incompleteChains[i];
var lastEntry = chain[chain.length - 1];
if (utils.reverseBufferToString(lastEntry.header.prevHash) === block.hash) {
var lastEntry = chain[chain.length - 1].toHeaders();
if (lastEntry.prevHash === block.rhash()) {
ret.push(i);
}
}
@ -401,36 +445,50 @@ BlockService.prototype._getOldBlocks = function(currentHash, commonAncestorHash)
};
BlockService.prototype._handleReorg = function(block) {
BlockService.prototype._handleReorg = function(hash) {
this._reorging = true;
this._reorging = true; // while this is set, we won't be sending blocks
log.warn('Chain reorganization detected! Our current block tip is: ' +
this._tip.hash + ' the current block: ' + block.hash + '.');
this._tip.hash + ' the current block: ' + hash + '.');
var commonAncestor = this._findCommonAncestor(block);
var oldBlocks = this._getOldBlocks(this._tip.hash, block.hash, commonAncestor.hash);
this._once('common ancestor', this._onCommonAncestor.bind(this));
this._findCommonAncestor(hash);
};
// once we know what hash the commonAncestor is, we cna set the set the tip to it
// and gather blocks to remove
BlockService.prototype._onCommonAncestor = function(newHash, commonAncestorHeader) {
var oldBlocks = this._getOldBlocks(this._tip.hash, newHash, commonAncestorHeader);
if (!commonAncestorHeader || !oldBlocks || oldBlocks.length < 1) {
if (!commonAncestor || !oldBlocks || oldBlocks.length < 1) {
log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' +
block.hash + ' (the forked block) could not be found. Bitcore-node must exit.');
newHash + ' (the forked block) could not be found. Bitcore-node must exit.');
this.node.stop();
return;
}
log.warn('A common ancestor block was found to at hash: ' + commonAncestor.hash + '.');
// set tip to this common ancesttor
log.warn('A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + '.');
this._broadcast(this.subscriptions.reorg, 'block/reorg', [oldBlocks, [block], commonAncestor]);
this._broadcast(this.subscriptions.reorg, 'block/reorg', [oldBlocks, commonAncestorHeader]);
this._onReorg(oldBlocks, [block], commonAncestor);
this._onReorg(oldBlocks, commonAncestorHeader);
this._reorging = false;
};
BlockService.prototype._onReorg = function(oldBlockList, newBlockList, commonAncestor) {
// this JUST rewinds the chain back to the common ancestor block, nothing more
BlockService.prototype._onReorg = function(oldBlockList, commonAncestorHeader) {
// set the tip to the common ancestor in case something goes wrong with the reorg
var tipOps = utils.encodeTip({ height: commonAncestor.header.height });
this._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height });
var tipOps = utils.encodeTip(this._tip, this.name);
var removalOps = [{
type: 'put',
@ -438,30 +496,22 @@ BlockService.prototype._onReorg = function(oldBlockList, newBlockList, commonAnc
value: tipOps.value
}];
// remove all the old blocks that we reorg from
oldBlockList.forEach(function(block) {
removalOps.push({
type: 'del',
key: this.encoding.encodeBlockKey(block.header.timestamp),
key: this.encoding.encodeBlockKey(block.rhash()),
});
});
this._db.batch(removalOps);
//call onBlock for each of the new blocks
newBlockList.forEach(this._onBlock.bind(this));
// if the common ancestor block height is greater than our own, then nothing to do for the reorg
if (this._tip.height <= commonAncestor.header.height) {
return;
}
};
BlockService.prototype._isChainReorganizing = function(block) {
// if we aren't an out of order block, then is this block's prev hash our tip?
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prevHash = block.toHeaders().prevBlock;
return prevHash !== this._tip.hash;
@ -470,7 +520,7 @@ BlockService.prototype._isChainReorganizing = function(block) {
BlockService.prototype._isOutOfOrder = function(block) {
// is this block the direct child of one of our chain tips? If so, not an out of order block
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prevHash = block.toHeaders().prevBlock;
for(var i = 0; i < this._chainTips.length; i++) {
var chainTip = this._chainTips[i];
@ -484,7 +534,8 @@ BlockService.prototype._isOutOfOrder = function(block) {
};
BlockService.prototype._onAllHeaders = function(headers) {
this._bestHeight = headers.length;
this._bestHeight = headers.size;
this._detectInitialChainState(headers);
this._startSync();
};
@ -496,7 +547,7 @@ BlockService.prototype._onBlock = function(block) {
}
// 2. log the reception
log.debug2('New block received: ' + block.hash);
log.info('New block received: ' + block.rhash());
// 3. store the block for safe keeping
this._cacheBlock(block);
@ -517,8 +568,7 @@ BlockService.prototype._onBlock = function(block) {
// nothing to do, but wait until ancestor blocks come in
break;
case 'reorg':
this._handleReorg();
this.emit('reorg', block);
this._handleReorg(block.hash);
break;
default:
// send all unsent blocks now that we have a complete chain
@ -640,7 +690,7 @@ BlockService.prototype._startSubscriptions = function() {
this._subscribed = true;
if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost'});
this._bus = this.node.openBus({remoteAddress: 'localhost-block'});
}
this._bus.on('p2p/block', this._onBlock.bind(this));
@ -662,7 +712,7 @@ BlockService.prototype._sync = function() {
BlockService.prototype._updateChainInfo = function(block, state) {
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prevHash = block.toHeaders().prevBlock;
if (state === 'normal') {
this._updateNormalStateChainInfo(block, prevHash);
@ -721,7 +771,7 @@ BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) {
*/
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prevHash = block.toHeaders().prevBlock;
var possibleJoins = { tip: [], genesis: [] };
var newChains = [];
var joinedChains = false;
@ -738,7 +788,7 @@ BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) {
var chains;
// criteria 1
var lastEntryPrevHash = utils.reverseBufferToString(lastEntry.header.prevHash);
var lastEntryPrevHash = lastEntry.toHeaders().prevBlock;
if (lastEntryPrevHash === block.hash) {
joinedChains = true;

View File

@ -17,7 +17,7 @@ var HeaderService = function(options) {
this._tip = null;
this._p2p = this.node.services.p2p;
this._db = this.node.services.db;
this._headers = [];
this._headers = new Map();
};
inherits(HeaderService, BaseService);
@ -32,13 +32,24 @@ HeaderService.prototype.getAPIMethods = function() {
var methods = [
['getAllHeaders', this, this.getAllHeaders, 0],
['getBestHeight', this, this.getBestHeight, 0]
['getBestHeight', this, this.getBestHeight, 0],
['getBlockHeader', this, this.getBlockHeader, 1]
];
return methods;
};
HeaderService.prototype.getBlockHeader = function(arg) {
if (utils.isHeight(arg)) {
var header = Array.from(this._headers)[arg];
return header ? header[1] : null;
}
return this._headers.get(arg);
};
HeaderService.prototype.getBestHeight = function() {
return this._tip.height;
};
@ -57,15 +68,14 @@ HeaderService.prototype.start = function(callback) {
},
function(tip, next) {
self._tip = tip;
self._getPersistedHeaders(next);
next();
}
], function(err, headers) {
], function(err) {
if (err) {
return callback(err);
}
this._headers = headers;
self._setListeners();
self._startSubscriptions();
callback();
@ -86,11 +96,11 @@ HeaderService.prototype._startSubscriptions = function() {
this._subscribed = true;
if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost'});
this._bus = this.node.openBus({remoteAddress: 'localhost-header'});
}
this._bus.on('p2p/headers', this._onHeaders.bind(this));
this._bus.on('p2p/block', this._onHeaders.bind(this));
this._bus.on('p2p/block', this._onBlock.bind(this));
this._bus.subscribe('p2p/headers');
this._bus.subscribe('p2p/block');
@ -98,7 +108,19 @@ HeaderService.prototype._startSubscriptions = function() {
HeaderService.prototype._onBlock = function(block) {
// we just want the header to keep a running list
this._onHeaders([block.header]);
log.debug('Header Service: new block: ' + block.hash);
var header = {
hash: block.hash,
version: block.version,
timestamp: block.ts,
bits: block.bits,
nonce: block.nonce,
merkleRoot: block.merkleRoot,
prevHash: block.prevBlock
};
this._onHeaders([header]);
};
HeaderService.prototype._onHeaders = function(headers) {
@ -107,27 +129,14 @@ HeaderService.prototype._onHeaders = function(headers) {
return;
}
log.debug('Header Service: Received: ' + headers.length + ' header(s).');
var operations = this._getHeaderOperations(headers);
this._tip.hash = headers[headers.length - 1].hash;
this._tip.height = this._tip.height + headers.length;
var tipOps = utils.encodeTip(this._tip, this.name);
operations.push({
type: 'put',
key: tipOps.key,
value: tipOps.value
});
this._db.batch(operations);
this._headers.concat(headers);
if (this._tip.height >= this._bestHeight) {
log.info('Header download complete.');
this.emit('headers', this._headers);
return;
}
this._sync();
@ -137,11 +146,15 @@ HeaderService.prototype._getHeaderOperations = function(headers) {
var self = this;
var runningHeight = this._tip.height;
var prevHeader = this._headers[this._headers.length - 1];
// get the last header placed in the map
var prevHeader = Array.from(this._headers)[this._headers.length - 1];
return headers.map(function(header) {
header.height = ++runningHeight;
header.chainwork = self._getChainwork(header, prevHeader).toString(16, 32);
prevHeader = header;
// set the header in the in-memory map
self._headers.set(header.hash, utils.convertLEHeaderStrings(header));
return {
type: 'put',
key: self._encoding.encodeHeaderKey(header.height, header.hash),
@ -158,6 +171,7 @@ HeaderService.prototype._setListeners = function() {
};
HeaderService.prototype._onBestHeight = function(height) {
log.debug('Header Service: Best Height is: ' + height);
this._bestHeight = height;
this._startSync();
};
@ -165,30 +179,31 @@ HeaderService.prototype._onBestHeight = function(height) {
HeaderService.prototype._startSync = function() {
this._numNeeded = this._bestHeight - this._tip.height;
if (this._numNeeded <= 0) {
return;
}
log.info('Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
log.info('Header Service: Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
this._p2pHeaderCallsNeeded = Math.ceil(this._numNeeded / 500);
this._sync();
};
HeaderService.prototype._sync = function() {
if (--this._p2pHeaderCallsNeeded > 0) {
log.info('Headers download progress: ' + this._tip.height + '/' +
if (this._tip.height < this._bestHeight) {
log.info('Header Service: download progress: ' + this._tip.height + '/' +
this._numNeeded + ' (' + (this._tip.height / this._numNeeded*100).toFixed(2) + '%)');
this._p2p.getHeaders({ startHash: this._tip.hash });
return;
}
log.debug('Header Service: download complete.');
this.emit('headers', this._headers);
};
HeaderService.prototype.getAllHeaders = function() {

View File

@ -41,7 +41,6 @@ P2P.prototype.getAPIMethods = function() {
['getInfo', this, this.getInfo, 0],
['getMempool', this, this.getMempool, 0],
['sendTransaction', this, this.sendTransaction, 1]
['getBestHeight', this, this.getBestHeight, 0]
];
return methods;
};
@ -103,7 +102,7 @@ P2P.prototype.getPublishEvents = function() {
};
P2P.prototype.sendTransaction = function(tx, callback) {
P2P.prototype.sendTransaction = function(tx) {
p2p.sendMessage(this.messages.Inventory(tx));
};

View File

@ -209,5 +209,15 @@ utils.encodeTip = function(tip, name) {
};
utils.isHeight = function(arg) {
return _.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))
};
utils.convertLEHeaderStrings = function(header) {
return Object.assign(header, {
prevHash: utils.reverseBufferToString(new Buffer(header.prevHash, 'hex')),
merkleRoot: utils.reverseBufferToString(new Buffer(header.merkleRoot, 'hex'))
});
};
module.exports = utils;

View File

@ -1,7 +1,9 @@
{
"name": "bitcore-node",
"description": "Full node with extended capabilities using Bitcore and Bitcoin Core",
"engines": { "node": ">=6.10.3" },
"engines": {
"node": ">=8.2.0"
},
"author": "BitPay <dev@bitpay.com>",
"version": "4.0.0",
"main": "./index.js",
@ -56,8 +58,8 @@
"commander": "^2.8.1",
"errno": "^0.1.4",
"express": "^4.13.3",
"leveldown": "^1.6.0",
"levelup": "^1.3.3",
"leveldown": "",
"levelup": "",
"liftoff": "^2.2.0",
"lodash": "^4.17.4",
"lru-cache": "^4.0.2",