From f69dfd04620a30fdc6681f5a2e6173922be5e1ab Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 25 Jul 2017 18:39:25 -0400 Subject: [PATCH] wip --- lib/services/address/index.js | 13 +- lib/services/block/index.js | 242 ++++++++++++++---------------- lib/services/header/index.js | 37 ++++- lib/services/mempool/index.js | 24 +++ lib/services/transaction/index.js | 12 +- package-lock.json | 5 + 6 files changed, 183 insertions(+), 150 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 8a54366b..d7fcd6d8 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -340,28 +340,26 @@ AddressService.prototype._startSubscriptions = function() { this._bus = this.node.openBus({remoteAddress: 'localhost'}); } - this._bus.on('block/block', this._onBlock.bind(this)); this._bus.on('block/reorg', this._onReorg.bind(this)); - this._bus.subscribe('block/reorg'); - this._bus.subscribe('block/block'); }; -AddressService.prototype._onReorg = function(oldBlockList, commonAncestor) { +AddressService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { // if the common ancestor block height is greater than our own, then nothing to do for the reorg - if (this._tip.height <= commonAncestor.height) { + if (this._tip.height <= commonAncestorHeader.height) { return; } // set the tip to the common ancestor in case something goes wrong with the reorg - var tipOps = utils.encodeTip({ hash: commonAncestor.hash, height: commonAncestor.height }, this.name); + var tipOps = utils.encodeTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }, this.name); var removalOps = [{ type: 'put', key: tipOps.key, value: tipOps.value }]; + // for every tx, remove the address index key for every input and output for(var i = 0; i < oldBlockList.length; i++) { var block = oldBlockList[i]; @@ -407,7 +405,7 @@ AddressService.prototype._onReorg = function(oldBlockList, commonAncestor) { }; -AddressService.prototype._onBlock = function(block) { +AddressService.prototype.onBlock = function(block, callback) { var self = this; var operations = []; @@ -421,6 +419,7 @@ AddressService.prototype._onBlock = function(block) { self._db.batch(operations); } + callback(null, operations); }; AddressService.prototype._processInput = function(tx, input, opts) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index d147cbf9..cc062564 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -272,57 +272,6 @@ BlockService.prototype._determineBlockState = function(block) { }; -BlockService.prototype._findCommonAncestor = function(hash) { - - assert(this._chainTips.length > 1, - 'chain tips collection should have at least 2 chains in order to find a common ancestor.'); - - var self = this; - var headers = this._header.getAllHeaders(); - var count = 0; - var _oldTip = this._tip.hash; - var _newTip = hash; - - assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.'); - - async.whilst( - // test case - function() { - - return _oldTip !== _newTip || ++count <= headers.size; - - }, - // get block - function(next) { - - // old tip has to be in database - self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, data) { - - if (err || !data) { - return next(err || new Error('missing block')); - } - - var block = self._encoding.decodeBlockValue(data); - _oldTip = block.toHeaders().toJSON().prevBlock; - var header = headers.get(_newTip); - - if (!header) { - return next(new Error('Header missing from list of headers')); - } - - _newTip = header.prevHash; - next(); - - }); - - }, - function() { - - this.emit('common ancestor', hash, _newTip === _oldTip ? _newTip : null); - - }); -}; - BlockService.prototype._getBlock = function(hash, callback) { var self = this; @@ -413,68 +362,134 @@ BlockService.prototype._getIncompleteChainIndexes = function(block) { return ret; }; -BlockService.prototype._getOldBlocks = function(currentHash, commonAncestorHash) { +BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) { - if (currentHash === commonAncestorHash || !commonAncestorHash || !currentHash) { - return; - } + var self = this; + var count = 0; + var _oldTip = this._tip.hash; + var _newTip = hash; - var oldBlocks; - var headers = this._header.getAllHeaders(); - for(var i = headers.length - 1; i > 0; --i) { - var item = headers.getIndex(i); - if (item.hash === currentHash) { - oldBlocks = [this._blockQueue.get(currentHash)]; - continue; - } - if (item.hash === commonAncestorHash) { - return oldBlocks; - } - oldBlocks.push(this._blockQueue.get(item.hash)); - } + assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.'); + async.whilst( + // test case + function() { + + return _oldTip !== _newTip || ++count <= allHeaders.size; + + }, + // get block + function(next) { + + // old tip has to be in database + self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, data) { + + if (err || !data) { + return next(err || new Error('missing block')); + } + + var block = self._encoding.decodeBlockValue(data); + _oldTip = block.toHeaders().toJSON().prevBlock; + var header = allHeaders.get(_newTip); + + if (!header) { + return next(new Error('Header missing from list of headers')); + } + + _newTip = header.prevHash; + next(); + + }); + + }, function(err) { + + if (err) { + return callback(err); + } + + self._getOldBlocks(_newTip, function(err, oldBlocks) { + + if (err) { + return callback(err); + } + + callback(null, hash, _newTip, oldBlocks); + + }); + + }); }; -BlockService.prototype._handleReorg = function(hash) { +BlockService.prototype._handleReorg = function(hash, allHeaders) { this._reorging = true; // while this is set, we won't be sending blocks log.warn('Chain reorganization detected! Our current block tip is: ' + this._tip.hash + ' the current block: ' + hash + '.'); - this._once('common ancestor', this._onCommonAncestor.bind(this)); + this._findCommonAncestor(hash, allHeaders, function(err, newHash, commonAncestorHash, oldBlocks) { - this._findCommonAncestor(hash); + if (err) { + + log.error('Block Service: A common ancestor block between hash: ' + + this._tip.hash + ' (our current tip) and: ' + newHash + + ' (the forked block) could not be found. Bitcore-node must exit.'); + + this.node.stop(); + + return; + } + + var commonAncestorHeader = allHeaders.get(commonAncestorHash); + log.warn('A common ancestor block was found to at hash: ' + commonAncestorHeader + '.'); + + this._broadcast(this.subscriptions.reorg, 'block/reorg', [commonAncestorHeader, oldBlocks]); + + this._onReorg(commonAncestorHeader, oldBlocks); + + this._reorging = false; + + }); }; -// once we know what hash the commonAncestor is, we cna set the set the tip to it -// and gather blocks to remove -BlockService.prototype._onCommonAncestor = function(newHash, commonAncestorHeader) { +// get the blocks from our current tip to the given hash, non-inclusive +BlockService.prototype._getOldBlocks = function(hash, callback) { - var oldBlocks = this._getOldBlocks(this._tip.hash, newHash, commonAncestorHeader); + var blocks = []; - if (!commonAncestorHeader || !oldBlocks || oldBlocks.length < 1) { + var _tip = this._tip.hash; - log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' + - newHash + ' (the forked block) could not be found. Bitcore-node must exit.'); + async.whilst( + function() { + return _tip !== hash; + }, + function(next) { - this.node.stop(); + this._get(this._encoding.encodeBlockKey(_tip), function(err, block) { - return; - } + if (err) { + return callback(err); + } - // set tip to this common ancesttor - log.warn('A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + '.'); + if (!block) { + next(new Error('expected to find a block in database, but found none.')); + return; + } + blocks.push(block); + _tip = block.toHeaders().toJSON().prevHash; + }); + }, + function(err) { + if (err) { + return callback(err); + } + callback(null, blocks); + }); - this._broadcast(this.subscriptions.reorg, 'block/reorg', [oldBlocks, commonAncestorHeader]); - - this._onReorg(oldBlocks, commonAncestorHeader); - - this._reorging = false; }; // this JUST rewinds the chain back to the common ancestor block, nothing more -BlockService.prototype._onReorg = function(oldBlockList, commonAncestorHeader) { +BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { // set the tip to the common ancestor in case something goes wrong with the reorg this._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }); @@ -530,10 +545,12 @@ BlockService.prototype._onAllHeaders = function(headers) { BlockService.prototype._processBlock = function() { - var operations = []; - var services = this.node.services; - var block = this._unprocessedBlocks.shift(); + var self = this; + var operations = []; + var services = self.node.services; + var block = self._unprocessedBlocks.shift(); + async.eachSeries( services, function(mod, next) { @@ -551,6 +568,7 @@ BlockService.prototype._processBlock = function() { setImmediate(next); } }, + function(err) { if (err) { @@ -584,46 +602,11 @@ BlockService.prototype._processBlock = function() { BlockService.prototype._onBlock = function(block) { + log.debug('Block Service: new block: ' + block.rhash()); + this._unprocessedBlocks.push(block); this._processBlock(); - return; - // 1. have we already seen this block? - //if (this._blockAlreadyProcessed(block)) { - // return; - //} - // 2. log the reception - //log.debug('New block received: ' + block.rhash()); - - // 3. store the block for safe keeping - //this._cacheBlock(block); - - // 4. don't process any more blocks if we are currently in a reorg - //if (this._reorging) { - // return; - //} - // 5. determine block state, reorg, outoforder, normal - var blockState = this._determineBlockState(block); - - // 6. update internal data structures depending on blockstate - this._updateChainInfo(block, blockState); - - // 7. react to state of block - switch (blockState) { - case 'outoforder': - // nothing to do, but wait until ancestor blocks come in - break; - case 'reorg': - //this._handleReorg(block.hash); - //break; - default: - // send all unsent blocks now that we have a complete chain - this._unprocessedBlocks.push(block); - this._processBlock(); - return; - //this._sendDelta(); - break; - } }; BlockService.prototype._selectActiveChain = function() { @@ -691,6 +674,7 @@ BlockService.prototype._sendDelta = function() { BlockService.prototype._setListeners = function() { this._header.once('headers', this._onAllHeaders.bind(this)); + this._header.on('reorg', this._handleReorg.bind(this)); }; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index cd0d1de5..79a4adef 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -18,7 +18,7 @@ var HeaderService = function(options) { this._tip = null; this._p2p = this.node.services.p2p; this._db = this.node.services.db; - this._checkpoint = options.checkpoint || 0; // the # of header to look back on boot + this._checkpoint = this.node.checkpoint || 1000; // the # of header to look back on boot this._headers = new utils.SimpleMap(); this.subscriptions = {}; @@ -36,7 +36,6 @@ HeaderService.STARTING_CHAINWORK = '00000000000000000000000000000000000000000000 HeaderService.prototype.subscribe = function(name, emitter) { this.subscriptions[name].push(emitter); log.info(emitter.remoteAddress, 'subscribe:', 'header/' + name, 'total:', this.subscriptions[name].length); - }; HeaderService.prototype.unsubscribe = function(name, emitter) { @@ -91,6 +90,7 @@ HeaderService.prototype.start = function(callback) { }, function(tip, next) { self._tip = tip; + self._originalTip = Object.assign(self._tip, {}); next(); }, function(next) { @@ -146,22 +146,28 @@ HeaderService.prototype.getPublishEvents = function() { }; HeaderService.prototype._onBlock = function(block) { - // we just want the header to keep a running list + var hash = block.rhash(); log.debug('Header Service: new block: ' + hash); + // this is case where a block was requested by referencing a + // hash from our own headers set. if (this._headers.get(hash)) { - for (var i = 0; i < this.subscriptions.length; i++) { - this.subscriptions[i].emit('header/block', block); + + for (var i = 0; i < this.subscriptions.block.length; i++) { + this.subscriptions.block[i].emit('header/block', block); } + return; + } - log.debug('Header Service: new block has arrived: ' + hash); + var header = block.toHeaders().toJSON(); header.timestamp = header.ts; header.prevHash = header.prevBlock; this._onHeaders([header], 1); + }; HeaderService.prototype._onHeaders = function(headers, convert) { @@ -229,7 +235,12 @@ HeaderService.prototype._onBestHeight = function(height) { HeaderService.prototype._startSync = function() { - this._numNeeded = this._bestHeight - this._tip.height; + this._numNeeded = Math.max(this._bestHeight - this._tip.height, this._checkpoint); + + if (this._tip.height > this._checkpoint) { + this._tip.height -= this._checkpoint; + this._tip.hash = this._headers.getIndex(this._tip.height).hash; + } log.info('Header Service: Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); @@ -248,10 +259,20 @@ HeaderService.prototype._sync = function() { this._p2p.getHeaders({ startHash: this._tip.hash }); return; - } log.debug('Header Service: download complete.'); + + // have we reorg'ed since we've been shutdown? + var headerHash = this._headers.getIndex(this._originalTip.height).hash; + + if (this._originalTip.hash !== headerHash) { + + this.emit('reorg', headerHash, this._headers); + return; + + } + this.emit('headers', this._headers); }; diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 795266de..acce3bcf 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -34,6 +34,30 @@ MempoolService.prototype.start = function(callback) { }); }; +MempoolService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { + + // set the tip to the common ancestor in case something goes wrong with the reorg + this._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }); + var tipOps = utils.encodeTip(this._tip, this.name); + + var removalOps = [{ + type: 'put', + key: tipOps.key, + value: tipOps.value + }]; + + // remove all the old blocks that we reorg from + oldBlockList.forEach(function(block) { + removalOps.push({ + type: 'del', + key: this.encoding.encodeBlockKey(block.rhash()), + }); + }); + + this._db.batch(removalOps); + +}; + MempoolService.prototype._startSubscriptions = function() { if (this._subscribed) { diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 7cdb5223..4997dcfe 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -149,7 +149,7 @@ TransactionService.prototype._getInputValues = function(tx) { }); }; -TransactionService.prototype._onBlock = function(block) { +TransactionService.prototype.onBlock = function(block, callback) { var self = this; @@ -163,17 +163,19 @@ TransactionService.prototype._onBlock = function(block) { } + callback(null, operations); + }; -TransactionService.prototype._onReorg = function(oldBlockList, commonAncestor) { +TransactionService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { // if the common ancestor block height is greater than our own, then nothing to do for the reorg - if (this._tip.height <= commonAncestor.header.height) { + if (this._tip.height <= commonAncestorHeader.height) { return; } // set the tip to the common ancestor in case something goes wrong with the reorg - var tipOps = utils.encodeTip({ hash: commonAncestor.hash, height: commonAncestor.header.height }, this.name); + var tipOps = utils.encodeTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }, this.name); var removalOps = [{ type: 'put', @@ -238,10 +240,8 @@ TransactionService.prototype._startSubscriptions = function() { this._bus = this.node.openBus({remoteAddress: 'localhost'}); } - this._bus.on('block/block', this._onBlock.bind(this)); this._bus.on('block/reorg', this._onReorg.bind(this)); - this._bus.subscribe('block/block'); this._bus.subscribe('block/reorg'); }; diff --git a/package-lock.json b/package-lock.json index 4c25406f..7312e2d9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1696,6 +1696,11 @@ "sntp": "1.0.9" } }, + "heapdump": { + "version": "0.3.9", + "resolved": "https://registry.npmjs.org/heapdump/-/heapdump-0.3.9.tgz", + "integrity": "sha1-A8dOsN9dZ74Jgug0KbqcnSs7f3g=" + }, "hmac-drbg": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/hmac-drbg/-/hmac-drbg-1.0.1.tgz",