/*!
 * indexer.js - abstract interface for bcoin indexers
 * Copyright (c) 2018, the bcoin developers (MIT License).
 * https://github.com/bcoin-org/bcoin
 */

'use strict';

const assert = require('assert');
const path = require('path');
const fs = require('bfile');
const bio = require('bufio');
const EventEmitter = require('events');
const Logger = require('blgr');
const Network = require('../protocol/network');
const util = require('../utils/util');
const layout = require('./layout');
const CoinView = require('../coins/coinview');
const Block = require('../primitives/block');
const {ZERO_HASH} = require('../protocol/consensus');

/**
 * Indexer
 * The class which indexers inherit from and implement the
 * `indexBlock` and `unindexBlock` methods and database
 * and storage initialization for indexing blocks.
 * @alias module:indexer.Indexer
 * @extends EventEmitter
 * @abstract
 */

class Indexer extends EventEmitter {
  /**
   * Create an indexer.
   * @constructor
   * @param {String} module - Non-empty name of the index; used for
   * the logger context and (via IndexOptions) the storage location.
   * @param {Object} options
   */

  constructor(module, options) {
    super();

    assert(typeof module === 'string');
    assert(module.length > 0);

    this.options = new IndexOptions(module, options);

    this.network = this.options.network;
    this.logger = this.options.logger.context(`${module}indexer`);
    this.blocks = this.options.blocks;
    this.chain = this.options.chain;

    // The database is not created here; `open()` and the batch
    // helpers expect `this.db` to have been assigned beforehand.
    this.db = null;
    this.batch = null;

    // [event, listener] pairs attached to the chain by `bind()`,
    // kept so that `close()` can detach them again.
    this.bound = [];

    this.syncing = false;
    this.height = 0;
  }

  /**
   * Start a new batch write.
   * Only one batch may be in progress at a time.
   * @returns {Batch}
   */

  start() {
    assert(this.batch === null, 'Already started.');
    this.batch = this.db.batch();
    return this.batch;
  }

  /**
   * Put key and value to the current batch.
   * @param {String} key
   * @param {Buffer} value
   */

  put(key, value) {
    this.batch.put(key, value);
  }

  /**
   * Delete key from the current batch.
   * @param {String} key
   */

  del(key) {
    this.batch.del(key);
  }

  /**
   * Commit the current batch.
   * The batch reference is released whether or not the write
   * succeeds, so that a failed write does not leave the indexer
   * permanently in the 'Already started.' state.
   * @returns {Promise}
   */

  async commit() {
    try {
      await this.batch.write();
    } finally {
      this.batch = null;
    }
  }

  /**
   * Open the indexer, open the database,
   * initialize height, and bind to events.
   * @returns {Promise}
   */

  async open() {
    this.logger.info('Indexer is loading.');

    await this.ensure();
    await this.db.open();
    await this.db.verify(layout.V.encode(), 'index', 0);

    await this.verifyNetwork();

    // Initialize the indexed height.
    const data = await this.db.get(layout.R.encode());

    if (data)
      this.height = bio.readU32(data, 0);
    else
      await this.saveGenesis();

    // Bind to chain events.
    this.bind();
  }

  /**
   * Close the indexer: detach the chain listeners added by
   * `bind()` and wait for the database to close.
   * @returns {Promise}
   */

  async close() {
    // Detach first so that no chain event triggers a sync
    // against a database that is closing or closed, and so
    // that a later `open()` does not register duplicates.
    for (const [event, listener] of this.bound)
      this.chain.removeListener(event, listener);

    this.bound.length = 0;

    return this.db.close();
  }

  /**
   * Ensure prefix directory (prefix/index).
   * Skipped when the filesystem is unsupported or
   * when the index is configured as in-memory.
   * @returns {Promise}
   */

  async ensure() {
    if (fs.unsupported)
      return;

    if (this.options.memory)
      return;

    await fs.mkdirp(this.options.prefix);
  }

  /**
   * Verify network of index.
   * Stores the network magic on first use, otherwise
   * compares the stored magic with the configured network.
   * @returns {Promise}
   * @throws {Error} When the stored magic does not match.
   */

  async verifyNetwork() {
    let raw = await this.db.get(layout.O.encode());

    if (!raw) {
      raw = bio.write(4).writeU32(this.network.magic).render();
      await this.db.put(layout.O.encode(), raw);
      return;
    }

    const magic = bio.readU32(raw, 0);

    if (magic !== this.network.magic)
      throw new Error('Indexer: Network mismatch.');
  }

  /**
   * A special case for indexing the genesis block. The genesis
   * block coins are not spendable, however indexers can still index
   * the block for historical and informational purposes.
   * @private
   * @returns {Promise}
   */

  async saveGenesis() {
    this.start();

    try {
      const raw = Buffer.from(this.network.genesisBlock, 'hex');
      const block = Block.fromRaw(raw);
      const meta = new BlockMeta(block.hash(), 0);

      await this.indexBlock(meta, block, new CoinView());
      await this._setTip(meta);
      await this.commit();
    } catch (e) {
      // Drop the unwritten batch so a later attempt can start one.
      this.batch = null;
      throw e;
    }

    this.height = 0;
  }

  /**
   * Bind to chain events.
   * The same listener is attached to `connect`, `disconnect`
   * and `reset`; errors from syncing are re-emitted as `error`
   * events on the indexer.
   * @private
   */

  bind() {
    const listener = async (entry, block, view) => {
      const meta = new BlockMeta(entry.hash, entry.height);

      try {
        await this.sync(meta, block, view);
      } catch (e) {
        this.emit('error', e);
      }
    };

    for (const event of ['connect', 'disconnect', 'reset']) {
      this.bound.push([event, listener]);
      this.chain.on(event, listener);
    }
  }

  /**
   * Get a chain entry for the main chain only.
   * @private
   * @param {Hash|Number} hash - Passed through to `chain.getEntry`;
   * callers in this file use both a block hash and a height.
   * @returns {Promise} - Resolves to the entry, or null when it is
   * unknown or not on the main chain.
   */

  async getEntry(hash) {
    const entry = await this.chain.getEntry(hash);

    if (!entry)
      return null;

    if (!await this.chain.isMainChain(entry))
      return null;

    return entry;
  }

  /**
   * Get the index block meta stored for a height.
   * @param {Number} height
   * @returns {Promise} - Resolves to a BlockMeta, or null
   * when nothing is stored for the height.
   */

  async getBlockMeta(height) {
    const data = await this.db.get(layout.h.encode(height));

    if (!data)
      return null;

    return new BlockMeta(data, height);
  }

  /**
   * Sync with the chain.
   * Tries the fast path with the given block first. When that
   * does not apply, a full chain sync is started in the background
   * (not awaited); its errors are emitted as `error` events.
   * Calls made while a sync is in progress return immediately.
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async sync(meta, block, view) {
    if (this.syncing)
      return;

    this.syncing = true;

    let connected;

    try {
      connected = await this._syncBlock(meta, block, view);
    } catch (e) {
      // Without this reset a single failure would
      // block every subsequent sync attempt.
      this.syncing = false;
      throw e;
    }

    if (connected) {
      this.syncing = false;
      return;
    }

    (async () => {
      try {
        await this._syncChain();
      } catch (e) {
        this.emit('error', e);
      } finally {
        this.syncing = false;
      }
    })();
  }

  /**
   * Sync with the chain with a block.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise} - Resolves to true when the block was
   * added or removed directly, false otherwise.
   */

  async _syncBlock(meta, block, view) {
    // In the case that the next block is being
    // connected or the current block disconnected
    // use the block and view being passed directly,
    // instead of reading that information again.
    if (meta && block && view) {
      if (meta.height === this.height + 1) {
        // Make sure that the block is connected to
        // the indexer chain.
        const prev = await this.getBlockMeta(this.height);
        assert(prev, 'Indexer: Missing block meta for current height.');

        if (prev.hash.compare(block.prevBlock) !== 0)
          return false;

        await this._addBlock(meta, block, view);
        return true;
      } else if (meta.height === this.height) {
        // Make sure that this is the current block.
        const current = await this.getBlockMeta(this.height);
        assert(current, 'Indexer: Missing block meta for current height.');

        if (current.hash.compare(block.hash()) !== 0)
          return false;

        await this._removeBlock(meta, block, view);
        return true;
      }
    }

    return false;
  }

  /**
   * Sync with the chain.
   * @private
   * @returns {Promise}
   */

  async _syncChain() {
    let height = this.height;

    // In the case that the indexer has never
    // started, sync to the best height.
    if (!height) {
      await this._rollforward();
      return;
    }

    // Check for a re-org that might
    // leave chain in a different state.
    // Scan chain backwards until we
    // find a common height.
    while (height > 0) {
      const meta = await this.getBlockMeta(height);
      assert(meta);

      if (await this.getEntry(meta.hash))
        break;

      height -= 1;
    }

    if (height < this.height)
      await this._rollback(height);

    await this._rollforward();
  }

  /**
   * Scan blockchain to the best chain height.
   * Adds blocks one height at a time until the chain
   * has no main-chain entry for the next height.
   * @private
   * @returns {Promise}
   */

  async _rollforward() {
    this.logger.info('Indexing to best height from height (%d).', this.height);

    for (let height = this.height + 1; ; height++) {
      const entry = await this.getEntry(height);

      if (!entry)
        break;

      const meta = new BlockMeta(entry.hash, height);

      const block = await this.chain.getBlock(entry.hash);
      assert(block);

      const view = await this.chain.getBlockView(block);
      assert(view);

      await this._addBlock(meta, block, view);
    }
  }

  /**
   * Rollback to a given chain height.
   * Note that the loop below never removes the block at
   * height 1, even when a lower target height is given.
   * @param {Number} height
   * @returns {Promise}
   */

  async _rollback(height) {
    if (height > this.height) {
      this.logger.warning(
        'Ignoring rollback to future height (%d).',
        height);
      return;
    }

    this.logger.info('Rolling back to height %d.', height);

    while (this.height > height && this.height > 1) {
      const meta = await this.getBlockMeta(this.height);
      assert(meta);

      const block = await this.chain.getBlock(meta.hash);
      assert(block);

      const view = await this.chain.getBlockView(block);
      assert(view);

      await this._removeBlock(meta, block, view);
    }
  }

  /**
   * Add a block's transactions without a lock.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   * @throws {Error} When the block is not at `this.height + 1`.
   */

  async _addBlock(meta, block, view) {
    assert(block.hasRaw(), 'Expected raw data for block.');

    const start = util.bench();

    if (meta.height !== this.height + 1)
      throw new Error('Indexer: Can not add block.');

    // Start the batch write.
    this.start();

    let height;

    try {
      // Call the implemented indexer to add to
      // the batch write.
      await this.indexBlock(meta, block, view);

      // Sync the height to the new tip.
      height = await this._setTip(meta);

      // Commit the write batch to disk.
      await this.commit();
    } catch (e) {
      // Drop the unwritten batch so the next block can start one.
      this.batch = null;
      throw e;
    }

    // Update height _after_ successful commit.
    this.height = height;

    // Log the current indexer status.
    this.logStatus(start, block, meta);
  }

  /**
   * Process block indexing
   * Indexers will implement this method to process the block for indexing
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async indexBlock(meta, block, view) {
    ;
  }

  /**
   * Undo block indexing
   * Indexers will implement this method to undo indexing for the block
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async unindexBlock(meta, block, view) {
    ;
  }

  /**
   * Unconfirm a block's transactions.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   * @throws {Error} When the block is not at `this.height`.
   */

  async _removeBlock(meta, block, view) {
    const start = util.bench();

    if (meta.height !== this.height)
      throw new Error('Indexer: Can not remove block.');

    // Start the batch write.
    this.start();

    let height;

    try {
      // Call the implemented indexer to add to
      // the batch write.
      await this.unindexBlock(meta, block, view);

      const prev = await this.getBlockMeta(meta.height - 1);
      assert(prev);

      // Sync the height to the previous tip.
      height = await this._setTip(prev);

      // Commit the write batch to disk.
      await this.commit();
    } catch (e) {
      // Drop the unwritten batch so the next block can start one.
      this.batch = null;
      throw e;
    }

    // Update height _after_ successful commit.
    this.height = height;

    // Log the current indexer status.
    this.logStatus(start, block, meta, true);
  }

  /**
   * Update the current height to tip.
   * Only queues writes on the current batch; `this.height`
   * itself is left for the caller to update after the commit.
   * @param {BlockMeta} meta - The new tip; must be at most one
   * height away from the current height.
   * @returns {Promise} - Resolves to the new tip height.
   */

  async _setTip(meta) {
    if (meta.height < this.height) {
      assert(meta.height === this.height - 1);
      this.del(layout.h.encode(this.height));
    } else if (meta.height > this.height) {
      assert(meta.height === this.height + 1);
    }

    // Add to batch write to save tip and height.
    this.put(layout.h.encode(meta.height), meta.hash);

    const raw = bio.write(4).writeU32(meta.height).render();
    this.put(layout.R.encode(), raw);

    return meta.height;
  }

  /**
   * Test whether the indexer has reached its slow height.
   * Also true at height 1 and at every 20th height;
   * `logStatus` uses this to decide when to log.
   * @private
   * @returns {Boolean}
   */

  isSlow() {
    if (this.height === 1 || this.height % 20 === 0)
      return true;

    if (this.height >= this.network.block.slowHeight)
      return true;

    return false;
  }

  /**
   * Log the current indexer status.
   * @private
   * @param {Array} start
   * @param {Block} block
   * @param {BlockMeta} meta
   * @param {Boolean} reverse - True when the block was removed.
   */

  logStatus(start, block, meta, reverse) {
    if (!this.isSlow())
      return;

    const elapsed = util.bench(start);

    const msg = reverse ? 'removed from' : 'added to';

    this.logger.info(
      'Block (%d) %s indexer (txs=%d time=%d).',
      meta.height,
      msg,
      block.txs.length,
      elapsed);
  }
}

/**
 * Block Meta
 * A block hash paired with its height.
 */

class BlockMeta {
  /**
   * Create a block meta.
   * @constructor
   * @param {Hash} [hash] - 32 byte buffer; falls back to ZERO_HASH.
   * @param {Number} [height] - Integer height; falls back to 0.
   */

  constructor(hash, height) {
    this.hash = hash || ZERO_HASH;
    this.height = height || 0;

    assert(Buffer.isBuffer(this.hash) && this.hash.length === 32);
    assert(Number.isInteger(this.height));
  }
}

/**
 * Index Options
 */

class IndexOptions {
  /**
   * Create index options.
   * @constructor
   * @param {String} module
   * @param {Object} options
   */

  constructor(module, options) {
    this.module = module;
    this.network = Network.primary;
    this.logger = Logger.global;
    this.blocks = null;
    this.chain = null;
    this.prefix = null;
    this.location = null;
    this.memory = true;
    this.maxFiles = 64;
    this.cacheSize = 16 << 20;
    this.compression = true;

    if (options)
      this.fromOptions(options);
  }

  /**
   * Inject properties from object.
   * `blocks` and `chain` are required, and `prune` must not be set.
   * An explicit `location` overrides the one derived from `prefix`.
   * @private
   * @param {Object} options
   * @returns {IndexOptions}
   */

  fromOptions(options) {
    assert(options.blocks && typeof options.blocks === 'object',
      'Indexer requires a blockstore.');
    assert(options.chain && typeof options.chain === 'object',
      'Indexer requires chain.');
    assert(!options.prune, 'Can not index while pruned.');

    this.blocks = options.blocks;
    this.chain = options.chain;

    if (options.network != null)
      this.network = Network.get(options.network);

    if (options.logger != null) {
      assert(typeof options.logger === 'object');
      this.logger = options.logger;
    }

    if (options.prefix != null) {
      assert(typeof options.prefix === 'string');
      this.prefix = options.prefix;
      this.prefix = path.join(this.prefix, 'index');
      this.location = path.join(this.prefix, this.module);
    }

    if (options.location != null) {
      assert(typeof options.location === 'string');
      this.location = options.location;
    }

    if (options.memory != null) {
      assert(typeof options.memory === 'boolean');
      this.memory = options.memory;
    }

    if (options.maxFiles != null) {
      assert((options.maxFiles >>> 0) === options.maxFiles);
      this.maxFiles = options.maxFiles;
    }

    if (options.cacheSize != null) {
      assert(Number.isSafeInteger(options.cacheSize)
        && options.cacheSize >= 0);
      this.cacheSize = options.cacheSize;
    }

    if (options.compression != null) {
      assert(typeof options.compression === 'boolean');
      this.compression = options.compression;
    }

    return this;
  }

  /**
   * Instantiate indexer options from object.
   * Note: no module name is passed to the constructor here,
   * so `this.module` is undefined on the resulting instance.
   * @param {Object} options
   * @returns {IndexOptions}
   */

  static fromOptions(options) {
    return new this().fromOptions(options);
  }
}

/*
 * Expose
 */

module.exports = Indexer;