From 759087bfc249df4b9049eda439ce4391b2159324 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 20 Jul 2017 15:55:02 -0400 Subject: [PATCH] wip --- lib/logger.js | 3 +- lib/services/block/encoding.js | 4 +- lib/services/block/index.js | 228 ++++++++++++++++++++------------- lib/services/header/index.js | 77 ++++++----- lib/services/p2p/index.js | 3 +- lib/utils.js | 10 ++ package.json | 8 +- 7 files changed, 204 insertions(+), 129 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index 253a5277..791ede22 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -13,7 +13,7 @@ function Logger(options) { options = {}; } this.formatting = _.isUndefined(options.formatting) ? Logger.DEFAULT_FORMATTING : options.formatting; - this.level = options.logLevel || process.env['LOG_LEVEL'] || 0; // lower numbers means less logs + this.level = options.logLevel || process.env.LOG_LEVEL || 0; // lower numbers means less logs } Logger.DEFAULT_FORMATTING = true; @@ -31,7 +31,6 @@ Logger.prototype.info = function() { * #error */ Logger.prototype.error = function() { - var existingPermitWrites = this.permitWrites; this._log.apply(this, ['red', 'error'].concat(Array.prototype.slice.call(arguments))); }; diff --git a/lib/services/block/encoding.js b/lib/services/block/encoding.js index fe82c6a8..91f56daa 100644 --- a/lib/services/block/encoding.js +++ b/lib/services/block/encoding.js @@ -18,11 +18,11 @@ Encoding.prototype.decodeBlockKey = function(buffer) { }; Encoding.prototype.encodeBlockValue = function(block) { - return block.toBuffer(); + return block.toRaw(); }; Encoding.prototype.decodeBlockValue = function(buffer) { - return Block.fromBuffer(buffer); + return Block.fromRaw(buffer); }; module.exports = Encoding; diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 8b0489d6..9edd7db6 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -30,7 +30,7 @@ var BlockService = function(options) { this._blockQueue = LRU({ max: 50 * (1 * 1024 * 1024), // 50 MB of blocks, length: function(n) { - return n.toBuffer().length; + return n.size; } }); // hash -> block @@ -55,14 +55,13 @@ BlockService.prototype.getAPIMethods = function() { var methods = [ ['getBlock', this, this.getBlock, 1], ['getRawBlock', this, this.getRawBlock, 1], - ['getBlockHeader', this, this.getBlockHeader, 1], ['getBlockOverview', this, this.getBlockOverview, 1], ['getBestBlockHash', this, this.getBestBlockHash, 0] ]; return methods; }; -BlockService.prototype.getBestBlockHash = function(callback) { +BlockService.prototype.getBestBlockHash = function() { var headers = this._header.getAllHeaders(); return headers[headers.length - 1].hash; }; @@ -79,32 +78,6 @@ BlockService.prototype.getBlock = function(arg, callback) { }; - -BlockService.prototype.getBlockHeader = function(blockArg, callback) { - - blockArg = this._getHash(blockArg); - - if (!blockArg) { - return callback(); - } - - var headers = this._header.getAllHeaders(); - this._getBlock(blockArg, function(err, block) { - - if(err) { - return callback(err); - } - - if (!block) { - return callback(); - } - - callback(null, block.header.toJSON()); - - }); - -}; - BlockService.prototype.getBlockOverview = function(hash, callback) { this._getBlock(hash, function(err, block) { @@ -113,22 +86,25 @@ BlockService.prototype.getBlockOverview = function(hash, callback) { return callback(err); } + var header = block.toHeaders(); + var blockOverview = { hash: block.hash, - version: block.header.version, + version: header.version, confirmations: null, height: null, chainWork: null, - prevHash: utils.reverseBufferToString(block.header.prevHash), + prevHash: header.prevBlock, nextHash: null, - merkleRoot: block.header.merkleroot, + merkleRoot: block.merkleroot, time: null, medianTime: null, - nonce: block.header.nonce, - bits: block.header.bits, + nonce: block.nonce, + bits: block.bits, difficulty: null, txids: null }; + callback(null, blockOverview); }); @@ -180,8 +156,9 @@ BlockService.prototype.start = function(callback) { self._db.getServiceTip('block', next); }, function(tip, next) { - self._tip = tip; + self._setTip(tip); self._chainTips.push(self._tip.hash); + self._primeBlockQueue(next); } ], function(err) { @@ -197,6 +174,50 @@ BlockService.prototype.start = function(callback) { }; +BlockService.prototype._primeBlockQueue = function(callback) { + // this will load the last 50 blocks into the block queue to prime the cache + var self = this; + var hash = this._tip.hash; + + async.timesSeries(50, function(index, next) { + + self._db.get(self._encoding.encodeBlockKey(hash), function(err, block) { + + if(err) { + return next(err); + } + + if (!block) { + return next(); + } + + hash = block.toHeaders().prevBlock; + self._blockQueue.set(block.hash, block); + next(); + + }); + + }, callback); +}; + +// upon startup, has the chain reorg'ed from where we were when we shutdown? +BlockService.prototype._detectInitialChainState = function(headers) { + + if (this._tip.height === 0) { + return; + } + + var index = this._tip.height - 1; + var record = Array.from(headers)[index]; + + if (record[0] !== this._tip.hash) { + // reorg! we don't yet have the blocks to reorg to, so we'll rewind the chain back to + // to common ancestor, set the tip to the common ancestor and start the sync + this._chainTips.push(record[0]); + this._handleReorg(record[0]); + } +}; + BlockService.prototype.stop = function(callback) { if (this._deferTimeout) { this._deferTimeout.unref(); @@ -245,10 +266,10 @@ BlockService.prototype._broadcast = function(subscribers, name, entity) { BlockService.prototype._cacheBlock = function(block) { - log.debug('Setting block: "' + block.hash + '" in the block cache.'); + log.debug('Setting block: "' + block.rhash() + '" in the block cache.'); // 1. set the block queue, which holds full blocks in memory - this._blockQueue.set(block.hash, block); + this._blockQueue.set(block.rhash(), block); // 2. store the block in the database var operations = this._getBlockOperations(block); @@ -280,33 +301,54 @@ BlockService.prototype._determineBlockState = function(block) { }; -BlockService.prototype._findCommonAncestor = function(block) { +BlockService.prototype._findCommonAncestor = function(hash) { assert(this._chainTips.length > 1, 'chain tips collection should have at least 2 chains in order to find a common ancestor.'); + var self = this; + var headers = this._header.getAllHeaders(); + var count = 0; var _oldTip = this._tip.hash; - var _newTip = block.hash; + var _newTip = hash; assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.'); - var len = this._blockQueue.itemCount; - for(var i = 0; i < len; i++) { + async.whilst( + // test case + function() { - var oldBlk = this._blockQueue.get(_oldTip); - var newBlk = this._blockQueue.get(_newTip); + return _oldTip !== _newTip || ++count <= headers.size; - if (!oldBlk || !newBlk) { - return; - } + }, + // get block + function(next) { - _oldTip = oldBlk.prevHash; - _newTip = newBlk.prevHash; + // old tip has to be in database + self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, block) { - if (_newTip === _oldTip) { - return this._blockQueue.get(_newTip); - } - } + if (err || !block) { + return next(err || new Error('missing block')); + } + + _oldTip = block.toHeaders().prevBlock; + var header = headers.get(_newTip); + + if (!header) { + return next(new Error('Header missing from list of headers')); + } + + _newTip = header.prevHash; + next(); + + }); + + }, + function() { + + this.emit('common ancestor', hash, _newTip === _oldTip ? _newTip : null); + + }); }; BlockService.prototype._getBlock = function(hash, callback) { @@ -335,7 +377,7 @@ BlockService.prototype._getBlockOperations = function(block) { // block operations.push({ type: 'put', - key: self._encoding.encodeBlockKey(block.hash), + key: self._encoding.encodeBlockKey(block.rhash()), value: self._encoding.encodeBlockValue(block) }); @@ -350,7 +392,7 @@ BlockService.prototype._getDelta = function(tip) { while (_tip !== this._tip.hash) { var blk = this._blockQueue.get(_tip); - _tip = utils.reverseBufferToString(blk.header.prevHash); + _tip = blk.toHeaders().prevBlock; blocks.push(blk); } @@ -362,8 +404,10 @@ BlockService.prototype._getDelta = function(tip) { BlockService.prototype._getHash = function(blockArg) { var headers = this._header.getAllHeaders(); - return (_.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))) && - headers[blockArg] ? headers[blockArg] : null; + + if (utils.isHeight(blockArg)) { + return Array.from(headers)[blockArg]; + } }; @@ -371,8 +415,8 @@ BlockService.prototype._getIncompleteChainIndexes = function(block) { var ret = []; for(var i = 0; i < this._incompleteChains.length; i++) { var chain = this._incompleteChains[i]; - var lastEntry = chain[chain.length - 1]; - if (utils.reverseBufferToString(lastEntry.header.prevHash) === block.hash) { + var lastEntry = chain[chain.length - 1].toHeaders(); + if (lastEntry.prevHash === block.rhash()) { ret.push(i); } } @@ -401,36 +445,50 @@ BlockService.prototype._getOldBlocks = function(currentHash, commonAncestorHash) }; -BlockService.prototype._handleReorg = function(block) { +BlockService.prototype._handleReorg = function(hash) { - this._reorging = true; + this._reorging = true; // while this is set, we won't be sending blocks log.warn('Chain reorganization detected! Our current block tip is: ' + - this._tip.hash + ' the current block: ' + block.hash + '.'); + this._tip.hash + ' the current block: ' + hash + '.'); - var commonAncestor = this._findCommonAncestor(block); - var oldBlocks = this._getOldBlocks(this._tip.hash, block.hash, commonAncestor.hash); + this._once('common ancestor', this._onCommonAncestor.bind(this)); + + this._findCommonAncestor(hash); +}; + +// once we know what hash the commonAncestor is, we cna set the set the tip to it +// and gather blocks to remove +BlockService.prototype._onCommonAncestor = function(newHash, commonAncestorHeader) { + + var oldBlocks = this._getOldBlocks(this._tip.hash, newHash, commonAncestorHeader); + + if (!commonAncestorHeader || !oldBlocks || oldBlocks.length < 1) { - if (!commonAncestor || !oldBlocks || oldBlocks.length < 1) { log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' + - block.hash + ' (the forked block) could not be found. Bitcore-node must exit.'); + newHash + ' (the forked block) could not be found. Bitcore-node must exit.'); + this.node.stop(); + return; } - log.warn('A common ancestor block was found to at hash: ' + commonAncestor.hash + '.'); + // set tip to this common ancesttor + log.warn('A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + '.'); - this._broadcast(this.subscriptions.reorg, 'block/reorg', [oldBlocks, [block], commonAncestor]); + this._broadcast(this.subscriptions.reorg, 'block/reorg', [oldBlocks, commonAncestorHeader]); - this._onReorg(oldBlocks, [block], commonAncestor); + this._onReorg(oldBlocks, commonAncestorHeader); this._reorging = false; }; -BlockService.prototype._onReorg = function(oldBlockList, newBlockList, commonAncestor) { +// this JUST rewinds the chain back to the common ancestor block, nothing more +BlockService.prototype._onReorg = function(oldBlockList, commonAncestorHeader) { // set the tip to the common ancestor in case something goes wrong with the reorg - var tipOps = utils.encodeTip({ height: commonAncestor.header.height }); + this._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }); + var tipOps = utils.encodeTip(this._tip, this.name); var removalOps = [{ type: 'put', @@ -438,30 +496,22 @@ BlockService.prototype._onReorg = function(oldBlockList, newBlockList, commonAnc value: tipOps.value }]; - // remove all the old blocks that we reorg from oldBlockList.forEach(function(block) { removalOps.push({ type: 'del', - key: this.encoding.encodeBlockKey(block.header.timestamp), + key: this.encoding.encodeBlockKey(block.rhash()), }); }); this._db.batch(removalOps); - //call onBlock for each of the new blocks - newBlockList.forEach(this._onBlock.bind(this)); - // if the common ancestor block height is greater than our own, then nothing to do for the reorg - if (this._tip.height <= commonAncestor.header.height) { - return; - } - }; BlockService.prototype._isChainReorganizing = function(block) { // if we aren't an out of order block, then is this block's prev hash our tip? - var prevHash = utils.reverseBufferToString(block.header.prevHash); + var prevHash = block.toHeaders().prevBlock; return prevHash !== this._tip.hash; @@ -470,7 +520,7 @@ BlockService.prototype._isChainReorganizing = function(block) { BlockService.prototype._isOutOfOrder = function(block) { // is this block the direct child of one of our chain tips? If so, not an out of order block - var prevHash = utils.reverseBufferToString(block.header.prevHash); + var prevHash = block.toHeaders().prevBlock; for(var i = 0; i < this._chainTips.length; i++) { var chainTip = this._chainTips[i]; @@ -484,7 +534,8 @@ BlockService.prototype._isOutOfOrder = function(block) { }; BlockService.prototype._onAllHeaders = function(headers) { - this._bestHeight = headers.length; + this._bestHeight = headers.size; + this._detectInitialChainState(headers); this._startSync(); }; @@ -496,7 +547,7 @@ BlockService.prototype._onBlock = function(block) { } // 2. log the reception - log.debug2('New block received: ' + block.hash); + log.info('New block received: ' + block.rhash()); // 3. store the block for safe keeping this._cacheBlock(block); @@ -517,8 +568,7 @@ BlockService.prototype._onBlock = function(block) { // nothing to do, but wait until ancestor blocks come in break; case 'reorg': - this._handleReorg(); - this.emit('reorg', block); + this._handleReorg(block.hash); break; default: // send all unsent blocks now that we have a complete chain @@ -640,7 +690,7 @@ BlockService.prototype._startSubscriptions = function() { this._subscribed = true; if (!this._bus) { - this._bus = this.node.openBus({remoteAddress: 'localhost'}); + this._bus = this.node.openBus({remoteAddress: 'localhost-block'}); } this._bus.on('p2p/block', this._onBlock.bind(this)); @@ -662,7 +712,7 @@ BlockService.prototype._sync = function() { BlockService.prototype._updateChainInfo = function(block, state) { - var prevHash = utils.reverseBufferToString(block.header.prevHash); + var prevHash = block.toHeaders().prevBlock; if (state === 'normal') { this._updateNormalStateChainInfo(block, prevHash); @@ -721,7 +771,7 @@ BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) { */ - var prevHash = utils.reverseBufferToString(block.header.prevHash); + var prevHash = block.toHeaders().prevBlock; var possibleJoins = { tip: [], genesis: [] }; var newChains = []; var joinedChains = false; @@ -738,7 +788,7 @@ BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) { var chains; // criteria 1 - var lastEntryPrevHash = utils.reverseBufferToString(lastEntry.header.prevHash); + var lastEntryPrevHash = lastEntry.toHeaders().prevBlock; if (lastEntryPrevHash === block.hash) { joinedChains = true; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 0ae2b18c..5368150c 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -17,7 +17,7 @@ var HeaderService = function(options) { this._tip = null; this._p2p = this.node.services.p2p; this._db = this.node.services.db; - this._headers = []; + this._headers = new Map(); }; inherits(HeaderService, BaseService); @@ -32,13 +32,24 @@ HeaderService.prototype.getAPIMethods = function() { var methods = [ ['getAllHeaders', this, this.getAllHeaders, 0], - ['getBestHeight', this, this.getBestHeight, 0] + ['getBestHeight', this, this.getBestHeight, 0], + ['getBlockHeader', this, this.getBlockHeader, 1] ]; return methods; }; +HeaderService.prototype.getBlockHeader = function(arg) { + + if (utils.isHeight(arg)) { + var header = Array.from(this._headers)[arg]; + return header ? header[1] : null; + } + + return this._headers.get(arg); +}; + HeaderService.prototype.getBestHeight = function() { return this._tip.height; }; @@ -57,15 +68,14 @@ HeaderService.prototype.start = function(callback) { }, function(tip, next) { self._tip = tip; - self._getPersistedHeaders(next); + next(); } - ], function(err, headers) { + ], function(err) { if (err) { return callback(err); } - this._headers = headers; self._setListeners(); self._startSubscriptions(); callback(); @@ -86,11 +96,11 @@ HeaderService.prototype._startSubscriptions = function() { this._subscribed = true; if (!this._bus) { - this._bus = this.node.openBus({remoteAddress: 'localhost'}); + this._bus = this.node.openBus({remoteAddress: 'localhost-header'}); } this._bus.on('p2p/headers', this._onHeaders.bind(this)); - this._bus.on('p2p/block', this._onHeaders.bind(this)); + this._bus.on('p2p/block', this._onBlock.bind(this)); this._bus.subscribe('p2p/headers'); this._bus.subscribe('p2p/block'); @@ -98,7 +108,19 @@ HeaderService.prototype._startSubscriptions = function() { HeaderService.prototype._onBlock = function(block) { // we just want the header to keep a running list - this._onHeaders([block.header]); + log.debug('Header Service: new block: ' + block.hash); + + var header = { + hash: block.hash, + version: block.version, + timestamp: block.ts, + bits: block.bits, + nonce: block.nonce, + merkleRoot: block.merkleRoot, + prevHash: block.prevBlock + }; + + this._onHeaders([header]); }; HeaderService.prototype._onHeaders = function(headers) { @@ -107,27 +129,14 @@ HeaderService.prototype._onHeaders = function(headers) { return; } + log.debug('Header Service: Received: ' + headers.length + ' header(s).'); + var operations = this._getHeaderOperations(headers); this._tip.hash = headers[headers.length - 1].hash; this._tip.height = this._tip.height + headers.length; - var tipOps = utils.encodeTip(this._tip, this.name); - - operations.push({ - type: 'put', - key: tipOps.key, - value: tipOps.value - }); - this._db.batch(operations); - this._headers.concat(headers); - - if (this._tip.height >= this._bestHeight) { - log.info('Header download complete.'); - this.emit('headers', this._headers); - return; - } this._sync(); @@ -137,11 +146,15 @@ HeaderService.prototype._getHeaderOperations = function(headers) { var self = this; var runningHeight = this._tip.height; - var prevHeader = this._headers[this._headers.length - 1]; + // get the last header placed in the map + var prevHeader = Array.from(this._headers)[this._headers.length - 1]; + return headers.map(function(header) { header.height = ++runningHeight; header.chainwork = self._getChainwork(header, prevHeader).toString(16, 32); prevHeader = header; + // set the header in the in-memory map + self._headers.set(header.hash, utils.convertLEHeaderStrings(header)); return { type: 'put', key: self._encoding.encodeHeaderKey(header.height, header.hash), @@ -158,6 +171,7 @@ HeaderService.prototype._setListeners = function() { }; HeaderService.prototype._onBestHeight = function(height) { + log.debug('Header Service: Best Height is: ' + height); this._bestHeight = height; this._startSync(); }; @@ -165,30 +179,31 @@ HeaderService.prototype._onBestHeight = function(height) { HeaderService.prototype._startSync = function() { this._numNeeded = this._bestHeight - this._tip.height; - if (this._numNeeded <= 0) { - return; - } - log.info('Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); + log.info('Header Service: Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); - this._p2pHeaderCallsNeeded = Math.ceil(this._numNeeded / 500); this._sync(); }; HeaderService.prototype._sync = function() { - if (--this._p2pHeaderCallsNeeded > 0) { - log.info('Headers download progress: ' + this._tip.height + '/' + + if (this._tip.height < this._bestHeight) { + + log.info('Header Service: download progress: ' + this._tip.height + '/' + this._numNeeded + ' (' + (this._tip.height / this._numNeeded*100).toFixed(2) + '%)'); + this._p2p.getHeaders({ startHash: this._tip.hash }); return; } + log.debug('Header Service: download complete.'); + this.emit('headers', this._headers); + }; HeaderService.prototype.getAllHeaders = function() { diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 301ac0e6..c5ceefb7 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -41,7 +41,6 @@ P2P.prototype.getAPIMethods = function() { ['getInfo', this, this.getInfo, 0], ['getMempool', this, this.getMempool, 0], ['sendTransaction', this, this.sendTransaction, 1] - ['getBestHeight', this, this.getBestHeight, 0] ]; return methods; }; @@ -103,7 +102,7 @@ P2P.prototype.getPublishEvents = function() { }; -P2P.prototype.sendTransaction = function(tx, callback) { +P2P.prototype.sendTransaction = function(tx) { p2p.sendMessage(this.messages.Inventory(tx)); }; diff --git a/lib/utils.js b/lib/utils.js index a21c705e..54816de6 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -209,5 +209,15 @@ utils.encodeTip = function(tip, name) { }; +utils.isHeight = function(arg) { + return _.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg)) +}; + +utils.convertLEHeaderStrings = function(header) { + return Object.assign(header, { + prevHash: utils.reverseBufferToString(new Buffer(header.prevHash, 'hex')), + merkleRoot: utils.reverseBufferToString(new Buffer(header.merkleRoot, 'hex')) + }); +}; module.exports = utils; diff --git a/package.json b/package.json index f8b1924a..90dd1cd4 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,9 @@ { "name": "bitcore-node", "description": "Full node with extended capabilities using Bitcore and Bitcoin Core", - "engines": { "node": ">=6.10.3" }, + "engines": { + "node": ">=8.2.0" + }, "author": "BitPay ", "version": "4.0.0", "main": "./index.js", @@ -56,8 +58,8 @@ "commander": "^2.8.1", "errno": "^0.1.4", "express": "^4.13.3", - "leveldown": "^1.6.0", - "levelup": "^1.3.3", + "leveldown": "", + "levelup": "", "liftoff": "^2.2.0", "lodash": "^4.17.4", "lru-cache": "^4.0.2",