diff --git a/lib/blockchain/chaindb.js b/lib/blockchain/chaindb.js index 2ca1448a..184afbf1 100644 --- a/lib/blockchain/chaindb.js +++ b/lib/blockchain/chaindb.js @@ -1392,10 +1392,6 @@ class ChainDB { await this.commit(); - // Remove undo data _after_ successful commit. - if (this.blocks) - await this.blocks.pruneUndo(entry.hash); - return view; } @@ -1512,12 +1508,6 @@ class ChainDB { await this.commit(); - // Remove block and undo data _after_ successful commit. - if (this.blocks) { - await this.blocks.pruneUndo(tip.hash); - await this.blocks.prune(tip.hash); - } - // Update caches _after_ successful commit. this.cacheHeight.remove(tip.height); this.cacheHash.remove(tip.hash); @@ -1541,23 +1531,15 @@ class ChainDB { // one giant atomic write! this.start(); - let hashes = []; - try { for (const tip of tips) - hashes = hashes.concat(await this._removeChain(tip)); + await this._removeChain(tip); } catch (e) { this.drop(); throw e; } await this.commit(); - - // SPV doesn't store blocks. - if (this.blocks) { - for (const hash of hashes) - await this.blocks.prune(hash); - } } /** @@ -1575,8 +1557,6 @@ class ChainDB { this.logger.debug('Removing alternate chain: %h.', tip.hash); - const hashes = []; - for (;;) { if (await this.isMainChain(tip)) break; @@ -1588,10 +1568,6 @@ class ChainDB { this.del(layout.h.encode(tip.hash)); this.del(layout.e.encode(tip.hash)); - // Queue block to be pruned on - // successful write. - hashes.push(tip.hash); - // Queue up hash to be removed // on successful write. this.cacheHash.unpush(tip.hash); @@ -1599,8 +1575,6 @@ class ChainDB { tip = await this.getPrevious(tip); assert(tip); } - - return hashes; } /** diff --git a/lib/indexer/addrindexer.js b/lib/indexer/addrindexer.js index 192581ef..cdcd78dd 100644 --- a/lib/indexer/addrindexer.js +++ b/lib/indexer/addrindexer.js @@ -125,13 +125,13 @@ class AddrIndexer extends Indexer { /** * Index transactions by address. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view */ - async indexBlock(entry, block, view) { - const height = entry.height; + async indexBlock(meta, block, view) { + const height = meta.height; for (let i = 0; i < block.txs.length; i++) { const tx = block.txs[i]; @@ -163,13 +163,13 @@ class AddrIndexer extends Indexer { /** * Remove addresses from index. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view */ - async unindexBlock(entry, block, view) { - const height = entry.height; + async unindexBlock(meta, block, view) { + const height = meta.height; for (let i = 0; i < block.txs.length; i++) { const tx = block.txs[i]; diff --git a/lib/indexer/indexer.js b/lib/indexer/indexer.js index 7c092cf2..d16f8dc9 100644 --- a/lib/indexer/indexer.js +++ b/lib/indexer/indexer.js @@ -166,29 +166,19 @@ class Indexer extends EventEmitter { */ bind() { - this.chain.on('connect', async (entry, block, view) => { - try { - await this.sync(entry, block, view); - } catch (e) { - this.emit('error', e); - } - }); + const listener = async (entry, block, view) => { + const meta = new BlockMeta(entry.hash, entry.height); - this.chain.on('disconnect', async (entry, block, view) => { try { - await this.sync(entry, block, view); + await this.sync(meta, block, view); } catch (e) { this.emit('error', e); } - }); + }; - this.chain.on('reset', async (tip) => { - try { - await this.sync(tip); - } catch (e) { - this.emit('error', e); - } - }); + this.chain.on('connect', listener); + this.chain.on('disconnect', listener); + this.chain.on('reset', listener); } /** @@ -226,26 +216,31 @@ class Indexer extends EventEmitter { /** * Sync with the chain. - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view * @returns {Promise} */ - async sync(entry, block, view) { + async sync(meta, block, view) { if (this.syncing) return; this.syncing = true; - const connected = await this._syncBlock(entry, block, view); + const connected = await this._syncBlock(meta, block, view); if (connected) { this.syncing = false; } else { (async () => { - await this._syncChain(entry); - this.syncing = false; + try { + await this._syncChain(); + } catch (e) { + this.emit('error', e); + } finally { + this.syncing = false; + } })(); } } @@ -253,23 +248,23 @@ class Indexer extends EventEmitter { /** * Sync with the chain with a block. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view * @returns {Promise} */ - async _syncBlock(entry, block, view) { + async _syncBlock(meta, block, view) { // In the case that the next block is being // connected or the current block disconnected // use the block and view being passed directly, // instead of reading that information again. - if (entry && block && view) { - if (entry.height === this.height + 1) { - await this._addBlock(entry, block, view); + if (meta && block && view) { + if (meta.height === this.height + 1) { + await this._addBlock(meta, block, view); return true; - } else if (entry.height === this.height) { - await this._removeBlock(entry, block, view); + } else if (meta.height === this.height) { + await this._removeBlock(meta, block, view); return true; } } @@ -279,11 +274,10 @@ class Indexer extends EventEmitter { /** * Sync with the chain. * @private - * @param {ChainEntry} entry * @returns {Promise} */ - async _syncChain(entry) { + async _syncChain() { let height = this.height; // In the case that the indexer has never @@ -297,23 +291,16 @@ class Indexer extends EventEmitter { // leave chain in a different state. // Scan chain backwards until we // find a common height. - for (;;) { - const tip = await this.getBlockMeta(height); - assert(tip); + while (height > 0) { + const meta = await this.getBlockMeta(height); + assert(meta); - if (await this.getEntry(tip.hash)) + if (await this.getEntry(meta.hash)) break; - assert(height !== 0); height -= 1; } - // In the case that the chain is reset - // the entry will be less than the - // current height. - if (entry && entry.height < height) - height = entry.height; - if (height < this.height) { await this._rollback(height); await this._rollforward(); @@ -329,20 +316,22 @@ class Indexer extends EventEmitter { */ async _rollforward() { - this.logger.info('Indexing to best height.'); + this.logger.info('Indexing to best height from height (%d).', this.height); - for (let i = this.height + 1; ; i++) { - const entry = await this.getEntry(i); + for (let height = this.height + 1; ; height++) { + const entry = await this.getEntry(height); if (!entry) break; + const meta = new BlockMeta(entry.hash, height); + const block = await this.chain.getBlock(entry.hash); assert(block); const view = await this.chain.getBlockView(block); assert(view); - await this._addBlock(entry, block, view); + await this._addBlock(meta, block, view); } } @@ -362,50 +351,46 @@ class Indexer extends EventEmitter { this.logger.info('Rolling back to height %d.', height); - while (this.height > height) { - const tip = await this.getBlockMeta(this.height); - assert(tip); + while (this.height > height && this.height > 1) { + const meta = await this.getBlockMeta(this.height); + assert(meta); - const entry = await this.chain.getEntry(tip.hash); - assert(entry); - - const block = await this.chain.getBlock(entry.hash); + const block = await this.chain.getBlock(meta.hash); assert(block); const view = await this.chain.getBlockView(block); assert(view); - await this._removeBlock(entry, block, view); + await this._removeBlock(meta, block, view); } } /** * Add a block's transactions without a lock. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block + * @param {CoinView} view * @returns {Promise} */ - async _addBlock(entry, block, view) { + async _addBlock(meta, block, view) { assert(block.hasRaw(), 'Expected raw data for block.'); const start = util.bench(); - if (entry.height !== this.height + 1) + if (meta.height !== this.height + 1) throw new Error('Indexer: Can not add block.'); - const tip = new BlockMeta(entry.hash, entry.height); - // Start the batch write. this.start(); // Call the implemented indexer to add to // the batch write. - await this.indexBlock(entry, block, view); + await this.indexBlock(meta, block, view); // Sync the height to the new tip. - const height = await this._setTip(tip); + const height = await this._setTip(meta); // Commit the write batch to disk. await this.commit(); @@ -414,56 +399,58 @@ class Indexer extends EventEmitter { this.height = height; // Log the current indexer status. - this.logStatus(start, block, entry); + this.logStatus(start, block, meta); } /** * Process block indexing * Indexers will implement this method to process the block for indexing - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block + * @param {CoinView} view * @returns {Promise} */ - async indexBlock(entry, block, view) { + async indexBlock(meta, block, view) { ; } /** * Undo block indexing * Indexers will implement this method to undo indexing for the block - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block + * @param {CoinView} view * @returns {Promise} */ - async unindexBlock(entry, block, view) { + async unindexBlock(meta, block, view) { ; } /** * Unconfirm a block's transactions. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta + * @param {Block} block + * @param {CoinView} view * @returns {Promise} */ - async _removeBlock(entry, block, view) { + async _removeBlock(meta, block, view) { const start = util.bench(); - if (entry.height !== this.height) + if (meta.height !== this.height) throw new Error('Indexer: Can not remove block.'); - const tip = new BlockMeta(entry.hash, entry.height); - // Start the batch write. this.start(); // Call the implemented indexer to add to // the batch write. - await this.unindexBlock(entry, block, view); + await this.unindexBlock(meta, block, view); - const prev = await this.getBlockMeta(tip.height - 1); + const prev = await this.getBlockMeta(meta.height - 1); assert(prev); // Sync the height to the previous tip. @@ -476,7 +463,7 @@ class Indexer extends EventEmitter { this.height = height; // Log the current indexer status. - this.logStatus(start, block, entry); + this.logStatus(start, block, meta, true); } /** @@ -485,21 +472,21 @@ class Indexer extends EventEmitter { * @returns {Promise} */ - async _setTip(tip) { - if (tip.height < this.height) { - assert(tip.height === this.height - 1); + async _setTip(meta) { + if (meta.height < this.height) { + assert(meta.height === this.height - 1); this.del(layout.h.encode(this.height)); - } else if (tip.height > this.height) { - assert(tip.height === this.height + 1); + } else if (meta.height > this.height) { + assert(meta.height === this.height + 1); } // Add to batch write to save tip and height. - this.put(layout.h.encode(tip.height), tip.hash); + this.put(layout.h.encode(meta.height), meta.hash); - const raw = bio.write(4).writeU32(tip.height).render(); + const raw = bio.write(4).writeU32(meta.height).render(); this.put(layout.R.encode(), raw); - return tip.height; + return meta.height; } /** @@ -523,18 +510,22 @@ class Indexer extends EventEmitter { * @private * @param {Array} start * @param {Block} block - * @param {ChainEntry} entry + * @param {BlockMeta} meta + * @param {Boolean} reverse */ - logStatus(start, block, entry) { + logStatus(start, block, meta, reverse) { if (!this.isSlow()) return; const elapsed = util.bench(start); + const msg = reverse ? 'removed from' : 'added to'; + this.logger.info( - 'Block (%d) added to indexer (txs=%d time=%d).', - entry.height, + 'Block (%d) %s indexer (txs=%d time=%d).', + meta.height, + msg, block.txs.length, elapsed); } diff --git a/lib/indexer/txindexer.js b/lib/indexer/txindexer.js index 4c1d5eb0..64a5f3ac 100644 --- a/lib/indexer/txindexer.js +++ b/lib/indexer/txindexer.js @@ -180,18 +180,18 @@ class TXIndexer extends Indexer { /** * Index transactions by txid. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view */ - async indexBlock(entry, block, view) { + async indexBlock(meta, block, view) { const brecord = new BlockRecord({ - block: entry.hash, - time: entry.time + block: meta.hash, + time: block.time }); - this.put(layout.b.encode(entry.height), brecord.toRaw()); + this.put(layout.b.encode(meta.height), brecord.toRaw()); for (let i = 0; i < block.txs.length; i++) { const tx = block.txs[i]; @@ -200,7 +200,7 @@ class TXIndexer extends Indexer { const {offset, size} = tx.getPosition(); const txrecord = new TxRecord({ - height: entry.height, + height: meta.height, index: i, offset: offset, length: size @@ -213,13 +213,13 @@ class TXIndexer extends Indexer { /** * Remove transactions from index. * @private - * @param {ChainEntry} entry + * @param {BlockMeta} meta * @param {Block} block * @param {CoinView} view */ - async unindexBlock(entry, block, view) { - this.del(layout.b.encode(entry.height)); + async unindexBlock(meta, block, view) { + this.del(layout.b.encode(meta.height)); for (let i = 0; i < block.txs.length; i++) { const tx = block.txs[i]; diff --git a/lib/net/pool.js b/lib/net/pool.js index 82d31c9c..834fe8aa 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -643,11 +643,9 @@ class Pool extends EventEmitter { */ startSync() { - if (!this.opened) + if (!this.opened || !this.connected) return; - assert(this.connected, 'Pool is not connected!'); - this.syncing = true; this.resync(false); } @@ -657,11 +655,9 @@ class Pool extends EventEmitter { */ forceSync() { - if (!this.opened) + if (!this.opened || !this.connected) return; - assert(this.connected, 'Pool is not connected!'); - this.resync(true); } diff --git a/test/indexer-test.js b/test/indexer-test.js index 019faf58..021f8100 100644 --- a/test/indexer-test.js +++ b/test/indexer-test.js @@ -420,7 +420,7 @@ describe('Indexer', function() { assert(node.txindex); assert.equal(node.txindex.height, 0); - node.txindex.sync(); + node.startSync(); await forValue(node.txindex, 'height', 150); } finally { @@ -428,6 +428,201 @@ describe('Indexer', function() { await node.close(); } }); + + it('will sync if disabled during reorganization', async () => { + let node, nclient, wclient = null; + + try { + // Generate initial set of blocks that are are spending + // coins and therefore data in undo blocks. + node = new FullNode({ + prefix: prefix, + network: 'regtest', + apiKey: 'foo', + memory: false, + indexTX: true, + indexAddress: false, + port: ports.p2p, + httpPort: ports.node, + plugins: [require('../lib/wallet/plugin')], + env: { + 'BCOIN_WALLET_HTTP_PORT': ports.wallet.toString() + }, + logLevel: 'none' + }); + + await node.ensure(); + await node.open(); + + nclient = new NodeClient({ + port: ports.node, + apiKey: 'foo', + timeout: 120000 + }); + + await nclient.open(); + + wclient = new WalletClient({ + port: ports.wallet, + apiKey: 'foo', + timeout: 120000 + }); + + await wclient.open(); + + const coinbase = await wclient.execute( + 'getnewaddress', ['default']); + + const blocks = await nclient.execute( + 'generatetoaddress', [150, coinbase]); + + assert.equal(blocks.length, 150); + + for (let i = 0; i < 10; i++) { + for (const v of vectors) + await wclient.execute('sendtoaddress', [v.addr, v.amount]); + + const blocks = await nclient.execute( + 'generatetoaddress', [1, coinbase]); + + assert.equal(blocks.length, 1); + } + + await forValue(node.chain, 'height', 160); + await forValue(node.txindex, 'height', 160); + } finally { + if (wclient) + await wclient.close(); + + if (nclient) + await nclient.close(); + + if (node) + await node.close(); + } + + try { + // Now create a reorganization in the chain while + // the indexer is disabled. + node = new FullNode({ + prefix: prefix, + network: 'regtest', + apiKey: 'foo', + memory: false, + indexTX: false, + indexAddress: false, + port: ports.p2p, + httpPort: ports.node, + logLevel: 'none' + }); + + await node.ensure(); + await node.open(); + + nclient = new NodeClient({ + port: ports.node, + apiKey: 'foo', + timeout: 120000 + }); + + await nclient.open(); + + for (let i = 0; i < 10; i++) { + const hash = await nclient.execute('getbestblockhash'); + await nclient.execute('invalidateblock', [hash]); + } + + await forValue(node.chain, 'height', 150); + + const blocks = await nclient.execute( + 'generatetoaddress', [20, vectors[0].addr]); + + assert.equal(blocks.length, 20); + + await forValue(node.chain, 'height', 170); + } finally { + if (nclient) + await nclient.close(); + + if (node) + await node.close(); + } + + try { + // Now turn the indexer back on and check that it + // is able to disconnect blocks and add the new blocks. + node = new FullNode({ + prefix: prefix, + network: 'regtest', + apiKey: 'foo', + memory: false, + indexTX: true, + indexAddress: false, + port: ports.p2p, + httpPort: ports.node, + logLevel: 'none' + }); + + await node.ensure(); + await node.open(); + + assert(node.txindex); + assert.equal(node.txindex.height, 160); + + node.txindex.sync(); + + await forValue(node.txindex, 'height', 170, 5000); + } finally { + if (node) + await node.close(); + } + }); + + it('will reset indexes', async () => { + let node, nclient = null; + + try { + node = new FullNode({ + prefix: prefix, + network: 'regtest', + apiKey: 'foo', + memory: false, + indexTX: true, + indexAddress: false, + port: ports.p2p, + httpPort: ports.node, + logLevel: 'none' + }); + + await node.ensure(); + await node.open(); + + nclient = new NodeClient({ + port: ports.node, + apiKey: 'foo', + timeout: 120000 + }); + + await nclient.open(); + + const blocks = await nclient.execute( + 'generatetoaddress', [150, vectors[0].addr]); + + assert.equal(blocks.length, 150); + + await forValue(node.txindex, 'height', 150); + + await node.chain.reset(0); + + await forValue(node.txindex, 'height', 1); + } finally { + if (nclient) + await nclient.close(); + + if (node) + await node.close(); + } + }); }); }); diff --git a/test/util/common.js b/test/util/common.js index 3fd9f501..04760129 100644 --- a/test/util/common.js +++ b/test/util/common.js @@ -102,7 +102,7 @@ common.rimraf = async function(p) { return await fs.rimraf(p); }; -common.forValue = async function(obj, key, val, timeout = 60000) { +common.forValue = async function(obj, key, val, timeout = 30000) { assert(typeof obj === 'object'); assert(typeof key === 'string');