From 6a18c1e46e9b9513b3e5a580ed762311f726ac75 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 11 Sep 2017 15:41:27 -0400 Subject: [PATCH] Fixed issue with reorg. --- lib/services/address/index.js | 73 +++++++++++---- lib/services/block/index.js | 138 +++++++++++++++++++--------- lib/services/header/index.js | 88 +++--------------- lib/services/timestamp/index.js | 2 +- lib/services/transaction/index.js | 3 +- package-lock.json | 14 ++- package.json | 2 +- test/services/address/index.unit.js | 7 +- test/services/block/index.unit.js | 24 +++-- 9 files changed, 193 insertions(+), 158 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index fcde178a..36e46741 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -60,7 +60,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba return callback(err); } - var txList = _.flatten(txLists); + var txList = _.flattenDeep(txLists); var results = { totalCount: txList.length, @@ -393,32 +393,65 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac }; AddressService.prototype._removeBlock = function(block, callback) { + var self = this; - async.eachSeries(block.txs, function(tx, next) { + + async.mapSeries(block.txs, function(tx, next) { + self._removeTx(tx, block, next); + }, callback); + }; AddressService.prototype._removeTx = function(tx, block, callback) { + var self = this; + var operations = []; + async.parallelLimit([ + function(next) { async.eachOfSeries(tx.inputs, function(input, indext, next) { - self._removeInput(input, tx, block, index, next); + self._removeInput(input, tx, block, index, function(err, ops) { + if(err) { + return next(err); + } + operations = operations.concat(ops); + next(); + }); }, next); }, + function(next) { async.eachOfSeries(tx.outputs, function(output, index, next) { - self._removeOutput(output, tx, block, index, next); + self._removeOutput(output, tx, block, index, function(err, ops) { + if(err) { + return next(err); + } + operations = operations.concat(ops); + next(); + }); }, next); } - ], 4, callback); + + ], 4, function(err) { + + if(err) { + return callback(err); + } + + callback(null, operations); + + }); + }; AddressService.prototype._removeInput = function(input, tx, block, index, callback) { var self = this; var address = input.getAddress(); + var removalOps = []; if (!address) { @@ -428,9 +461,11 @@ AddressService.prototype._removeInput = function(input, tx, block, index, callba address.network = self._network; address = address.toString(); + assert(block && block.__ts && block.__height, 'Missing block or block values.'); + removalOps.push({ type: 'del', - key: self._encoding.encodeAddressIndexKey(address, block.height, tx.txid(), index, 1, block.ts) + key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 1, block.__ts) }); // look up prev output of this input and put it back in the set of utxos @@ -441,14 +476,15 @@ AddressService.prototype._removeInput = function(input, tx, block, index, callba } assert(_tx, 'Missing prev tx to insert back into the utxo set when reorging address index.'); + assert(_tx.__height && _tx.__inputValues && _tx.__timestamp, 'Missing tx values.'); removalOps.push({ type: 'put', key: self._encoding.encodeUtxoIndexKey(address, _tx.txid(), input.prevout.index), value: self._encoding.encodeUtxoIndexValue( - _tx.height, + _tx.__height, _tx.__inputValues[input.prevout.index], - _tx.timestamp, _tx.outputs[input.prevout.index].script.toRaw()) + _tx.__timestamp, _tx.outputs[input.prevout.index].script.toRaw()) }); callback(null, removalOps); @@ -469,9 +505,11 @@ AddressService.prototype._removeOutput = function(output, tx, block, index, call address.network = self._network; address = address.toString(); + assert(block && block.__ts && block.__height, 'Missing block or block values.'); + removalOps.push({ type: 'del', - key: self._encoding.encodeAddressIndexKey(address, block.height, tx.txid(), index, 0, block.ts) + key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 0, block.__ts) }); //remove the utxo for this output from the collection @@ -491,13 +529,14 @@ AddressService.prototype.onReorg = function(args, callback) { // for every tx, remove the address index key for every input and output // for every input record, we need to find its previous output and put it back into the utxo collection - async.eachSeries(oldBlockList, self._removeBlock.bind(self), function(err, ops) { + async.mapSeries(oldBlockList, self._removeBlock.bind(self), function(err, ops) { if (err) { return callback(err); } - callback(null, _.compact(_.flatten(ops))); + var operations = _.compact(_.flattenDeep(ops)); + callback(null, operations); }); }; @@ -517,7 +556,7 @@ AddressService.prototype.onBlock = function(block, callback) { operations.push(ops); } - operations = _.flatten(operations); + operations = _.flattenDeep(operations); callback(null, operations); }; @@ -539,7 +578,7 @@ AddressService.prototype._processInput = function(tx, input, index, opts) { assert(timestamp, 'Must have a timestamp in order to process input.'); // address index - var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.height, txid, index, 1, timestamp); + var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 1, timestamp); var operations = [{ type: 'put', @@ -573,11 +612,11 @@ AddressService.prototype._processOutput = function(tx, output, index, opts) { assert(timestamp, 'Must have a timestamp in order to process output.'); - var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.height, txid, index, 0, timestamp); + var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 0, timestamp); var utxoKey = this._encoding.encodeUtxoIndexKey(address, txid, index); var utxoValue = this._encoding.encodeUtxoIndexValue( - opts.block.height, + opts.block.__height, output.value, timestamp, output.script.toRaw() @@ -608,7 +647,7 @@ AddressService.prototype._processTransaction = function(tx, opts) { return self._processOutput(tx, output, index, _opts); }); - outputOperations = _.flatten(_.compact(outputOperations)); + outputOperations = _.compact(_.flattenDeep(outputOperations)); assert(outputOperations.length % 2 === 0 && outputOperations.length <= tx.outputs.length * 2, 'Output operations count is not reflective of what should be possible.'); @@ -617,7 +656,7 @@ AddressService.prototype._processTransaction = function(tx, opts) { return self._processInput(tx, input, index, _opts); }); - inputOperations = _.flatten(_.compact(inputOperations)); + inputOperations = _.compact(_.flattenDeep(inputOperations)); assert(inputOperations.length % 2 === 0 && inputOperations.length <= tx.inputs.length * 2, diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 0efbf73f..f0ad6855 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -165,6 +165,36 @@ BlockService.prototype.getRawBlock = function(hash, callback) { }); }; +BlockService.prototype._checkTip = function(callback) { + // check to see if our own tip is no longer in the main chain + // (there was a reorg while we were shut down) + var self = this; + + self._header.getBlockHeader(self._tip.height, function(err, header) { + + if (err || !header) { + return callback(err || new Error('Header at height: ' + self._tip.height + ' was not found.')); + } + + if (header.hash === self._tip.hash) { + return callback(); + } + + // means the header service no longer tracks our tip on the main chain + // so we should consider ourselves in a reorg situation + self._header.getAllHeaders(function(err, headers) { + + if(err || !headers) { + return callback(err || new Error('All headers not found.')); + } + + self._handleReorg(header.hash, headers, callback); + + }); + + }); +}; + BlockService.prototype.start = function(callback) { var self = this; @@ -247,7 +277,7 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback var self = this; var count = 0; - var _oldTip = this._tip.hash; + var _oldTip = self._tip.hash; var _newTip = hash; var oldBlocks = []; @@ -272,13 +302,11 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback // once we've found the old tip, we will find its prev and check to see if matches new tip's prev var block = self._encoding.decodeBlockValue(data); + + // for the purposes of building the old blocks list + // apply the block's height - var blockHdr = allHeaders.get(block.rhash()); - if (!blockHdr) { - return next(new Error('Could not find block in list of headers: ' + block.rhash())); - } - block.height = blockHdr.height; - assert(block.height >= 0, 'We mamaged to save a header with an incorrect height.'); + block.__height = self._tip.height; // apply the block's timestamp self._timestamp.getTimestamp(block.rhash(), function(err, timestamp) { @@ -287,23 +315,21 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback return next(err || new Error('missing timestamp')); } - block.ts = timestamp; + block.__ts = timestamp; + // we will squirrel away the block because our services will need to remove it after we've found the common ancestor oldBlocks.push(block); // this is our current tip's prev hash _oldTip = bcoin.util.revHex(block.prevBlock); - // our current headers have the correct state of the chain, so consult that for its prev aash + // our current headers have the correct state of the chain, so consult that for its prev hash var header = allHeaders.get(_newTip); - if (!header) { - return next(new Error('Header missing from list of headers')); - } + assert(header, 'The expected hash is not in the headers list.'); // set new tip to the prev hash _newTip = header.prevHash; - next(); }); @@ -315,8 +341,9 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback return callback(err); } - var commonAncestorHash = _newTip; - callback(null, commonAncestorHash, oldBlocks); + assert(_newTip === _oldTip, 'A common ancestor hash could not be found.'); + + callback(null, _newTip, oldBlocks); }); }; @@ -364,7 +391,7 @@ BlockService.prototype._getHash = function(blockArg, callback) { }; -BlockService.prototype._handleReorg = function(hash, allHeaders, block) { +BlockService.prototype._handleReorg = function(hash, allHeaders, callback) { // hash is the hash of the new block that we are reorging to. assert(hash, 'We were asked to reorg to a non-existent hash.'); @@ -379,19 +406,18 @@ BlockService.prototype._handleReorg = function(hash, allHeaders, block) { if (err) { - log.error('Block Service: A common ancestor block between hash: ' + - self._tip.hash + ' (our current tip) and: ' + hash + - ' (the forked block) could not be found. Bitcore-node must exit.'); + return callback(err); - self.node.stop(); - return; } var commonAncestorHeader = allHeaders.get(commonAncestorHash); - log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash); + assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.'); - self._processReorg(commonAncestorHeader, oldBlocks, block); + log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + + ' at height: ' + commonAncestorHeader.height + '. Removing: ' + oldBlocks.length + ' block(s) from our indexes.'); + + self._processReorg(commonAncestorHeader, oldBlocks, callback); }); @@ -399,7 +425,7 @@ BlockService.prototype._handleReorg = function(hash, allHeaders, block) { }; // this JUST rewinds the chain back to the common ancestor block, nothing more -BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, newBlock) { +BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, callback) { // set the tip to the common ancestor in case something goes wrong with the reorg var self = this; @@ -420,23 +446,38 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, n }); }); - self._db.batch(removalOps, function() { + self._db.batch(removalOps, function(err) { - self._reorging = false; + if(err) { + return callback(err); + } + + self._reorging = false; + self._p2p.clearInventoryCache(); + callback(); - if (newBlock) { - self._onBlock(newBlock); - } }); }; BlockService.prototype._onAllHeaders = function() { - this._startSync(); + // once the header service has all of its headers, we know we can check our + // own tip for consistency and make sure our it is on the mainchain + var self = this; + self._checkTip(function(err) { + + if(err) { + log.error(err); + return self.node.stop(); + } + + self._startSync(); + + }); }; -BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, newBlock) { +BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, callback) { var self = this; var operations = []; @@ -463,22 +504,16 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, function(err) { if (err) { - if (!self.node.stopping) { - log.error('Block Service: ' + err); - self.node.stop(); - } - return; + return callback(err); } self._db.batch(operations, function(err) { - if (err && !self.node.stopping) { - log.error('Block Service: ' + err); - self.node.stop(); + if (err) { + return callback(err); } - self._onReorg(commonAncestorHeader, oldBlocks, newBlock); - + self._onReorg(commonAncestorHeader, oldBlocks, callback); }); } @@ -568,7 +603,6 @@ BlockService.prototype._onBlock = function(block) { return; } - // this will prevent the block service from hammering us with blocks // before we are ready // this will be turned to false once all the blocks have been processed @@ -581,7 +615,7 @@ BlockService.prototype._onBlock = function(block) { return; } log.debug('Block Service: new block: ' + block.rhash()); - block.height = this._tip.height + 1; + block.__height = this._tip.height + 1; this._processBlock(block); }; @@ -589,12 +623,26 @@ BlockService.prototype._onBlock = function(block) { BlockService.prototype._setListeners = function() { var self = this; + self._header.once('headers', self._onAllHeaders.bind(self)); - self._header.on('reorg', function(hash, headers, block) { + + self._header.on('reorg', function(hash, headers) { + if (!self._reorging && !this._initialSync) { + log.debug('Block Service: detected a reorg from the header service.'); - self._handleReorg(hash, headers, block); + + self._handleReorg(hash, headers, function(err) { + + if(err) { + return callback(err); + } + self._startSync(); + + }); + } + }); }; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 3b9dd975..460de750 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -139,10 +139,6 @@ HeaderService.prototype.start = function(callback) { function(tip, next) { self._tip = tip; - log.debug('Header Service: original tip height is: ' + self._tip.height); - log.debug('Header Service: original tip hash is: ' + self._tip.hash); - - self._originalTip = { height: self._tip.height, hash: self._tip.hash }; if (self._tip.height === 0) { @@ -469,29 +465,7 @@ HeaderService.prototype._onHeadersSave = function(err) { self._setBestHeader(); - self._detectStartupReorg(function(err, reorg) { - - if (err) { - log.error(err); - self.node.stop(); - return; - } - - if (reorg) { - return self._handleReorg(null, null, function(err) { - if (err) { - log.error(err); - this.node.stop(); - return; - } - }); - } - - log.debug('Header Service: emitting headers to block service.'); - - - self.emit('headers'); - }); + self.emit('headers'); }; @@ -594,30 +568,6 @@ HeaderService.prototype._detectReorg = function(block, callback) { }; -HeaderService.prototype._detectStartupReorg = function(callback) { - - var self = this; - - self._getHeader(self._originalTip.height, null, function(err, header) { - - if (err) { - return callback(err); - } - - if (!header) { - return callback(null, true); - } - - if (header.hash !== self._originalTip.hash) { - return callback(null, true); - } - - callback(null, false); - - }); - -}; - HeaderService.prototype._handleReorg = function(block, header, callback) { var self = this; @@ -628,35 +578,19 @@ HeaderService.prototype._handleReorg = function(block, header, callback) { return callback(err || new Error('Missing headers')); } - var originalTipHeader = headers.getIndex(self._originalTip.height); + var hash = block.rhash(); + headers.set(hash, header); // appends to the end - assert(header, 'Atempted to get reorg header for the original tip: ' + self._originalTip.hash + - ' at height: ' + self._originalTip.height + ' but could not find in the database.'); + // this will ensure our own headers collection is correct + self._onReorg(block, headers, function(err) { - var hash = originalTipHeader.hash; + if (err) { + return callback(err); + } - // reorg from new blocks - if (block && header) { - - hash = block.rhash(); - headers.set(hash, header); // appends to the end - - // this will ensure our own headers collection is correct - return self._onReorg(block, headers, function(err) { - - if (err) { - return callback(err); - } - - self.emit('reorg', hash, headers, block); - return callback(); - }); - } - - // reorg from startup - assert(hash, 'To reorg, we need a hash to reorg to.'); - self.emit('reorg', hash, headers); - callback(); + self.emit('reorg', hash, headers); + return callback(); + }); }); diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index a837069f..a4ab09f7 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -131,7 +131,7 @@ TimestampService.prototype.onReorg = function(args, callback) { removalOps.concat([ { type: 'del', - key: self._encoding.encodeTimestampBlockKey(block.ts), + key: self._encoding.encodeTimestampBlockKey(block.__ts), }, { type: 'del', diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 87012dda..37cf7fb6 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -205,6 +205,7 @@ TransactionService.prototype._getTransaction = function(txid, options, callback) tx = self._encoding.decodeTransactionValue(tx); callback(null, txid, tx, options); + }); }; @@ -345,7 +346,7 @@ TransactionService.prototype._processTransaction = function(tx, opts, callback) assert(tx.__timestamp, 'Timestamp is required when saving a transaction.'); // height - tx.__height = opts.block.height; + tx.__height = opts.block.__height; assert(tx.__height, 'Block height is required when saving a trasnaction.'); callback(null, { diff --git a/package-lock.json b/package-lock.json index e9095d87..421f42a9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "bitcore-node", - "version": "5.0.0-beta.7", + "version": "5.0.0-beta.8", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2034,6 +2034,11 @@ "sntp": "1.0.9" } }, + "he": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/he/-/he-1.1.1.tgz", + "integrity": "sha1-k0EP0hsAlzUVH4howvJx80J+I/0=" + }, "hmac-drbg": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/hmac-drbg/-/hmac-drbg-1.0.1.tgz", @@ -2901,9 +2906,9 @@ } }, "mocha": { - "version": "3.5.0", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-3.5.0.tgz", - "integrity": "sha512-pIU2PJjrPYvYRqVpjXzj76qltO9uBYI7woYAMoxbSefsa+vqAfptjoeevd6bUgwD0mPIO+hv9f7ltvsNreL2PA==", + "version": "3.5.1", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-3.5.1.tgz", + "integrity": "sha512-2jD6NS4PNKVDpaICERx8vEkXaisx2MlRKxj5KuFJVZJdK1zRGs/HnS3OeH7zXhXAbGlzaMIan4Kwpm4O5hORnA==", "requires": { "browser-stdout": "1.3.0", "commander": "2.9.0", @@ -2912,6 +2917,7 @@ "escape-string-regexp": "1.0.5", "glob": "7.1.1", "growl": "1.9.2", + "he": "1.1.1", "json3": "3.3.2", "lodash.create": "3.1.1", "mkdirp": "0.5.1", diff --git a/package.json b/package.json index 4afb2741..d46b7ddb 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "node": ">=8.0.0" }, "author": "BitPay ", - "version": "5.0.0-beta.7", + "version": "5.0.0-beta.8", "main": "./index.js", "repository": "git://github.com/bitpay/bitcore-node.git", "homepage": "https://github.com/bitpay/bitcore-node", diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index bcb1f1a2..54532204 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -228,11 +228,14 @@ describe('Address Service', function() { it('should reorg when there is nothing to reorg', function(done ) { var commonAncestorHeader = bcoin.block.fromRaw(blocks[5], 'hex').toHeaders().toJSON(); - var oldBlocks = [bcoin.block.fromRaw(blocks[6], 'hex')]; + var block = bcoin.block.fromRaw(blocks[6], 'hex'); + block.__ts = 55555; + block.__height = 999; + var oldBlocks = [block]; addressService.onReorg([commonAncestorHeader, oldBlocks], function(err, ops) { - expect(ops.length).to.equal(0); + expect(ops.length).to.equal(2); done(); }); diff --git a/test/services/block/index.unit.js b/test/services/block/index.unit.js index c29a2dc1..4a9ee717 100644 --- a/test/services/block/index.unit.js +++ b/test/services/block/index.unit.js @@ -49,23 +49,27 @@ describe('Block Service', function() { it('should find the common ancestor between the current chain and the new chain', function(done) { blockService._tip = { hash: block2.rhash(), height: 70901 }; + var data = blockService._encoding.encodeBlockValue(block2); + blockService._db = { get: sandbox.stub().callsArgWith(1, null, data) }; + blockService._timestamp = { getTimestamp: sandbox.stub().callsArgWith(1, null, 1234) }; + var commonAncestorHash = bcoin.util.revHex(block2.prevBlock); + var allHeaders = { get: sandbox.stub().returns({ prevHash: commonAncestorHash }), size: 1e6 }; - var encodedData = blockService._encoding.encodeBlockValue(block2); + blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) { - var get = sandbox.stub().callsArgWith(1, null, encodedData); - - var headers = { get: sandbox.stub().returns({ prevHash: block1.rhash() }) }; - blockService._db = { get: get }; - - blockService._findCommonAncestor('aa', headers, function(err, common, oldBlocks) { - if (err) { + if(err) { return done(err); } - expect(common).to.equal('aa'); - expect(oldBlocks).to.deep.equal([]); + + block2.__ts = 1234; + block2.__height = 70901; + expect(commonAncestorHashActual).to.equal(commonAncestorHash); + expect(oldBlockList).to.deep.equal([block2]); done(); + }); }); + }); describe('#getBestBlockHash', function() {