indexer: fix reset and reorg handling

Braydon Fuller 2019-04-19 14:08:27 -07:00
parent 865f7401ba
commit efb2551555
GPG Key ID: F24F232D108B3AD4
7 changed files with 292 additions and 136 deletions

@@ -1392,10 +1392,6 @@ class ChainDB {
await this.commit();
// Remove undo data _after_ successful commit.
if (this.blocks)
await this.blocks.pruneUndo(entry.hash);
return view;
}
@@ -1512,12 +1508,6 @@ class ChainDB {
await this.commit();
// Remove block and undo data _after_ successful commit.
if (this.blocks) {
await this.blocks.pruneUndo(tip.hash);
await this.blocks.prune(tip.hash);
}
// Update caches _after_ successful commit.
this.cacheHeight.remove(tip.height);
this.cacheHash.remove(tip.hash);
@@ -1541,23 +1531,15 @@ class ChainDB {
// one giant atomic write!
this.start();
let hashes = [];
try {
for (const tip of tips)
hashes = hashes.concat(await this._removeChain(tip));
await this._removeChain(tip);
} catch (e) {
this.drop();
throw e;
}
await this.commit();
// SPV doesn't store blocks.
if (this.blocks) {
for (const hash of hashes)
await this.blocks.prune(hash);
}
}
/**
@@ -1575,8 +1557,6 @@ class ChainDB {
this.logger.debug('Removing alternate chain: %h.', tip.hash);
const hashes = [];
for (;;) {
if (await this.isMainChain(tip))
break;
@@ -1588,10 +1568,6 @@ class ChainDB {
this.del(layout.h.encode(tip.hash));
this.del(layout.e.encode(tip.hash));
// Queue block to be pruned on
// successful write.
hashes.push(tip.hash);
// Queue up hash to be removed
// on successful write.
this.cacheHash.unpush(tip.hash);
@@ -1599,8 +1575,6 @@ class ChainDB {
tip = await this.getPrevious(tip);
assert(tip);
}
return hashes;
}
/**

@@ -125,13 +125,13 @@ class AddrIndexer extends Indexer {
/**
* Index transactions by address.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
*/
async indexBlock(entry, block, view) {
const height = entry.height;
async indexBlock(meta, block, view) {
const height = meta.height;
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];
@@ -163,13 +163,13 @@ class AddrIndexer extends Indexer {
/**
* Remove addresses from index.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
*/
async unindexBlock(entry, block, view) {
const height = entry.height;
async unindexBlock(meta, block, view) {
const height = meta.height;
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];

@@ -166,29 +166,19 @@ class Indexer extends EventEmitter {
*/
bind() {
this.chain.on('connect', async (entry, block, view) => {
try {
await this.sync(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
const listener = async (entry, block, view) => {
const meta = new BlockMeta(entry.hash, entry.height);
this.chain.on('disconnect', async (entry, block, view) => {
try {
await this.sync(entry, block, view);
await this.sync(meta, block, view);
} catch (e) {
this.emit('error', e);
}
});
};
this.chain.on('reset', async (tip) => {
try {
await this.sync(tip);
} catch (e) {
this.emit('error', e);
}
});
this.chain.on('connect', listener);
this.chain.on('disconnect', listener);
this.chain.on('reset', listener);
}
/**
@@ -226,26 +216,31 @@ class Indexer extends EventEmitter {
/**
* Sync with the chain.
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async sync(entry, block, view) {
async sync(meta, block, view) {
if (this.syncing)
return;
this.syncing = true;
const connected = await this._syncBlock(entry, block, view);
const connected = await this._syncBlock(meta, block, view);
if (connected) {
this.syncing = false;
} else {
(async () => {
await this._syncChain(entry);
this.syncing = false;
try {
await this._syncChain();
} catch (e) {
this.emit('error', e);
} finally {
this.syncing = false;
}
})();
}
}
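For illustration, a condensed, runnable sketch of the new wiring: the same listener now handles 'connect', 'disconnect' and 'reset', converting the ChainEntry into a BlockMeta (assumed here to be just the hash/height record constructed in bind() above) before delegating to sync(). The EventEmitter-based chain and indexer below are stand-ins, not bcoin's actual Chain and Indexer classes.

// Sketch only: a shared listener that wraps the chain entry into a
// BlockMeta before delegating to sync(). Not bcoin's real classes.
const EventEmitter = require('events');

class BlockMeta {
  constructor(hash, height) {
    this.hash = hash;
    this.height = height;
  }
}

const chain = new EventEmitter();
const indexer = new EventEmitter();

indexer.sync = async (meta, block, view) => {
  // For 'connect' and 'disconnect' the block and view are present and
  // can be applied or reverted directly; for 'reset' they are undefined
  // and sync() falls back to a full chain scan.
  console.log('sync to', meta.height, block ? 'with block' : 'scan');
};

const listener = async (entry, block, view) => {
  const meta = new BlockMeta(entry.hash, entry.height);
  try {
    await indexer.sync(meta, block, view);
  } catch (e) {
    indexer.emit('error', e);
  }
};

chain.on('connect', listener);
chain.on('disconnect', listener);
chain.on('reset', listener);

// 'reset' only passes the new tip entry, so block and view are undefined.
chain.emit('reset', {hash: Buffer.alloc(32), height: 100});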
@@ -253,23 +248,23 @@ class Indexer extends EventEmitter {
/**
* Sync with the chain with a block.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async _syncBlock(entry, block, view) {
async _syncBlock(meta, block, view) {
// In the case that the next block is being
// connected or the current block disconnected
// use the block and view being passed directly,
// instead of reading that information again.
if (entry && block && view) {
if (entry.height === this.height + 1) {
await this._addBlock(entry, block, view);
if (meta && block && view) {
if (meta.height === this.height + 1) {
await this._addBlock(meta, block, view);
return true;
} else if (entry.height === this.height) {
await this._removeBlock(entry, block, view);
} else if (meta.height === this.height) {
await this._removeBlock(meta, block, view);
return true;
}
}
@@ -279,11 +274,10 @@ class Indexer extends EventEmitter {
/**
* Sync with the chain.
* @private
* @param {ChainEntry} entry
* @returns {Promise}
*/
async _syncChain(entry) {
async _syncChain() {
let height = this.height;
// In the case that the indexer has never
@@ -297,23 +291,16 @@ class Indexer extends EventEmitter {
// leave chain in a different state.
// Scan chain backwards until we
// find a common height.
for (;;) {
const tip = await this.getBlockMeta(height);
assert(tip);
while (height > 0) {
const meta = await this.getBlockMeta(height);
assert(meta);
if (await this.getEntry(tip.hash))
if (await this.getEntry(meta.hash))
break;
assert(height !== 0);
height -= 1;
}
// In the case that the chain is reset
// the entry will be less than the
// current height.
if (entry && entry.height < height)
height = entry.height;
if (height < this.height) {
await this._rollback(height);
await this._rollforward();
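The loop above is the heart of the reorg recovery: walk the indexer's own tip backwards until a height is found whose indexed hash is still on the main chain, then _rollback() to that height and _rollforward() again. A self-contained sketch of that scan, with the getBlockMeta() and getEntry() lookups replaced by hypothetical in-memory maps:

// Hypothetical in-memory stand-ins for the indexer's getBlockMeta()
// and the chain's getEntry() lookups used in _syncChain().
const indexed = new Map();   // height -> block hash indexed so far
const mainChain = new Set(); // hashes currently on the main chain

function findCommonHeight(indexerHeight) {
  let height = indexerHeight;
  // Scan backwards until the indexed hash at `height` is still part
  // of the main chain (or we reach the genesis block).
  while (height > 0) {
    const hash = indexed.get(height);
    if (mainChain.has(hash))
      break;
    height -= 1;
  }
  return height;
}

// Example: heights 151-160 were reorged away while the indexer was off.
for (let h = 1; h <= 160; h++)
  indexed.set(h, `hash-${h}`);
for (let h = 1; h <= 150; h++)
  mainChain.add(`hash-${h}`);
for (let h = 151; h <= 170; h++)
  mainChain.add(`hash-${h}b`); // competing branch

console.log(findCommonHeight(160)); // 150 -> _rollback(150), then _rollforward()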
@@ -329,20 +316,22 @@ class Indexer extends EventEmitter {
*/
async _rollforward() {
this.logger.info('Indexing to best height.');
this.logger.info('Indexing to best height from height (%d).', this.height);
for (let i = this.height + 1; ; i++) {
const entry = await this.getEntry(i);
for (let height = this.height + 1; ; height++) {
const entry = await this.getEntry(height);
if (!entry)
break;
const meta = new BlockMeta(entry.hash, height);
const block = await this.chain.getBlock(entry.hash);
assert(block);
const view = await this.chain.getBlockView(block);
assert(view);
await this._addBlock(entry, block, view);
await this._addBlock(meta, block, view);
}
}
@@ -362,50 +351,46 @@ class Indexer extends EventEmitter {
this.logger.info('Rolling back to height %d.', height);
while (this.height > height) {
const tip = await this.getBlockMeta(this.height);
assert(tip);
while (this.height > height && this.height > 1) {
const meta = await this.getBlockMeta(this.height);
assert(meta);
const entry = await this.chain.getEntry(tip.hash);
assert(entry);
const block = await this.chain.getBlock(entry.hash);
const block = await this.chain.getBlock(meta.hash);
assert(block);
const view = await this.chain.getBlockView(block);
assert(view);
await this._removeBlock(entry, block, view);
await this._removeBlock(meta, block, view);
}
}
/**
* Add a block's transactions without a lock.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async _addBlock(entry, block, view) {
async _addBlock(meta, block, view) {
assert(block.hasRaw(), 'Expected raw data for block.');
const start = util.bench();
if (entry.height !== this.height + 1)
if (meta.height !== this.height + 1)
throw new Error('Indexer: Can not add block.');
const tip = new BlockMeta(entry.hash, entry.height);
// Start the batch write.
this.start();
// Call the implemented indexer to add to
// the batch write.
await this.indexBlock(entry, block, view);
await this.indexBlock(meta, block, view);
// Sync the height to the new tip.
const height = await this._setTip(tip);
const height = await this._setTip(meta);
// Commit the write batch to disk.
await this.commit();
@@ -414,56 +399,58 @@ class Indexer extends EventEmitter {
this.height = height;
// Log the current indexer status.
this.logStatus(start, block, entry);
this.logStatus(start, block, meta);
}
/**
* Process block indexing
* Indexers will implement this method to process the block for indexing
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async indexBlock(entry, block, view) {
async indexBlock(meta, block, view) {
;
}
/**
* Undo block indexing
* Indexers will implement this method to undo indexing for the block
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async unindexBlock(entry, block, view) {
async unindexBlock(meta, block, view) {
;
}
/**
* Unconfirm a block's transactions.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async _removeBlock(entry, block, view) {
async _removeBlock(meta, block, view) {
const start = util.bench();
if (entry.height !== this.height)
if (meta.height !== this.height)
throw new Error('Indexer: Can not remove block.');
const tip = new BlockMeta(entry.hash, entry.height);
// Start the batch write.
this.start();
// Call the implemented indexer to add to
// the batch write.
await this.unindexBlock(entry, block, view);
await this.unindexBlock(meta, block, view);
const prev = await this.getBlockMeta(tip.height - 1);
const prev = await this.getBlockMeta(meta.height - 1);
assert(prev);
// Sync the height to the previous tip.
@@ -476,7 +463,7 @@ class Indexer extends EventEmitter {
this.height = height;
// Log the current indexer status.
this.logStatus(start, block, entry);
this.logStatus(start, block, meta, true);
}
/**
@@ -485,21 +472,21 @@ class Indexer extends EventEmitter {
* @returns {Promise}
*/
async _setTip(tip) {
if (tip.height < this.height) {
assert(tip.height === this.height - 1);
async _setTip(meta) {
if (meta.height < this.height) {
assert(meta.height === this.height - 1);
this.del(layout.h.encode(this.height));
} else if (tip.height > this.height) {
assert(tip.height === this.height + 1);
} else if (meta.height > this.height) {
assert(meta.height === this.height + 1);
}
// Add to batch write to save tip and height.
this.put(layout.h.encode(tip.height), tip.hash);
this.put(layout.h.encode(meta.height), meta.hash);
const raw = bio.write(4).writeU32(tip.height).render();
const raw = bio.write(4).writeU32(meta.height).render();
this.put(layout.R.encode(), raw);
return tip.height;
return meta.height;
}
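_setTip() keeps two records in the write batch: a per-height mapping from height to block hash (layout.h) and a single 4-byte tip height (layout.R). A rough stand-in for the height serialization, using Node's Buffer instead of bcoin's bufio and assuming writeU32 is little-endian:

// Rough stand-in for bio.write(4).writeU32(meta.height).render(),
// assuming writeU32 is little-endian; bcoin's bufio is an assumption here.
function encodeHeight(height) {
  const raw = Buffer.alloc(4);
  raw.writeUInt32LE(height, 0);
  return raw;
}

function decodeHeight(raw) {
  return raw.readUInt32LE(0);
}

const raw = encodeHeight(170);
console.log(raw);               // <Buffer aa 00 00 00>
console.log(decodeHeight(raw)); // 170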
/**
@@ -523,18 +510,22 @@ class Indexer extends EventEmitter {
* @private
* @param {Array} start
* @param {Block} block
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Boolean} reverse
*/
logStatus(start, block, entry) {
logStatus(start, block, meta, reverse) {
if (!this.isSlow())
return;
const elapsed = util.bench(start);
const msg = reverse ? 'removed from' : 'added to';
this.logger.info(
'Block (%d) added to indexer (txs=%d time=%d).',
entry.height,
'Block (%d) %s indexer (txs=%d time=%d).',
meta.height,
msg,
block.txs.length,
elapsed);
}

@@ -180,18 +180,18 @@ class TXIndexer extends Indexer {
/**
* Index transactions by txid.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
*/
async indexBlock(entry, block, view) {
async indexBlock(meta, block, view) {
const brecord = new BlockRecord({
block: entry.hash,
time: entry.time
block: meta.hash,
time: block.time
});
this.put(layout.b.encode(entry.height), brecord.toRaw());
this.put(layout.b.encode(meta.height), brecord.toRaw());
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];
@@ -200,7 +200,7 @@ class TXIndexer extends Indexer {
const {offset, size} = tx.getPosition();
const txrecord = new TxRecord({
height: entry.height,
height: meta.height,
index: i,
offset: offset,
length: size
@@ -213,13 +213,13 @@ class TXIndexer extends Indexer {
/**
* Remove transactions from index.
* @private
* @param {ChainEntry} entry
* @param {BlockMeta} meta
* @param {Block} block
* @param {CoinView} view
*/
async unindexBlock(entry, block, view) {
this.del(layout.b.encode(entry.height));
async unindexBlock(meta, block, view) {
this.del(layout.b.encode(meta.height));
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];

@@ -643,11 +643,9 @@ class Pool extends EventEmitter {
*/
startSync() {
if (!this.opened)
if (!this.opened || !this.connected)
return;
assert(this.connected, 'Pool is not connected!');
this.syncing = true;
this.resync(false);
}
@@ -657,11 +655,9 @@ class Pool extends EventEmitter {
*/
forceSync() {
if (!this.opened)
if (!this.opened || !this.connected)
return;
assert(this.connected, 'Pool is not connected!');
this.resync(true);
}

@@ -420,7 +420,7 @@ describe('Indexer', function() {
assert(node.txindex);
assert.equal(node.txindex.height, 0);
node.txindex.sync();
node.startSync();
await forValue(node.txindex, 'height', 150);
} finally {
@@ -428,6 +428,201 @@ describe('Indexer', function() {
await node.close();
}
});
it('will sync if disabled during reorganization', async () => {
let node, nclient, wclient = null;
try {
// Generate an initial set of blocks that are spending
// coins and therefore have data in undo blocks.
node = new FullNode({
prefix: prefix,
network: 'regtest',
apiKey: 'foo',
memory: false,
indexTX: true,
indexAddress: false,
port: ports.p2p,
httpPort: ports.node,
plugins: [require('../lib/wallet/plugin')],
env: {
'BCOIN_WALLET_HTTP_PORT': ports.wallet.toString()
},
logLevel: 'none'
});
await node.ensure();
await node.open();
nclient = new NodeClient({
port: ports.node,
apiKey: 'foo',
timeout: 120000
});
await nclient.open();
wclient = new WalletClient({
port: ports.wallet,
apiKey: 'foo',
timeout: 120000
});
await wclient.open();
const coinbase = await wclient.execute(
'getnewaddress', ['default']);
const blocks = await nclient.execute(
'generatetoaddress', [150, coinbase]);
assert.equal(blocks.length, 150);
for (let i = 0; i < 10; i++) {
for (const v of vectors)
await wclient.execute('sendtoaddress', [v.addr, v.amount]);
const blocks = await nclient.execute(
'generatetoaddress', [1, coinbase]);
assert.equal(blocks.length, 1);
}
await forValue(node.chain, 'height', 160);
await forValue(node.txindex, 'height', 160);
} finally {
if (wclient)
await wclient.close();
if (nclient)
await nclient.close();
if (node)
await node.close();
}
try {
// Now create a reorganization in the chain while
// the indexer is disabled.
node = new FullNode({
prefix: prefix,
network: 'regtest',
apiKey: 'foo',
memory: false,
indexTX: false,
indexAddress: false,
port: ports.p2p,
httpPort: ports.node,
logLevel: 'none'
});
await node.ensure();
await node.open();
nclient = new NodeClient({
port: ports.node,
apiKey: 'foo',
timeout: 120000
});
await nclient.open();
for (let i = 0; i < 10; i++) {
const hash = await nclient.execute('getbestblockhash');
await nclient.execute('invalidateblock', [hash]);
}
await forValue(node.chain, 'height', 150);
const blocks = await nclient.execute(
'generatetoaddress', [20, vectors[0].addr]);
assert.equal(blocks.length, 20);
await forValue(node.chain, 'height', 170);
} finally {
if (nclient)
await nclient.close();
if (node)
await node.close();
}
try {
// Now turn the indexer back on and check that it
// is able to disconnect blocks and add the new blocks.
node = new FullNode({
prefix: prefix,
network: 'regtest',
apiKey: 'foo',
memory: false,
indexTX: true,
indexAddress: false,
port: ports.p2p,
httpPort: ports.node,
logLevel: 'none'
});
await node.ensure();
await node.open();
assert(node.txindex);
assert.equal(node.txindex.height, 160);
node.txindex.sync();
await forValue(node.txindex, 'height', 170, 5000);
} finally {
if (node)
await node.close();
}
});
it('will reset indexes', async () => {
let node, nclient = null;
try {
node = new FullNode({
prefix: prefix,
network: 'regtest',
apiKey: 'foo',
memory: false,
indexTX: true,
indexAddress: false,
port: ports.p2p,
httpPort: ports.node,
logLevel: 'none'
});
await node.ensure();
await node.open();
nclient = new NodeClient({
port: ports.node,
apiKey: 'foo',
timeout: 120000
});
await nclient.open();
const blocks = await nclient.execute(
'generatetoaddress', [150, vectors[0].addr]);
assert.equal(blocks.length, 150);
await forValue(node.txindex, 'height', 150);
await node.chain.reset(0);
await forValue(node.txindex, 'height', 1);
} finally {
if (nclient)
await nclient.close();
if (node)
await node.close();
}
});
});
});

@@ -102,7 +102,7 @@ common.rimraf = async function(p) {
return await fs.rimraf(p);
};
common.forValue = async function(obj, key, val, timeout = 60000) {
common.forValue = async function(obj, key, val, timeout = 30000) {
assert(typeof obj === 'object');
assert(typeof key === 'string');
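The indexer tests above rely on forValue() to wait until a property reaches an expected value, for example await forValue(node.txindex, 'height', 170, 5000). Its body is not shown in this diff; a plausible polling sketch, with the 10ms interval and the error message as illustrative assumptions, could look like:

// Hypothetical sketch of a forValue-style helper: poll obj[key] until
// it equals val, or reject after `timeout` milliseconds.
function forValue(obj, key, val, timeout = 30000) {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    const timer = setInterval(() => {
      if (obj[key] === val) {
        clearInterval(timer);
        resolve();
      } else if (Date.now() - start > timeout) {
        clearInterval(timer);
        reject(new Error(`Timed out waiting for ${key} === ${val}.`));
      }
    }, 10);
  });
}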