Fixed issue with reorg.

This commit is contained in:
Chris Kleeschulte 2017-09-11 15:41:27 -04:00
parent daa89f3086
commit 6a18c1e46e
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
9 changed files with 193 additions and 158 deletions

View File

@ -60,7 +60,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
return callback(err);
}
var txList = _.flatten(txLists);
var txList = _.flattenDeep(txLists);
var results = {
totalCount: txList.length,
@ -393,32 +393,65 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
};
AddressService.prototype._removeBlock = function(block, callback) {
var self = this;
async.eachSeries(block.txs, function(tx, next) {
async.mapSeries(block.txs, function(tx, next) {
self._removeTx(tx, block, next);
}, callback);
};
AddressService.prototype._removeTx = function(tx, block, callback) {
var self = this;
var operations = [];
async.parallelLimit([
function(next) {
async.eachOfSeries(tx.inputs, function(input, indext, next) {
self._removeInput(input, tx, block, index, next);
self._removeInput(input, tx, block, index, function(err, ops) {
if(err) {
return next(err);
}
operations = operations.concat(ops);
next();
});
}, next);
},
function(next) {
async.eachOfSeries(tx.outputs, function(output, index, next) {
self._removeOutput(output, tx, block, index, next);
self._removeOutput(output, tx, block, index, function(err, ops) {
if(err) {
return next(err);
}
operations = operations.concat(ops);
next();
});
}, next);
}
], 4, callback);
], 4, function(err) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
AddressService.prototype._removeInput = function(input, tx, block, index, callback) {
var self = this;
var address = input.getAddress();
var removalOps = [];
if (!address) {
@ -428,9 +461,11 @@ AddressService.prototype._removeInput = function(input, tx, block, index, callba
address.network = self._network;
address = address.toString();
assert(block && block.__ts && block.__height, 'Missing block or block values.');
removalOps.push({
type: 'del',
key: self._encoding.encodeAddressIndexKey(address, block.height, tx.txid(), index, 1, block.ts)
key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 1, block.__ts)
});
// look up prev output of this input and put it back in the set of utxos
@ -441,14 +476,15 @@ AddressService.prototype._removeInput = function(input, tx, block, index, callba
}
assert(_tx, 'Missing prev tx to insert back into the utxo set when reorging address index.');
assert(_tx.__height && _tx.__inputValues && _tx.__timestamp, 'Missing tx values.');
removalOps.push({
type: 'put',
key: self._encoding.encodeUtxoIndexKey(address, _tx.txid(), input.prevout.index),
value: self._encoding.encodeUtxoIndexValue(
_tx.height,
_tx.__height,
_tx.__inputValues[input.prevout.index],
_tx.timestamp, _tx.outputs[input.prevout.index].script.toRaw())
_tx.__timestamp, _tx.outputs[input.prevout.index].script.toRaw())
});
callback(null, removalOps);
@ -469,9 +505,11 @@ AddressService.prototype._removeOutput = function(output, tx, block, index, call
address.network = self._network;
address = address.toString();
assert(block && block.__ts && block.__height, 'Missing block or block values.');
removalOps.push({
type: 'del',
key: self._encoding.encodeAddressIndexKey(address, block.height, tx.txid(), index, 0, block.ts)
key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 0, block.__ts)
});
//remove the utxo for this output from the collection
@ -491,13 +529,14 @@ AddressService.prototype.onReorg = function(args, callback) {
// for every tx, remove the address index key for every input and output
// for every input record, we need to find its previous output and put it back into the utxo collection
async.eachSeries(oldBlockList, self._removeBlock.bind(self), function(err, ops) {
async.mapSeries(oldBlockList, self._removeBlock.bind(self), function(err, ops) {
if (err) {
return callback(err);
}
callback(null, _.compact(_.flatten(ops)));
var operations = _.compact(_.flattenDeep(ops));
callback(null, operations);
});
};
@ -517,7 +556,7 @@ AddressService.prototype.onBlock = function(block, callback) {
operations.push(ops);
}
operations = _.flatten(operations);
operations = _.flattenDeep(operations);
callback(null, operations);
};
@ -539,7 +578,7 @@ AddressService.prototype._processInput = function(tx, input, index, opts) {
assert(timestamp, 'Must have a timestamp in order to process input.');
// address index
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.height, txid, index, 1, timestamp);
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 1, timestamp);
var operations = [{
type: 'put',
@ -573,11 +612,11 @@ AddressService.prototype._processOutput = function(tx, output, index, opts) {
assert(timestamp, 'Must have a timestamp in order to process output.');
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.height, txid, index, 0, timestamp);
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 0, timestamp);
var utxoKey = this._encoding.encodeUtxoIndexKey(address, txid, index);
var utxoValue = this._encoding.encodeUtxoIndexValue(
opts.block.height,
opts.block.__height,
output.value,
timestamp,
output.script.toRaw()
@ -608,7 +647,7 @@ AddressService.prototype._processTransaction = function(tx, opts) {
return self._processOutput(tx, output, index, _opts);
});
outputOperations = _.flatten(_.compact(outputOperations));
outputOperations = _.compact(_.flattenDeep(outputOperations));
assert(outputOperations.length % 2 === 0 &&
outputOperations.length <= tx.outputs.length * 2,
'Output operations count is not reflective of what should be possible.');
@ -617,7 +656,7 @@ AddressService.prototype._processTransaction = function(tx, opts) {
return self._processInput(tx, input, index, _opts);
});
inputOperations = _.flatten(_.compact(inputOperations));
inputOperations = _.compact(_.flattenDeep(inputOperations));
assert(inputOperations.length % 2 === 0 &&
inputOperations.length <= tx.inputs.length * 2,

View File

@ -165,6 +165,36 @@ BlockService.prototype.getRawBlock = function(hash, callback) {
});
};
BlockService.prototype._checkTip = function(callback) {
// check to see if our own tip is no longer in the main chain
// (there was a reorg while we were shut down)
var self = this;
self._header.getBlockHeader(self._tip.height, function(err, header) {
if (err || !header) {
return callback(err || new Error('Header at height: ' + self._tip.height + ' was not found.'));
}
if (header.hash === self._tip.hash) {
return callback();
}
// means the header service no longer tracks our tip on the main chain
// so we should consider ourselves in a reorg situation
self._header.getAllHeaders(function(err, headers) {
if(err || !headers) {
return callback(err || new Error('All headers not found.'));
}
self._handleReorg(header.hash, headers, callback);
});
});
};
BlockService.prototype.start = function(callback) {
var self = this;
@ -247,7 +277,7 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback
var self = this;
var count = 0;
var _oldTip = this._tip.hash;
var _oldTip = self._tip.hash;
var _newTip = hash;
var oldBlocks = [];
@ -272,13 +302,11 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback
// once we've found the old tip, we will find its prev and check to see if matches new tip's prev
var block = self._encoding.decodeBlockValue(data);
// for the purposes of building the old blocks list
// apply the block's height
var blockHdr = allHeaders.get(block.rhash());
if (!blockHdr) {
return next(new Error('Could not find block in list of headers: ' + block.rhash()));
}
block.height = blockHdr.height;
assert(block.height >= 0, 'We mamaged to save a header with an incorrect height.');
block.__height = self._tip.height;
// apply the block's timestamp
self._timestamp.getTimestamp(block.rhash(), function(err, timestamp) {
@ -287,23 +315,21 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback
return next(err || new Error('missing timestamp'));
}
block.ts = timestamp;
block.__ts = timestamp;
// we will squirrel away the block because our services will need to remove it after we've found the common ancestor
oldBlocks.push(block);
// this is our current tip's prev hash
_oldTip = bcoin.util.revHex(block.prevBlock);
// our current headers have the correct state of the chain, so consult that for its prev aash
// our current headers have the correct state of the chain, so consult that for its prev hash
var header = allHeaders.get(_newTip);
if (!header) {
return next(new Error('Header missing from list of headers'));
}
assert(header, 'The expected hash is not in the headers list.');
// set new tip to the prev hash
_newTip = header.prevHash;
next();
});
@ -315,8 +341,9 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback
return callback(err);
}
var commonAncestorHash = _newTip;
callback(null, commonAncestorHash, oldBlocks);
assert(_newTip === _oldTip, 'A common ancestor hash could not be found.');
callback(null, _newTip, oldBlocks);
});
};
@ -364,7 +391,7 @@ BlockService.prototype._getHash = function(blockArg, callback) {
};
BlockService.prototype._handleReorg = function(hash, allHeaders, block) {
BlockService.prototype._handleReorg = function(hash, allHeaders, callback) {
// hash is the hash of the new block that we are reorging to.
assert(hash, 'We were asked to reorg to a non-existent hash.');
@ -379,19 +406,18 @@ BlockService.prototype._handleReorg = function(hash, allHeaders, block) {
if (err) {
log.error('Block Service: A common ancestor block between hash: ' +
self._tip.hash + ' (our current tip) and: ' + hash +
' (the forked block) could not be found. Bitcore-node must exit.');
return callback(err);
self.node.stop();
return;
}
var commonAncestorHeader = allHeaders.get(commonAncestorHash);
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash);
assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.');
self._processReorg(commonAncestorHeader, oldBlocks, block);
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash +
' at height: ' + commonAncestorHeader.height + '. Removing: ' + oldBlocks.length + ' block(s) from our indexes.');
self._processReorg(commonAncestorHeader, oldBlocks, callback);
});
@ -399,7 +425,7 @@ BlockService.prototype._handleReorg = function(hash, allHeaders, block) {
};
// this JUST rewinds the chain back to the common ancestor block, nothing more
BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, newBlock) {
BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, callback) {
// set the tip to the common ancestor in case something goes wrong with the reorg
var self = this;
@ -420,23 +446,38 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, n
});
});
self._db.batch(removalOps, function() {
self._db.batch(removalOps, function(err) {
self._reorging = false;
if(err) {
return callback(err);
}
self._reorging = false;
self._p2p.clearInventoryCache();
callback();
if (newBlock) {
self._onBlock(newBlock);
}
});
};
BlockService.prototype._onAllHeaders = function() {
this._startSync();
// once the header service has all of its headers, we know we can check our
// own tip for consistency and make sure our it is on the mainchain
var self = this;
self._checkTip(function(err) {
if(err) {
log.error(err);
return self.node.stop();
}
self._startSync();
});
};
BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, newBlock) {
BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, callback) {
var self = this;
var operations = [];
@ -463,22 +504,16 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks,
function(err) {
if (err) {
if (!self.node.stopping) {
log.error('Block Service: ' + err);
self.node.stop();
}
return;
return callback(err);
}
self._db.batch(operations, function(err) {
if (err && !self.node.stopping) {
log.error('Block Service: ' + err);
self.node.stop();
if (err) {
return callback(err);
}
self._onReorg(commonAncestorHeader, oldBlocks, newBlock);
self._onReorg(commonAncestorHeader, oldBlocks, callback);
});
}
@ -568,7 +603,6 @@ BlockService.prototype._onBlock = function(block) {
return;
}
// this will prevent the block service from hammering us with blocks
// before we are ready
// this will be turned to false once all the blocks have been processed
@ -581,7 +615,7 @@ BlockService.prototype._onBlock = function(block) {
return;
}
log.debug('Block Service: new block: ' + block.rhash());
block.height = this._tip.height + 1;
block.__height = this._tip.height + 1;
this._processBlock(block);
};
@ -589,12 +623,26 @@ BlockService.prototype._onBlock = function(block) {
BlockService.prototype._setListeners = function() {
var self = this;
self._header.once('headers', self._onAllHeaders.bind(self));
self._header.on('reorg', function(hash, headers, block) {
self._header.on('reorg', function(hash, headers) {
if (!self._reorging && !this._initialSync) {
log.debug('Block Service: detected a reorg from the header service.');
self._handleReorg(hash, headers, block);
self._handleReorg(hash, headers, function(err) {
if(err) {
return callback(err);
}
self._startSync();
});
}
});
};

View File

@ -139,10 +139,6 @@ HeaderService.prototype.start = function(callback) {
function(tip, next) {
self._tip = tip;
log.debug('Header Service: original tip height is: ' + self._tip.height);
log.debug('Header Service: original tip hash is: ' + self._tip.hash);
self._originalTip = { height: self._tip.height, hash: self._tip.hash };
if (self._tip.height === 0) {
@ -469,29 +465,7 @@ HeaderService.prototype._onHeadersSave = function(err) {
self._setBestHeader();
self._detectStartupReorg(function(err, reorg) {
if (err) {
log.error(err);
self.node.stop();
return;
}
if (reorg) {
return self._handleReorg(null, null, function(err) {
if (err) {
log.error(err);
this.node.stop();
return;
}
});
}
log.debug('Header Service: emitting headers to block service.');
self.emit('headers');
});
self.emit('headers');
};
@ -594,30 +568,6 @@ HeaderService.prototype._detectReorg = function(block, callback) {
};
HeaderService.prototype._detectStartupReorg = function(callback) {
var self = this;
self._getHeader(self._originalTip.height, null, function(err, header) {
if (err) {
return callback(err);
}
if (!header) {
return callback(null, true);
}
if (header.hash !== self._originalTip.hash) {
return callback(null, true);
}
callback(null, false);
});
};
HeaderService.prototype._handleReorg = function(block, header, callback) {
var self = this;
@ -628,35 +578,19 @@ HeaderService.prototype._handleReorg = function(block, header, callback) {
return callback(err || new Error('Missing headers'));
}
var originalTipHeader = headers.getIndex(self._originalTip.height);
var hash = block.rhash();
headers.set(hash, header); // appends to the end
assert(header, 'Atempted to get reorg header for the original tip: ' + self._originalTip.hash +
' at height: ' + self._originalTip.height + ' but could not find in the database.');
// this will ensure our own headers collection is correct
self._onReorg(block, headers, function(err) {
var hash = originalTipHeader.hash;
if (err) {
return callback(err);
}
// reorg from new blocks
if (block && header) {
hash = block.rhash();
headers.set(hash, header); // appends to the end
// this will ensure our own headers collection is correct
return self._onReorg(block, headers, function(err) {
if (err) {
return callback(err);
}
self.emit('reorg', hash, headers, block);
return callback();
});
}
// reorg from startup
assert(hash, 'To reorg, we need a hash to reorg to.');
self.emit('reorg', hash, headers);
callback();
self.emit('reorg', hash, headers);
return callback();
});
});

View File

@ -131,7 +131,7 @@ TimestampService.prototype.onReorg = function(args, callback) {
removalOps.concat([
{
type: 'del',
key: self._encoding.encodeTimestampBlockKey(block.ts),
key: self._encoding.encodeTimestampBlockKey(block.__ts),
},
{
type: 'del',

View File

@ -205,6 +205,7 @@ TransactionService.prototype._getTransaction = function(txid, options, callback)
tx = self._encoding.decodeTransactionValue(tx);
callback(null, txid, tx, options);
});
};
@ -345,7 +346,7 @@ TransactionService.prototype._processTransaction = function(tx, opts, callback)
assert(tx.__timestamp, 'Timestamp is required when saving a transaction.');
// height
tx.__height = opts.block.height;
tx.__height = opts.block.__height;
assert(tx.__height, 'Block height is required when saving a trasnaction.');
callback(null, {

14
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "bitcore-node",
"version": "5.0.0-beta.7",
"version": "5.0.0-beta.8",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@ -2034,6 +2034,11 @@
"sntp": "1.0.9"
}
},
"he": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/he/-/he-1.1.1.tgz",
"integrity": "sha1-k0EP0hsAlzUVH4howvJx80J+I/0="
},
"hmac-drbg": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/hmac-drbg/-/hmac-drbg-1.0.1.tgz",
@ -2901,9 +2906,9 @@
}
},
"mocha": {
"version": "3.5.0",
"resolved": "https://registry.npmjs.org/mocha/-/mocha-3.5.0.tgz",
"integrity": "sha512-pIU2PJjrPYvYRqVpjXzj76qltO9uBYI7woYAMoxbSefsa+vqAfptjoeevd6bUgwD0mPIO+hv9f7ltvsNreL2PA==",
"version": "3.5.1",
"resolved": "https://registry.npmjs.org/mocha/-/mocha-3.5.1.tgz",
"integrity": "sha512-2jD6NS4PNKVDpaICERx8vEkXaisx2MlRKxj5KuFJVZJdK1zRGs/HnS3OeH7zXhXAbGlzaMIan4Kwpm4O5hORnA==",
"requires": {
"browser-stdout": "1.3.0",
"commander": "2.9.0",
@ -2912,6 +2917,7 @@
"escape-string-regexp": "1.0.5",
"glob": "7.1.1",
"growl": "1.9.2",
"he": "1.1.1",
"json3": "3.3.2",
"lodash.create": "3.1.1",
"mkdirp": "0.5.1",

View File

@ -5,7 +5,7 @@
"node": ">=8.0.0"
},
"author": "BitPay <dev@bitpay.com>",
"version": "5.0.0-beta.7",
"version": "5.0.0-beta.8",
"main": "./index.js",
"repository": "git://github.com/bitpay/bitcore-node.git",
"homepage": "https://github.com/bitpay/bitcore-node",

View File

@ -228,11 +228,14 @@ describe('Address Service', function() {
it('should reorg when there is nothing to reorg', function(done ) {
var commonAncestorHeader = bcoin.block.fromRaw(blocks[5], 'hex').toHeaders().toJSON();
var oldBlocks = [bcoin.block.fromRaw(blocks[6], 'hex')];
var block = bcoin.block.fromRaw(blocks[6], 'hex');
block.__ts = 55555;
block.__height = 999;
var oldBlocks = [block];
addressService.onReorg([commonAncestorHeader, oldBlocks], function(err, ops) {
expect(ops.length).to.equal(0);
expect(ops.length).to.equal(2);
done();
});

View File

@ -49,23 +49,27 @@ describe('Block Service', function() {
it('should find the common ancestor between the current chain and the new chain', function(done) {
blockService._tip = { hash: block2.rhash(), height: 70901 };
var data = blockService._encoding.encodeBlockValue(block2);
blockService._db = { get: sandbox.stub().callsArgWith(1, null, data) };
blockService._timestamp = { getTimestamp: sandbox.stub().callsArgWith(1, null, 1234) };
var commonAncestorHash = bcoin.util.revHex(block2.prevBlock);
var allHeaders = { get: sandbox.stub().returns({ prevHash: commonAncestorHash }), size: 1e6 };
var encodedData = blockService._encoding.encodeBlockValue(block2);
blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) {
var get = sandbox.stub().callsArgWith(1, null, encodedData);
var headers = { get: sandbox.stub().returns({ prevHash: block1.rhash() }) };
blockService._db = { get: get };
blockService._findCommonAncestor('aa', headers, function(err, common, oldBlocks) {
if (err) {
if(err) {
return done(err);
}
expect(common).to.equal('aa');
expect(oldBlocks).to.deep.equal([]);
block2.__ts = 1234;
block2.__height = 70901;
expect(commonAncestorHashActual).to.equal(commonAncestorHash);
expect(oldBlockList).to.deep.equal([block2]);
done();
});
});
});
describe('#getBestBlockHash', function() {