indexer: fix, simplify and rewrite indexer base

- Write indexer state and index atomically.
- Simplify chain state with current height.
- Roll forward to best height.
- Synchronize the index with the chain with common method `sync` for
  the events 'connect', 'disconnect' and 'reset'. This will prevent
  any of the events from conflicting with each other.
- Fix the chain rollback and reset.
- Make sure blocks are connected in the correct order.
- Improve logging to log similar to chain.
This commit is contained in:
Braydon Fuller 2019-04-05 17:11:18 -07:00
parent e2a6a92ebc
commit 93c6ff845e
No known key found for this signature in database
GPG Key ID: F24F232D108B3AD4
6 changed files with 338 additions and 525 deletions

View File

@ -127,7 +127,6 @@ class AddrIndexer extends Indexer {
*/
async indexBlock(entry, block, view) {
const b = this.db.batch();
const height = entry.height;
for (let i = 0; i < block.txs.length; i++) {
@ -136,15 +135,17 @@ class AddrIndexer extends Indexer {
for (const addr of tx.getAddresses(view)) {
const prefix = addr.getPrefix();
if (prefix < 0)
continue;
const addrHash = addr.getHash();
const count = new Count(height, i);
b.put(layout.A.encode(prefix, addrHash, height, i, hash), null);
b.put(layout.a.encode(prefix, addrHash, hash), count.toRaw());
this.put(layout.A.encode(prefix, addrHash, height, i, hash), null);
this.put(layout.a.encode(prefix, addrHash, hash), count.toRaw());
}
}
return b.write();
}
/**
@ -156,7 +157,6 @@ class AddrIndexer extends Indexer {
*/
async unindexBlock(entry, block, view) {
const b = this.db.batch();
const height = entry.height;
for (let i = 0; i < block.txs.length; i++) {
@ -165,13 +165,15 @@ class AddrIndexer extends Indexer {
for (const addr of tx.getAddresses(view)) {
const prefix = addr.getPrefix();
if (prefix < 0)
continue;
const addrHash = addr.getHash();
b.del(layout.A.encode(prefix, addrHash, height, i, hash));
b.del(layout.a.encode(prefix, addrHash, hash));
this.del(layout.A.encode(prefix, addrHash, height, i, hash));
this.del(layout.a.encode(prefix, addrHash, hash));
}
}
return b.write();
}
/**

View File

@ -9,17 +9,13 @@
const assert = require('assert');
const path = require('path');
const fs = require('bfile');
const bio = require('bufio');
const EventEmitter = require('events');
const {Lock} = require('bmutex');
const Logger = require('blgr');
const Network = require('../protocol/network');
const util = require('../utils/util');
const layout = require('./layout');
const records = require('./records');
const {
ChainState,
BlockMeta
} = records;
const {BlockMeta} = require('./records');
/**
* Indexer
@ -27,13 +23,12 @@ const {
* @extends EventEmitter
* @property {IndexerDB} db
* @property {Number} height
* @property {ChainState} state
* @emits Indexer#chain tip
*/
class Indexer extends EventEmitter {
/**
* Create a index db.
* Create an indexer.
* @constructor
* @param {String} module
* @param {Object} options
@ -53,103 +48,72 @@ class Indexer extends EventEmitter {
this.chain = this.options.chain;
this.db = null;
this.rescanning = false;
this.state = new ChainState();
this.batch = null;
this.syncing = false;
this.height = 0;
this.lock = new Lock();
}
/**
* Bind to chain events.
* @private
* Start a new batch write.
* @returns {Batch}
*/
bind() {
this.chain.on('connect', async (entry, block, view) => {
if (this.rescanning)
return;
try {
await this.addBlock(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
this.chain.on('disconnect', async (entry, block, view) => {
if (this.rescanning)
return;
try {
await this.removeBlock(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
this.chain.on('reset', async (tip) => {
try {
await this.resetChain(tip);
} catch (e) {
this.emit('error', e);
}
});
start() {
assert(this.batch === null, 'Already started.');
this.batch = this.db.batch();
return this.batch;
}
/**
* Ensure prefix directory (prefix/index).
* Put key and value to the current batch.
* @param {String} key
* @param {Buffer} value
*/
put(key, value) {
this.batch.put(key, value);
}
/**
* Delete key from the current batch.
* @param {String} key
*/
del(key) {
this.batch.del(key);
}
/**
* Commit the current batch.
* @returns {Promise}
*/
async ensure() {
if (fs.unsupported)
return undefined;
if (this.options.memory)
return undefined;
return fs.mkdirp(this.options.prefix);
async commit() {
await this.batch.write();
this.batch = null;
}
/**
* Open the indexdb, wait for the database to load.
* Open the indexer, open the database,
* initialize height, and bind to events.
* @returns {Promise}
*/
async open() {
this.logger.info('Indexer is loading.');
await this.ensure();
await this.db.open();
await this.db.verify(layout.V.encode(), 'index', 0);
await this.verifyNetwork();
// Initialize the indexed height.
const data = await this.db.get(layout.R.encode());
if (data)
this.height = bio.readU32(data, 0);
// Bind to chain events.
this.bind();
await this.sync();
}
/**
* Verify network.
* @returns {Promise}
*/
async verifyNetwork() {
const raw = await this.db.get(layout.O.encode());
if (!raw) {
const b = this.db.batch();
b.put(layout.O.encode(), fromU32(this.network.magic));
return b.write();
}
const magic = raw.readUInt32LE(0, true);
if (magic !== this.network.magic)
throw new Error('Network mismatch for Indexer.');
return undefined;
}
/**
@ -162,68 +126,69 @@ class Indexer extends EventEmitter {
}
/**
* Sync state with server on every connect.
* Ensure prefix directory (prefix/index).
* @returns {Promise}
*/
async sync() {
const unlock = await this.lock.lock();
try {
this.logger.info('Resyncing from server...');
await this.syncState();
await this.syncChain();
} finally {
unlock();
}
async ensure() {
if (fs.unsupported)
return;
if (this.options.memory)
return;
await fs.mkdirp(this.options.prefix);
}
/**
* Initialize and write initial sync state.
* Verify network of index.
* @returns {Promise}
*/
async syncState() {
const cache = await this.getState();
async verifyNetwork() {
let raw = await this.db.get(layout.O.encode());
if (cache) {
this.state = cache;
this.height = cache.height;
this.logger.info(
'Indexer loaded (height=%d, start=%d).',
this.state.height,
this.state.startHeight);
return undefined;
if (!raw) {
raw = bio.write(4).writeU32(this.network.magic).render();
await this.db.put(layout.O.encode(), raw);
return;
}
this.logger.info('Initializing database state from server.');
const magic = bio.readU32(raw, 0);
const b = this.db.batch();
const hashes = await this.chain.getHashes();
if (magic !== this.network.magic)
throw new Error('Indexer: Network mismatch.');
}
let tip = null;
/**
* Bind to chain events.
* @private
*/
for (let height = 0; height < hashes.length; height++) {
const hash = hashes[height];
const meta = new BlockMeta(hash, height);
b.put(layout.h.encode(height), meta.toHash());
tip = meta;
}
bind() {
this.chain.on('connect', async (entry, block, view) => {
try {
await this.sync(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
assert(tip);
this.chain.on('disconnect', async (entry, block, view) => {
try {
await this.sync(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
const state = this.state.clone();
state.startHeight = 0;
state.height = tip.height;
b.put(layout.R.encode(), state.toRaw());
await b.write();
this.state = state;
this.height = state.height;
return undefined;
this.chain.on('reset', async (tip) => {
try {
await this.sync(tip);
} catch (e) {
this.emit('error', e);
}
});
}
/**
@ -244,179 +209,13 @@ class Indexer extends EventEmitter {
return entry;
}
/**
* Connect and sync with the chain server.
* @private
* @returns {Promise}
*/
async syncChain() {
let height = this.state.height;
this.logger.info('Syncing state from height %d.', height);
// A re-org when we're offline might
// leave chain in a different state.
// Scan chain backwards until we
// find a known 'good' height.
for (;;) {
const tip = await this.getBlock(height);
assert(tip);
if (await this.getEntry(tip.hash))
break;
assert(height !== 0);
height -= 1;
}
// Start scan from last indexed OR
// last known 'good' height whichever
// is lower, because `scan` scans from
// low to high blocks
if (this.state.startHeight < height)
height = this.state.startHeight;
return this._rescan(height);
}
/**
* Rescan a block.
* @private
* @param {ChainEntry} entry
* @param {TX[]} txs
* @returns {Promise}
*/
async rescanBlock(entry, block, view) {
this.logger.spam('Rescanning block: %d.', entry.height);
if (!this.rescanning) {
this.logger.warning('Unsolicited rescan block: %d.', entry.height);
return;
}
if (entry.height % 1000 === 0)
this.logger.debug('Rescanned block: %d.', entry.height);
if (entry.height > this.state.height + 1) {
this.logger.warning('Rescan block too high: %d.', entry.height);
return;
}
try {
await this._addBlock(entry, block, view);
} catch (e) {
this.emit('error', e);
throw e;
}
}
/**
* Force a rescan.
* @param {Number} height
* @returns {Promise}
*/
async rescan(height) {
const unlock = await this.lock.lock();
try {
return await this._rescan(height);
} finally {
unlock();
}
}
/**
* Rescan blockchain from a given height.
* @private
* @param {Number} height
* @returns {Promise}
*/
async _rescan(height) {
assert((height >>> 0) === height, 'Must pass in a height.');
await this.rollback(height);
const tip = this.state.height;
this.logger.debug('Rescanning from %d to %d', height, tip);
this.rescanning = true;
for (let i = height; ; i++) {
const entry = await this.chain.getEntry(i);
if (!entry)
break;
const block = await this.chain.getBlock(entry.hash);
assert(block);
const view = await this.chain.getBlockView(block);
assert(view);
await this.rescanBlock(entry, block, view);
}
this.rescanning = false;
}
/**
* Get the best block hash.
* @returns {Promise}
*/
async getState() {
const data = await this.db.get(layout.R.encode());
if (!data)
return null;
return ChainState.fromRaw(data);
}
/**
* Sync the current chain state to tip.
* @param {BlockMeta} tip
* @returns {Promise}
*/
async setTip(tip) {
const b = this.db.batch();
const state = this.state.clone();
if (tip.height < state.height) {
// Hashes ahead of our new tip
// that we need to delete.
while (state.height !== tip.height) {
b.del(layout.h.encode(state.height));
state.height -= 1;
}
} else if (tip.height > state.height) {
assert(tip.height === state.height + 1, 'Bad chain sync.');
state.height += 1;
}
state.startHeight = tip.height;
// Save tip and state.
b.put(layout.h.encode(tip.height), tip.toHash());
b.put(layout.R.encode(), state.toRaw());
await b.write();
this.state = state;
this.height = state.height;
}
/**
* Get a index block meta.
* @param {Hash} hash
* @returns {Promise}
*/
async getBlock(height) {
async getBlockMeta(height) {
const data = await this.db.get(layout.h.encode(height));
if (!data)
@ -430,59 +229,157 @@ class Indexer extends EventEmitter {
}
/**
* Get index tip.
* @param {Hash} hash
* Sync with the chain.
* @param {ChainEntry} entry
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async getTip() {
const tip = await this.getBlock(this.state.height);
async sync(entry, block, view) {
if (this.syncing)
return;
if (!tip)
throw new Error('Indexer: Tip not found!');
this.syncing = true;
return tip;
const connected = await this._syncBlock(entry, block, view);
if (connected) {
this.syncing = false;
} else {
(async () => {
await this._syncChain(entry);
this.syncing = false;
})();
}
}
/**
* Sync with chain height.
* Sync with the chain with a block.
* @private
* @param {ChainEntry} entry
* @param {Block} block
* @param {CoinView} view
* @returns {Promise}
*/
async _syncBlock(entry, block, view) {
// In the case that the next block is being
// connected or the current block disconnected
// use the block and view being passed directly,
// instead of reading that information again.
if (entry && block && view) {
if (entry.height === this.height + 1) {
await this._addBlock(entry, block, view);
return true;
} else if (entry.height === this.height) {
await this._removeBlock(entry, block, view);
return true;
}
}
return false;
}
/**
* Sync with the chain.
* @private
* @param {ChainEntry} entry
* @returns {Promise}
*/
async _syncChain(entry) {
let height = this.height;
// In the case that the indexer has never
// started, sync to the best height.
if (!height) {
await this._rollforward();
return;
}
// Check for a re-org that might
// leave chain in a different state.
// Scan chain backwards until we
// find a common height.
for (;;) {
const tip = await this.getBlockMeta(height);
assert(tip);
if (await this.getEntry(tip.hash))
break;
assert(height !== 0);
height -= 1;
}
// In the case that the chain is reset
// the entry will be less than the
// current height.
if (entry && entry.height < height)
height = entry.height;
if (height < this.height) {
await this._rollback(height);
await this._rollforward();
} else {
await this._rollforward();
}
}
/**
* Scan blockchain to the best chain height.
* @private
* @returns {Promise}
*/
async _rollforward() {
this.logger.info('Indexing to best height.');
for (let i = this.height + 1; ; i++) {
const entry = await this.getEntry(i);
if (!entry)
break;
const block = await this.chain.getBlock(entry.hash);
assert(block);
const view = await this.chain.getBlockView(block);
assert(view);
await this._addBlock(entry, block, view);
}
}
/**
* Rollback to a given chain height.
* @param {Number} height
* @returns {Promise}
*/
async rollback(height) {
if (height > this.state.height)
throw new Error('Indexer: Cannot rollback to the future.');
if (height === this.state.height) {
this.logger.info('Rolled back to same height (%d).', height);
async _rollback(height) {
if (height > this.height) {
this.logger.warning(
'Ignoring rollback to future height (%d).',
height);
return;
}
this.logger.info(
'Rolling back %d Indexer blocks to height %d.',
this.state.height - height, height);
this.logger.info('Rolling back to height %d.', height);
const tip = await this.getBlock(height);
assert(tip);
while (this.height > height) {
const tip = await this.getBlockMeta(this.height);
assert(tip);
await this.revert(tip.height);
await this.setTip(tip);
}
const entry = await this.chain.getEntry(tip.hash);
assert(entry);
/**
* Add a block's transactions and write the new best hash.
* @param {ChainEntry} entry
* @param {Block} block
* @returns {Promise}
*/
const block = await this.chain.getBlock(entry.hash);
assert(block);
async addBlock(entry, block, view) {
const unlock = await this.lock.lock();
try {
return await this._addBlock(entry, block, view);
} finally {
unlock();
const view = await this.chain.getBlockView(block);
assert(view);
await this._removeBlock(entry, block, view);
}
}
@ -497,34 +394,31 @@ class Indexer extends EventEmitter {
async _addBlock(entry, block, view) {
assert(block.hasRaw(), 'Expected raw data for block.');
const start = util.bench();
if (entry.height !== this.height + 1)
throw new Error('Indexer: Can not add block.');
const tip = BlockMeta.fromEntry(entry);
if (tip.height >= this.network.block.slowHeight && !this.rescanning)
this.logger.debug('Adding block: %d.', tip.height);
this.logger.spam('Adding block: %d.', entry.height);
if (tip.height === this.state.height) {
// We let blocks of the same height
// through specifically for rescans:
// we always want to rescan the last
// block since the state may have
// updated before the block was fully
// processed (in the case of a crash).
this.logger.warning('Already saw Indexer block (%d).', tip.height);
} else if (tip.height !== this.state.startHeight + 1) {
await this._rescan(this.state.height);
return;
}
this.logger.spam('Indexing block: %d.', entry.height);
// Start the batch write.
this.start();
// Call the implemented indexer to add to
// the batch write.
await this.indexBlock(entry, block, view);
// Sync the state to the new tip.
await this.setTip(tip);
// Sync the height to the new tip.
const height = await this._setTip(tip);
return;
// Commit the write batch to disk.
await this.commit();
// Update height _after_ successful commit.
this.height = height;
// Log the current indexer status.
this.logStatus(start, block, entry);
}
/**
@ -551,32 +445,6 @@ class Indexer extends EventEmitter {
;
}
/**
* Revert db to an older state.
* @param {Number} target
* @returns {Promise}
*/
async revert(target) {
;
}
/**
* Unconfirm a block's transactions
* and write the new best hash (SPV version).
* @param {ChainEntry} entry
* @returns {Promise}
*/
async removeBlock(entry, block, view) {
const unlock = await this.lock.lock();
try {
return await this._removeBlock(entry, block, view);
} finally {
unlock();
}
}
/**
* Unconfirm a block's transactions.
* @private
@ -585,63 +453,94 @@ class Indexer extends EventEmitter {
*/
async _removeBlock(entry, block, view) {
const start = util.bench();
if (entry.height !== this.height)
throw new Error('Indexer: Can not remove block.');
const tip = BlockMeta.fromEntry(entry);
this.logger.spam('Removing block: %d.', entry.height);
if (tip.height === 0)
throw new Error('Indexer: Bad disconnection (genesis block).');
if (tip.height > this.state.height) {
this.logger.warning(
'Indexer is disconnecting high blocks (%d).',
tip.height);
return;
}
if (tip.height !== this.state.height)
throw new Error('Indexer: Bad disconnection (height mismatch).');
this.logger.spam('Unindexing block: %d.', entry.height);
// Start the batch write.
this.start();
// Call the implemented indexer to add to
// the batch write.
await this.unindexBlock(entry, block, view);
const prev = await this.getBlock(tip.height - 1);
const prev = await this.getBlockMeta(tip.height - 1);
assert(prev);
// Sync the state to the previous tip.
await this.setTip(prev);
// Sync the height to the previous tip.
const height = await this._setTip(prev);
return;
// Commit the write batch to disk.
await this.commit();
// Update height _after_ successful commit.
this.height = height;
// Log the current indexer status.
this.logStatus(start, block, entry);
}
/**
* Handle a chain reset.
* @param {ChainEntry} entry
* Update the current height to tip.
* @param {BlockMeta} tip
* @returns {Promise}
*/
async resetChain(entry) {
const unlock = await this.lock.lock();
try {
return await this._resetChain(entry);
} finally {
unlock();
async _setTip(tip) {
if (tip.height < this.height) {
assert(tip.height === this.height - 1);
this.del(layout.h.encode(this.height));
} else if (tip.height > this.height) {
assert(tip.height === this.height + 1);
}
// Add to batch write to save tip and height.
this.put(layout.h.encode(tip.height), tip.toHash());
const raw = bio.write(4).writeU32(tip.height).render();
this.put(layout.R.encode(), raw);
return tip.height;
}
/**
* Handle a chain reset without a lock.
* Test whether the indexer has reached its slow height.
* @private
* @param {ChainEntry} entry
* @returns {Promise}
* @returns {Boolean}
*/
async _resetChain(entry) {
if (entry.height > this.state.height)
throw new Error('Indexer: Bad reset height.');
isSlow() {
if (this.height === 1 || this.height % 20 === 0)
return true;
return this.rollback(entry.height);
if (this.height >= this.network.block.slowHeight)
return true;
return false;
}
/**
* Log the current indexer status.
* @private
* @param {Array} start
* @param {Block} block
* @param {ChainEntry} entry
*/
logStatus(start, block, entry) {
if (!this.isSlow())
return;
const elapsed = util.bench(start);
this.logger.info(
'Block (%d) added to indexer (txs=%d time=%d).',
entry.height,
block.txs.length,
elapsed);
}
}
@ -664,7 +563,6 @@ class IndexOptions {
this.logger = Logger.global;
this.blocks = null;
this.chain = null;
this.indexers = null;
this.prefix = null;
this.location = null;
@ -737,7 +635,7 @@ class IndexOptions {
}
/**
* Instantiate chain options from object.
* Instantiate indexer options from object.
* @param {Object} options
* @returns {IndexOptions}
*/
@ -747,22 +645,6 @@ class IndexOptions {
}
}
/*
* Helpers
*/
/**
* fromU32
* read a 4 byte Uint32LE
* @param {Number} num number
* @returns {Buffer} buffer
*/
function fromU32(num) {
const data = Buffer.allocUnsafe(4);
data.writeUInt32LE(num, 0, true);
return data;
}
/*
* Expose
*/

View File

@ -14,74 +14,6 @@ const bio = require('bufio');
const util = require('../utils/util');
const consensus = require('../protocol/consensus');
/**
* Chain State
* @alias module:indexer.ChainState
*/
class ChainState {
/**
* Create a chain state.
* @constructor
*/
constructor() {
this.startHeight = 0;
this.height = 0;
}
/**
* Clone the state.
* @returns {ChainState}
*/
clone() {
const state = new ChainState();
state.startHeight = this.startHeight;
state.height = this.height;
return state;
}
/**
* Inject properties from serialized data.
* @private
* @param {Buffer} data
*/
fromRaw(data) {
const br = bio.read(data);
this.startHeight = br.readU32();
this.height = br.readU32();
return this;
}
/**
* Instantiate chain state from serialized data.
* @param {Buffer} data
* @returns {ChainState}
*/
static fromRaw(data) {
return new this().fromRaw(data);
}
/**
* Serialize the chain state.
* @returns {Buffer}
*/
toRaw() {
const bw = bio.write(8);
bw.writeU32(this.startHeight);
bw.writeU32(this.height);
return bw.render();
}
}
/**
* Block Meta
* @alias module:indexer.BlockMeta
@ -215,7 +147,6 @@ class BlockMeta {
* Expose
*/
exports.ChainState = ChainState;
exports.BlockMeta = BlockMeta;
module.exports = exports;

View File

@ -129,8 +129,6 @@ class TXIndexer extends Indexer {
*/
async indexBlock(entry, block, view) {
const b = this.db.batch();
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];
@ -146,10 +144,8 @@ class TXIndexer extends Indexer {
length: size
});
b.put(layout.t.encode(hash), txrecord.toRaw());
this.put(layout.t.encode(hash), txrecord.toRaw());
}
return b.write();
}
/**
@ -161,15 +157,11 @@ class TXIndexer extends Indexer {
*/
async unindexBlock(entry, block, view) {
const b = this.db.batch();
for (let i = 0; i < block.txs.length; i++) {
const tx = block.txs[i];
const hash = tx.hash();
b.del(layout.t.encode(hash));
this.del(layout.t.encode(hash));
}
return b.write();
}
/**

View File

@ -410,6 +410,12 @@ class FullNode extends Node {
*/
startSync() {
if (this.txindex)
this.txindex.sync();
if (this.addrindex)
this.addrindex.sync();
return this.pool.startSync();
}

View File

@ -90,8 +90,8 @@ describe('Indexer', function() {
}
assert.strictEqual(chain.height, 10);
assert.strictEqual(txindexer.state.startHeight, 10);
assert.strictEqual(addrindexer.state.startHeight, 10);
assert.strictEqual(txindexer.height, 10);
assert.strictEqual(addrindexer.height, 10);
});
it('should get txs by address', async () => {
@ -172,8 +172,8 @@ describe('Indexer', function() {
}
assert.strictEqual(chain.height, 20);
assert.strictEqual(txindexer.state.startHeight, 20);
assert.strictEqual(addrindexer.state.startHeight, 20);
assert.strictEqual(txindexer.height, 20);
assert.strictEqual(addrindexer.height, 20);
const hashes = await addrindexer.getHashesByAddress(miner.getAddress());
assert.strictEqual(hashes.length, 20);
@ -187,8 +187,8 @@ describe('Indexer', function() {
it('should handle indexing a reorg', async () => {
await reorg(chain, cpu, 10);
assert.strictEqual(txindexer.state.startHeight, 31);
assert.strictEqual(addrindexer.state.startHeight, 31);
assert.strictEqual(txindexer.height, 31);
assert.strictEqual(addrindexer.height, 31);
const hashes = await addrindexer.getHashesByAddress(miner.getAddress());
assert.strictEqual(hashes.length, 31);
@ -269,9 +269,9 @@ describe('Indexer', function() {
'getnewaddress', ['default']);
const blocks = await nclient.execute(
'generatetoaddress', [120, coinbase]);
'generatetoaddress', [150, coinbase]);
assert.equal(blocks.length, 120);
assert.equal(blocks.length, 150);
// Send to the vector addresses for several blocks.
for (let i = 0; i < 10; i++) {