indexer: use chain and blocks for indexer, remove chain client

This commit is contained in:
Braydon Fuller 2019-03-22 16:01:51 -07:00
parent f9aab08c46
commit 8bff122253
No known key found for this signature in database
GPG Key ID: F24F232D108B3AD4
10 changed files with 96 additions and 489 deletions

View File

@ -58,7 +58,6 @@ bcoin.Mnemonic = require('./hd/mnemonic');
// Index
bcoin.indexer = require('./indexer');
bcoin.Indexer = require('./indexer/indexer');
bcoin.ChainClient = require('./indexer/chainclient');
bcoin.TXIndexer = require('./indexer/txindexer');
bcoin.AddrIndexer = require('./indexer/addrindexer');

View File

@ -79,7 +79,6 @@ bcoin.define('Mnemonic', './hd/mnemonic');
// Index
bcoin.define('indexer', './indexer');
bcoin.define('Indexer', './indexer/indexer');
bcoin.define('ChainClient', './indexer/chainclient');
bcoin.define('TXIndexer', './indexer/txindexer');
bcoin.define('AddrIndexer', './indexer/addrindexer');

View File

@ -159,7 +159,7 @@ class AddrIndexer extends Indexer {
});
for (const [hash, index] of keys) {
const coin = await this.client.getCoin(hash, index);
const coin = await this.chain.getCoin(hash, index);
assert(coin);
coins.push(coin);
}

View File

@ -1,200 +0,0 @@
/*!
* chainclient.js - chain client for bcoin
* Copyright (c) 2018, the bcoin developers (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const assert = require('assert');
const AsyncEmitter = require('bevent');
const Chain = require('../blockchain/chain');
/**
* Chain Client
* @extends AsyncEmitter
* @alias module:indexer.ChainClient
*/
class ChainClient extends AsyncEmitter {
/**
* Create a chain client.
* @constructor
* @param {Chain} chain
*/
constructor(chain) {
super();
assert(chain instanceof Chain);
this.chain = chain;
this.network = chain.network;
this.opened = false;
this.init();
}
/**
* Initialize the client.
*/
init() {
this.chain.on('connect', async (entry, block, view) => {
if (!this.opened)
return;
await this.emitAsync('block connect', entry, block, view);
});
this.chain.on('disconnect', async (entry, block, view) => {
if (!this.opened)
return;
await this.emitAsync('block disconnect', entry, block, view);
});
this.chain.on('reset', async (tip) => {
if (!this.opened)
return;
await this.emitAsync('chain reset', tip);
});
}
/**
* Open the client.
* @returns {Promise}
*/
async open(options) {
assert(!this.opened, 'ChainClient is already open.');
this.opened = true;
setImmediate(() => this.emit('connect'));
}
/**
* Close the client.
* @returns {Promise}
*/
async close() {
assert(this.opened, 'ChainClient is not open.');
this.opened = false;
setImmediate(() => this.emit('disconnect'));
}
/**
* Get chain tip.
* @returns {Promise}
*/
async getTip() {
return this.chain.tip;
}
/**
* Get chain entry.
* @param {Hash} hash
* @returns {Promise} - Returns {@link ChainEntry}.
*/
async getEntry(hash) {
const entry = await this.chain.getEntry(hash);
if (!entry)
return null;
if (!await this.chain.isMainChain(entry))
return null;
return entry;
}
/**
* Get a coin (unspents only).
* @param {Hash} hash
* @param {Number} index
* @returns {Promise} - Returns {@link Coin}.
*/
async getCoin(hash, index) {
return this.chain.getCoin(hash, index);
}
/**
* Get hash range.
* @param {Number} start
* @param {Number} end
* @returns {Promise}
*/
async getHashes(start = -1, end = -1) {
return this.chain.getHashes(start, end);
}
/**
* Get block
* @param {Hash} hash
* @returns {Promise} - Returns {@link Block}
*/
async getBlock(hash) {
const block = await this.chain.getBlock(hash);
if (!block)
return null;
return block;
}
/**
* Get a historical block coin viewpoint.
* @param {Block} hash
* @returns {Promise} - Returns {@link CoinView}.
*/
async getBlockView(block) {
return this.chain.getBlockView(block);
}
/**
* Get coin viewpoint.
* @param {TX} tx
* @returns {Promise} - Returns {@link CoinView}.
*/
async getCoinView(tx) {
return this.chain.getCoinView(tx);
}
/**
* Rescan for any missed blocks.
* @param {Number} start - Start block.
* @returns {Promise}
*/
async rescan(start) {
for (let i = start; ; i++) {
const entry = await this.getEntry(i);
if (!entry) {
await this.emitAsync('chain tip');
break;
};
const block = await this.getBlock(entry.hash);
assert(block);
const view = await this.getBlockView(block);
assert(view);
await this.emitAsync('block rescan', entry, block, view);
}
};
}
/*
* Expose
*/
module.exports = ChainClient;

View File

@ -13,4 +13,3 @@
exports.Indexer = require('./indexer');
exports.TXIndexer = require('./txindexer');
exports.AddrIndexer = require('./addrindexer');
exports.ChainClient = require('./chainclient');

View File

@ -15,8 +15,6 @@ const Logger = require('blgr');
const Network = require('../protocol/network');
const layout = require('./layout');
const records = require('./records');
const ChainClient = require('./chainclient');
const NullClient = require('./nullclient');
const {
ChainState,
@ -51,7 +49,9 @@ class Indexer extends EventEmitter {
this.network = this.options.network;
this.logger = this.options.logger.context(`${module}indexer`);
this.client = this.options.client || new NullClient(this);
this.blocks = this.options.blocks;
this.chain = this.options.chain;
this.db = null;
this.rescanning = false;
@ -59,17 +59,6 @@ class Indexer extends EventEmitter {
this.height = 0;
this.lock = new Lock();
this.init();
}
/**
* Initialize indexdb.
* @private
*/
init() {
this._bind();
}
/**
@ -77,22 +66,11 @@ class Indexer extends EventEmitter {
* @private
*/
_bind() {
this.client.on('error', (err) => {
this.emit('error', err);
});
this.client.on('connect', async () => {
try {
await this.syncNode();
} catch (e) {
this.emit('error', e);
}
});
this.client.on('block connect', async (entry, block, view) => {
bind() {
this.chain.on('connect', async (entry, block, view) => {
if (this.rescanning)
return;
try {
await this.addBlock(entry, block, view);
} catch (e) {
@ -100,9 +78,10 @@ class Indexer extends EventEmitter {
}
});
this.client.on('block disconnect', async (entry, block, view) => {
this.chain.on('disconnect', async (entry, block, view) => {
if (this.rescanning)
return;
try {
await this.removeBlock(entry, block, view);
} catch (e) {
@ -110,27 +89,13 @@ class Indexer extends EventEmitter {
}
});
this.client.on('block rescan', async (entry, block, view) => {
try {
await this.rescanBlock(entry, block, view);
} catch (e) {
this.emit('error', e);
}
});
this.client.on('chain reset', async (tip) => {
this.chain.on('reset', async (tip) => {
try {
await this.resetChain(tip);
} catch (e) {
this.emit('error', e);
}
});
this.client.on('chain tip', async () => {
this.logger.debug('Indexer: finished rescan');
const tip = await this.getTip();
this.emit('chain tip', tip);
});
}
/**
@ -160,7 +125,9 @@ class Indexer extends EventEmitter {
await this.verifyNetwork();
await this.connect();
this.bind();
await this.sync();
}
/**
@ -191,34 +158,15 @@ class Indexer extends EventEmitter {
*/
async close() {
await this.disconnect();
return this.db.close();
}
/**
* Connect to the chain server (client required).
* @returns {Promise}
*/
async connect() {
return this.client.open();
}
/**
* Disconnect from chain server (client required).
* @returns {Promise}
*/
async disconnect() {
return this.client.close();
}
/**
* Sync state with server on every connect.
* @returns {Promise}
*/
async syncNode() {
async sync() {
const unlock = await this.lock.lock();
try {
this.logger.info('Resyncing from server...');
@ -251,7 +199,7 @@ class Indexer extends EventEmitter {
this.logger.info('Initializing database state from server.');
const b = this.db.batch();
const hashes = await this.client.getHashes();
const hashes = await this.chain.getHashes();
let tip = null;
@ -278,6 +226,24 @@ class Indexer extends EventEmitter {
return undefined;
}
/**
* Get a chain entry for the main chain only.
* @private
* @returns {Promise}
*/
async getEntry(hash) {
const entry = await this.chain.getEntry(hash);
if (!entry)
return null;
if (!await this.chain.isMainChain(entry))
return null;
return entry;
}
/**
* Connect and sync with the chain server.
* @private
@ -289,30 +255,29 @@ class Indexer extends EventEmitter {
this.logger.info('Syncing state from height %d.', height);
// re-org when we're offline might
// leave chain in different state.
// scan chain backwards until we
// find a known 'good' height
// A re-org when we're offline might
// leave chain in a different state.
// Scan chain backwards until we
// find a known 'good' height.
for (;;) {
const tip = await this.getBlock(height);
assert(tip);
if (await this.client.getEntry(tip.hash))
if (await this.getEntry(tip.hash))
break;
assert(height !== 0);
height -= 1;
}
// start scan from last indexed OR
// Start scan from last indexed OR
// last known 'good' height whichever
// is lower, because `scan` scans from
// low to high blocks
if (this.state.startHeight < height)
height = this.state.startHeight;
this.logger.spam('Starting block rescan from: %d.', height);
return this.scan(height);
return this._rescan(height);
}
/**
@ -332,7 +297,7 @@ class Indexer extends EventEmitter {
}
if (entry.height % 1000 === 0)
this.logger.debug('rescanned block: %d.', entry.height);
this.logger.debug('Rescanned block: %d.', entry.height);
if (entry.height > this.state.height + 1) {
this.logger.warning('Rescan block too high: %d.', entry.height);
@ -347,33 +312,6 @@ class Indexer extends EventEmitter {
}
}
/**
* Rescan blockchain from a given height.
* @private
* @param {Number?} height
* @returns {Promise}
*/
async scan(height) {
assert((height >>> 0) === height, 'Indexer: Must pass in a height.');
await this.rollback(height);
const tip = this.state.height;
this.logger.info(
'Indexer is scanning %d blocks.',
tip - height + 1);
try {
this.rescanning = true;
this.logger.debug('rescanning from %d to %d', height, tip);
await this.client.rescan(height);
} finally {
this.rescanning = false;
}
}
/**
* Force a rescan.
* @param {Number} height
@ -390,14 +328,38 @@ class Indexer extends EventEmitter {
}
/**
* Force a rescan (without a lock).
* Rescan blockchain from a given height.
* @private
* @param {Number} height
* @returns {Promise}
*/
async _rescan(height) {
return this.scan(height);
assert((height >>> 0) === height, 'Must pass in a height.');
await this.rollback(height);
const tip = this.state.height;
this.logger.debug('Rescanning from %d to %d', height, tip);
this.rescanning = true;
for (let i = height; ; i++) {
const entry = await this.chain.getEntry(i);
if (!entry)
break;
const block = await this.chain.getBlock(entry.hash);
assert(block);
const view = await this.chain.getBlockView(block);
assert(view);
await this.rescanBlock(entry, block, view);
}
this.rescanning = false;
}
/**
@ -551,7 +513,7 @@ class Indexer extends EventEmitter {
// processed (in the case of a crash).
this.logger.warning('Already saw Indexer block (%d).', tip.height);
} else if (tip.height !== this.state.startHeight + 1) {
await this.scan(this.state.height);
await this._rescan(this.state.height);
return;
}
@ -700,7 +662,7 @@ class IndexOptions {
this.module = module;
this.network = Network.primary;
this.logger = Logger.global;
this.client = null;
this.blocks = null;
this.chain = null;
this.indexers = null;
@ -723,6 +685,14 @@ class IndexOptions {
*/
fromOptions(options) {
assert(options.blocks && typeof options.blocks === 'object',
'Indexer requires a blockstore.');
assert(options.chain && typeof options.chain === 'object',
'Indexer requires chain.');
this.blocks = options.blocks;
this.chain = options.chain;
if (options.network != null)
this.network = Network.get(options.network);
@ -731,20 +701,6 @@ class IndexOptions {
this.logger = options.logger;
}
if (options.client != null) {
assert(typeof options.client === 'object');
this.client = options.client;
}
if (options.chain != null) {
assert(typeof options.chain === 'object');
this.client = new ChainClient(options.chain);
}
if (!this.client) {
throw new Error('Client is required');
}
if (options.prefix != null) {
assert(typeof options.prefix === 'string');
this.prefix = options.prefix;

View File

@ -1,142 +0,0 @@
/*!
* nullclient.js - chain client for bcoin
* Copyright (c) 2018, the bcoin developers (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const assert = require('assert');
const EventEmitter = require('events');
/**
* Null Client
* Sort of a fake local client for separation of concerns.
* @alias module:indexer.NullClient
*/
class NullClient extends EventEmitter {
/**
* Create a client.
* @constructor
* @param {Chain} chain
*/
constructor(chain) {
super();
this.chain = chain;
this.network = chain.network;
this.opened = false;
}
/**
* Open the client.
* @returns {Promise}
*/
async open(options) {
assert(!this.opened, 'NullClient is already open.');
this.opened = true;
setImmediate(() => this.emit('connect'));
}
/**
* Close the client.
* @returns {Promise}
*/
async close() {
assert(this.opened, 'NullClient is not open.');
this.opened = false;
setImmediate(() => this.emit('disconnect'));
}
/**
* Get chain tip.
* @returns {Promise}
*/
async getTip() {
const {hash, height, time} = this.network.genesis;
return { hash, height, time };
}
/**
* Get chain entry.
* @param {Hash} hash
* @returns {Promise} - Returns {@link ChainEntry}.
*/
async getEntry(hash) {
return { hash, height: 0, time: 0 };
}
/**
* Get a coin (unspents only).
* @param {Hash} hash
* @param {Number} index
* @returns {Promise} - Returns {@link Coin}.
*/
async getCoin(hash, index) {
return null;
}
/**
* Get hash range.
* @param {Number} start
* @param {Number} end
* @returns {Promise}
*/
async getHashes(start = -1, end = -1) {
return [this.network.genesis.hash];
}
/**
* Get block
* @param {Hash} hash
* @returns {Promise}
*/
async getBlock(hash) {
return null;
}
/**
* Get a historical block coin viewpoint.
* @param {Block} hash
* @returns {Promise} - Returns {@link CoinView}.
*/
async getBlockView(block) {
return null;
}
/**
* Get coin viewpoint.
* @param {TX} tx
* @returns {Promise} - Returns {@link CoinView}.
*/
async getCoinView(tx) {
return null;
}
/**
* Rescan for any missed blocks.
* @param {Number} start - Start block.
* @returns {Promise}
*/
async rescan(start) {
;
}
}
/*
* Expose
*/
module.exports = NullClient;

View File

@ -184,8 +184,10 @@ class TXIndexer extends Indexer {
return null;
const record = TxRecord.fromRaw(raw);
const {block, offset, length} = record;
const data = await this.blocks.read(block, offset, length);
const data = await this.read(record.block, record.offset, record.length);
const tx = TX.fromRaw(data);
const meta = TXMeta.fromTX(tx);
@ -228,7 +230,7 @@ class TXIndexer extends Indexer {
*/
async getSpentView(tx) {
const view = await this.client.getCoinView(tx);
const view = await this.chain.getCoinView(tx);
for (const {prevout} of tx.inputs) {
if (view.hasEntry(prevout))

View File

@ -157,25 +157,27 @@ class FullNode extends Node {
});
// Indexers
this.txindex = null;
if (this.config.bool('index-tx'))
if (this.config.bool('index-tx')) {
this.txindex = new TXIndexer({
network: this.network,
logger: this.logger,
blocks: this.blocks,
chain: this.chain,
memory: this.config.bool('memory'),
prefix: this.config.filter('index').str('prefix') || this.config.prefix
});
}
this.addrindex = null;
if (this.config.bool('index-address'))
if (this.config.bool('index-address')) {
this.addrindex= new AddrIndexer({
network: this.network,
logger: this.logger,
blocks: this.blocks,
chain: this.chain,
memory: this.config.bool('memory'),
prefix: this.config.filter('index').str('prefix') || this.config.prefix
});
}
this.init();
}

View File

@ -44,15 +44,17 @@ const wallet = new MemWallet({
});
const txindexer = new TXIndexer({
'memory': true,
'network': network,
'chain': chain
memory: true,
network,
chain,
blocks
});
const addrindexer = new AddrIndexer({
'memory': true,
'network': network,
'chain': chain
memory: true,
network,
chain,
blocks
});
describe('Indexer', function() {
@ -90,9 +92,6 @@ describe('Indexer', function() {
});
it('should rescan and reindex 10 missed blocks', async () => {
await txindexer.disconnect();
await addrindexer.disconnect();
for (let i = 0; i < 10; i++) {
const block = await cpu.mineBlock();
assert(block);
@ -100,17 +99,10 @@ describe('Indexer', function() {
}
assert.strictEqual(chain.height, 20);
await txindexer.connect();
await addrindexer.connect();
await new Promise(r => addrindexer.once('chain tip', r));
assert.strictEqual(txindexer.state.startHeight, 20);
assert.strictEqual(addrindexer.state.startHeight, 20);
const coins =
await addrindexer.getCoinsByAddress(miner.getAddress());
const coins = await addrindexer.getCoinsByAddress(miner.getAddress());
assert.strictEqual(coins.length, 20);
for (const coin of coins) {