walletdb: nodeclient work.

This commit is contained in:
Christopher Jeffrey 2016-11-19 21:40:31 -08:00
parent 1827b945dd
commit d75b5d80cc
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
8 changed files with 324 additions and 323 deletions

View File

@ -1361,6 +1361,37 @@ HTTPServer.prototype._initIO = function _initIO() {
callback();
});
socket.on('get tip', function(args, callback) {
callback(null, socket.frameEntry(self.chain.tip));
});
socket.on('get entry', co(function* (args, callback) {
var block = args[0];
var entry;
if (typeof block === 'string') {
if (!util.isHex256(block))
return callback({ error: 'Invalid parameter.' });
block = util.revHex(block);
} else {
if (!util.isUInt32(block))
return callback({ error: 'Invalid parameter.' });
}
try {
entry = yield self.chain.db.get(block);
if (!(yield entry.isMainChain()))
entry = null;
} catch (e) {
return callback({ error: e.message });
}
if (!entry)
return callback(null, null);
callback(null, socket.frameEntry(entry));
}));
socket.on('add filter', function(args, callback) {
var chunks = args[0];
@ -1698,19 +1729,22 @@ ClientSocket.prototype.unbindAll = function unbindAll() {
ClientSocket.prototype.watchChain = function watchChain() {
var self = this;
var onError = this.onError.bind(this);
var pool = this.mempool || this.pool;
this.bind(this.chain, 'connect', function(entry, block) {
self.connectBlock(entry, block).catch(onError);
self.connectBlock(entry, block);
});
this.bind(this.chain, 'disconnect', function(entry, block) {
self.disconnectBlock(entry, block).catch(onError);
self.disconnectBlock(entry, block);
});
this.bind(this.chain, 'reset', function(tip) {
self.emit('chain reset', self.frameEntry(tip));
});
this.bind(pool, 'tx', function(tx) {
self.sendTX(tx).catch(onError);
self.sendTX(tx);
});
};
@ -1723,93 +1757,55 @@ ClientSocket.prototype.unwatchChain = function unwatchChain() {
var pool = this.mempool || this.pool;
this.unbind(this.chain, 'connect');
this.unbind(this.chain, 'disconnect');
this.unbind(this.chain, 'reset');
this.unbind(pool, 'tx');
};
ClientSocket.prototype.connectBlock = co(function* _connectBlock(entry, block) {
var unlock = this.locker.lock();
try {
return yield this._connectBlock(entry, block);
} finally {
unlock();
}
});
ClientSocket.prototype.connectBlock = function connectBlock(entry, block) {
var raw = this.frameEntry(entry);
var txs;
ClientSocket.prototype._connectBlock = function _connectBlock(entry, block) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw = self.frameEntry(entry);
var txs;
this.emit('entry connect', raw);
self.emit('entry connect', raw);
if (!this.filter)
return;
txs = self.testBlock(block);
txs = this.filterBlock(block);
if (!txs)
return cb();
self.emit('block connect', raw, txs, cb);
});
this.emit('block connect', raw, txs);
};
ClientSocket.prototype.disconnectBlock = co(function* _disconnectBlock(entry, block) {
var unlock = this.locker.lock();
try {
return yield this._disconnectBlock(entry, block);
} finally {
unlock();
}
});
ClientSocket.prototype.disconnectBlock = function disconnectBlock(entry, block) {
var raw = this.frameEntry(entry);
ClientSocket.prototype._disconnectBlock = function _disconnectBlock(entry, block) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw = self.frameEntry(entry);
this.emit('entry disconnect', raw);
self.emit('entry disconnect', raw);
if (!this.filter)
return;
if (!self.filter)
return cb();
self.emit('block disconnect', raw, cb);
});
this.emit('block disconnect', raw);
};
ClientSocket.prototype.sendTX = co(function* sendTX(tx) {
var unlock = this.locker.lock();
try {
return yield this._sendTX(tx);
} finally {
unlock();
}
});
ClientSocket.prototype.sendTX = function sendTX(tx) {
var raw;
ClientSocket.prototype._sendTX = function _sendTX(tx) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw;
if (!this.filterTX(tx))
return;
if (!self.testFilter(tx))
return cb();
raw = this.frameTX(tx);
raw = self.frameTX(tx);
self.emit('tx', raw, cb);
});
this.emit('tx', raw);
};
ClientSocket.prototype.rescanBlock = function rescanBlock(entry, txs) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var cb = TimedCB.wrap(resolve, reject);
self.emit('block rescan', entry, txs, cb);
});
};
ClientSocket.prototype.testBlock = function testBlock(block) {
ClientSocket.prototype.filterBlock = function filterBlock(block) {
var txs = [];
var i, tx;
@ -1818,17 +1814,14 @@ ClientSocket.prototype.testBlock = function testBlock(block) {
for (i = 0; i < block.txs.length; i++) {
tx = block.txs[i];
if (this.testFilter(tx))
if (this.filterTX(tx))
txs.push(this.frameTX(tx));
}
if (txs.length === 0)
return;
return txs;
};
ClientSocket.prototype.testFilter = function testFilter(tx) {
ClientSocket.prototype.filterTX = function filterTX(tx) {
var found = false;
var i, hash, input, prevout, output;
@ -1999,6 +1992,46 @@ function getMemory() {
};
}
/**
* TimedCB
* @constructor
*/
function TimedCB(resolve, reject) {
this.resolve = resolve;
this.reject = reject;
this.done = false;
}
TimedCB.wrap = function wrap(resolve, reject) {
return new TimedCB(resolve, reject).start();
};
TimedCB.prototype.start = function start() {
var self = this;
var timeout;
timeout = setTimeout(function() {
self.cleanup(timeout);
self.reject(new Error('Callback timed out.'));
}, 5000);
return function(err) {
self.cleanup(timeout);
if (err) {
self.reject(err);
return;
}
self.resolve();
};
};
TimedCB.prototype.cleanup = function cleanup(timeout) {
assert(!this.done);
this.done = true;
clearTimeout(timeout);
};
/*
* Expose
*/

View File

@ -1473,6 +1473,19 @@ Pool.prototype.removePeer = function removePeer(peer) {
}
};
/**
* Set the spv filter.
* @param {Bloom} filter
* @param {String?} enc
*/
Pool.prototype.setFilter = function setFilter(filter) {
if (!this.options.spv)
return;
this.spvFilter = filter;
this.updateWatch();
};
/**
* Watch a an address hash (filterload, SPV-only).
* @param {Buffer|Hash} data

View File

@ -136,7 +136,7 @@ function FullNode(options) {
this.walletdb = new WalletDB({
network: this.network,
logger: this.logger,
client: this,
client: this.client,
db: this.options.db,
location: this.location('walletdb'),
witness: false,
@ -195,7 +195,6 @@ FullNode.prototype._init = function _init() {
this.mempool.on('tx', function(tx) {
self.emit('tx', tx);
self.walletdb.addTX(tx).catch(onError);
self.miner.notifyEntry();
});
@ -204,21 +203,21 @@ FullNode.prototype._init = function _init() {
});
this.chain.on('connect', function(entry, block) {
self.walletdb.addBlock(entry, block.txs).catch(onError);
self.emit('connect', entry, block);
if (self.chain.synced)
self.mempool.addBlock(block).catch(onError);
});
this.chain.on('disconnect', function(entry, block) {
self.walletdb.removeBlock(entry).catch(onError);
self.emit('disconnect', entry, block);
if (self.chain.synced)
self.mempool.removeBlock(block).catch(onError);
});
this.chain.on('reset', function(tip) {
self.walletdb.resetChain(tip).catch(onError);
self.emit('reset', tip);
});
this.miner.on('block', function(block) {
@ -275,36 +274,6 @@ FullNode.prototype._close = co(function* close() {
this.logger.info('Node is closed.');
});
/**
* Set bloom filter.
* @param {Bloom} filter
* @returns {Promise}
*/
FullNode.prototype.loadFilter = function loadFilter(filter) {
this.filter = filter;
return Promise.resolve();
};
/**
* Add data to filter.
* @param {Buffer} data
* @returns {Promise}
*/
FullNode.prototype.addFilter = function addFilter(data) {
return Promise.resolve();
};
/**
* Reset filter.
* @returns {Promise}
*/
FullNode.prototype.resetFilter = function resetFilter() {
return Promise.resolve();
};
/**
* Rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
@ -317,30 +286,6 @@ FullNode.prototype.scan = function scan(start, filter, iter) {
return this.chain.scan(start, filter, iter);
};
/**
* Rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
FullNode.prototype.rescan = co(function* rescan(start, iter) {
if (!this.filter)
throw new Error('No filter set.');
yield this.scan(start, this.filter, iter);
});
/**
* Esimate smart fee.
* @param {Number?} blocks
* @returns {Promise}
*/
FullNode.prototype.estimateFee = function estimateFee(blocks) {
return Promise.resolve(this.fees.estimateFee(blocks));
};
/**
* Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from

View File

@ -4,4 +4,5 @@ exports.config = require('./config');
exports.FullNode = require('./fullnode');
exports.Logger = require('./logger');
exports.Node = require('./node');
exports.NodeClient = require('./nodeclient');
exports.SPVNode = require('./spvnode');

View File

@ -13,6 +13,7 @@ var co = require('../utils/co');
var assert = require('assert');
var Network = require('../protocol/network');
var Logger = require('./logger');
var NodeClient = require('./nodeclient');
var time = require('../net/time');
var workerPool = require('../workers/workerpool').pool;
@ -49,7 +50,9 @@ function Node(options) {
this.walletdb = null;
this.wallet = null;
this.http = null;
this.filter = null;
// Local client for walletdb
this.client = new NodeClient(this);
this._bound = [];
@ -259,47 +262,6 @@ Node.prototype.openWallet = co(function* openWallet() {
this.wallet = wallet;
});
/**
* Get chain tip.
* @returns {Promise}
*/
Node.prototype.getTip = function getTip() {
return Promise.resolve(this.chain.tip);
};
/**
* Get chain entry.
* @param {Hash} hash
* @returns {Promise}
*/
Node.prototype.getEntry = co(function* getEntry(hash) {
var entry = yield this.chain.db.get(hash);
if (!entry)
return;
if (!(yield entry.isMainChain()))
return;
return entry;
});
/**
* Send a transaction. Do not wait for promise.
* @param {TX} tx
* @returns {Promise}
*/
Node.prototype.send = function send(tx) {
var onError = this._error.bind(this);
this.sendTX(tx).catch(onError);
return Promise.resolve();
};
/*
* Expose
*/

View File

@ -79,7 +79,7 @@ function SPVNode(options) {
this.walletdb = new WalletDB({
network: this.network,
logger: this.logger,
client: this,
client: this.client,
db: this.options.db,
location: this.location('walletdb'),
witness: false,
@ -87,7 +87,8 @@ function SPVNode(options) {
startHeight: this.options.startHeight,
wipeNoReally: this.options.wipeNoReally,
resolution: true,
verify: true
verify: true,
spv: true
});
if (!HTTPServer.unsupported) {
@ -155,19 +156,15 @@ SPVNode.prototype._init = function _init() {
return;
}
self.walletdb.addBlock(entry, block.txs).catch(onError);
self.emit('connect', entry, block);
});
this.chain.on('disconnect', function(entry, block) {
self.walletdb.removeBlock(entry).catch(onError);
self.emit('disconnect', entry);
});
this.chain.on('reset', function(tip) {
self.walletdb.resetChain(tip).catch(onError);
});
this.walletdb.on('watch', function(data) {
self.pool.watch(data);
self.emit('reset', tip);
});
};
@ -211,36 +208,6 @@ SPVNode.prototype._close = co(function* close() {
yield this.chain.close();
});
/**
* Set bloom filter.
* @param {Bloom} filter
* @returns {Promise}
*/
SPVNode.prototype.loadFilter = function loadFilter(filter) {
this.filter = filter;
return Promise.resolve();
};
/**
* Add data to filter.
* @param {Buffer} data
* @returns {Promise}
*/
SPVNode.prototype.addFilter = function addFilter(data) {
return Promise.resolve();
};
/**
* Reset filter.
* @returns {Promise}
*/
SPVNode.prototype.resetFilter = function resetFilter() {
return Promise.resolve();
};
/**
* Scan for any missed transactions.
* Note that this will replay the blockchain sync.
@ -267,21 +234,6 @@ SPVNode.prototype.scan = co(function* scan(start, filter, iter) {
}
});
/**
* Scan for any missed transactions.
* Note that this will replay the blockchain sync.
* @param {Number|Hash} start - Start block.
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
SPVNode.prototype.rescan = co(function* rescan(start, iter) {
if (!this.filter)
throw new Error('No filter set.');
return yield this.scan(start, this.filter, iter);
});
/**
* Watch the blockchain until a certain height.
* @param {Number} height
@ -320,16 +272,6 @@ SPVNode.prototype.watchBlock = co(function* watchBlock(entry, block) {
}
});
/**
* Estimate smart fee (returns network fee rate).
* @param {Number?} blocks
* @returns {Promise}
*/
SPVNode.prototype.estimateFee = function estimateFee(blocks) {
return Promise.resolve(this.network.feeRate);
};
/**
* Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from

View File

@ -36,6 +36,7 @@ var BlockMeta = records.BlockMeta;
var PathMapRecord = records.PathMapRecord;
var OutpointMapRecord = records.OutpointMapRecord;
var U32 = encoding.U32;
var cob = co.cob;
var DUMMY = new Buffer([0]);
/**
@ -63,13 +64,16 @@ function WalletDB(options) {
this.options = options;
this.network = Network.get(options.network);
this.logger = options.logger || Logger.global;
this.spv = options.spv || false;
this.client = options.client;
this.onError = this._onError.bind(this);
this.state = new ChainState();
this.depth = 0;
this.wallets = {};
this.keepBlocks = this.network.block.keepBlocks;
this.rescanning = false;
this.bound = false;
// We need one read lock for `get` and `create`.
// It will hold locks specific to wallet ids.
@ -86,7 +90,7 @@ function WalletDB(options) {
// lose membership, even if quality
// degrades.
// Memory used: 1.7mb
this.filter = Bloom.fromRate(1000000, 0.001, -1);
this.filter = Bloom.fromRate(1000000, 0.001, this.spv ? 1 : -1);
this.db = LDB({
location: this.options.location,
@ -122,10 +126,7 @@ WalletDB.prototype._open = co(function* open() {
if (this.options.wipeNoReally)
yield this.wipe();
yield this.init();
yield this.watch();
yield this.sync();
yield this.resend();
yield this.load();
this.logger.info(
'WalletDB loaded (depth=%d, height=%d, start=%d).',
@ -144,6 +145,8 @@ WalletDB.prototype._close = co(function* close() {
var keys = Object.keys(this.wallets);
var i, key, wallet;
yield this.disconnect();
for (i = 0; i < keys.length; i++) {
key = keys[i];
wallet = this.wallets[key];
@ -153,6 +156,136 @@ WalletDB.prototype._close = co(function* close() {
yield this.db.close();
});
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype._onError = function onError(err) {
this.emit('error', err);
};
/**
* Load the walletdb.
* @returns {Promise}
*/
WalletDB.prototype.load = co(function* load() {
var unlock = yield this.txLock.lock();
try {
yield this.connect();
yield this.init();
yield this.watch();
yield this.sync();
yield this.resend();
} finally {
unlock();
}
});
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype.bind = function bind() {
var self = this;
if (!this.client)
return;
if (this.bound)
return;
this.bound = true;
this.client.on('error', function(err) {
self.emit('error', err);
});
this.client.on('block connect', function(entry, txs) {
self.addBlock(entry, txs).catch(self.onError);
});
this.client.on('block disconnect', function(entry) {
self.removeBlock(entry).catch(self.onError);
});
this.client.on('block rescan', cob(function* (entry, txs) {
yield self.rescanBlock(entry, txs);
}));
this.client.on('tx', function(tx) {
self.addTX(tx).catch(self.onError);
});
this.client.on('chain reset', function(tip) {
self.resetChain(tip).catch(self.onError);
});
};
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype.connect = co(function* connect() {
if (!this.client)
return;
this.bind();
yield this.client.open({ raw: true });
});
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype.disconnect = co(function* disconnect() {
if (!this.client)
return;
yield this.client.close();
});
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype.init = co(function* init() {
var state = yield this.getState();
var startHeight = this.options.startHeight;
var tip;
if (state) {
this.state = state;
return;
}
if (this.client) {
if (startHeight != null) {
tip = yield this.client.getEntry(startHeight);
if (!tip)
throw new Error('WDB: Could not find start block.');
} else {
tip = yield this.client.getTip();
}
tip = BlockMeta.fromEntry(tip);
} else {
tip = BlockMeta.fromEntry(this.network.genesis);
}
this.logger.info(
'Initializing WalletDB chain state at %s (%d).',
util.revHex(tip.hash), tip.height);
yield this.resetState(tip, false);
});
/**
* Watch addresses and outpoints.
* @private
@ -178,7 +311,6 @@ WalletDB.prototype.watch = co(function* watch() {
try {
data = layout.pp(item.key);
this.filter.add(data, 'hex');
this.emit('watch', data);
} catch (e) {
yield iter.end();
throw e;
@ -203,7 +335,6 @@ WalletDB.prototype.watch = co(function* watch() {
outpoint = new Outpoint(items[0], items[1]);
data = outpoint.toRaw();
this.filter.add(data);
this.emit('watch', data);
} catch (e) {
yield iter.end();
throw e;
@ -212,33 +343,19 @@ WalletDB.prototype.watch = co(function* watch() {
outpoints++;
}
this.logger.info('Adding %d hashes to WalletDB filter.', hashes);
this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints);
this.logger.info('Added %d hashes to WalletDB filter.', hashes);
this.logger.info('Added %d outpoints to WalletDB filter.', outpoints);
yield this.loadFilter();
});
/**
* Connect and sync with the chain server.
* @returns {Promise}
*/
WalletDB.prototype.sync = co(function* sync() {
var unlock = yield this.txLock.lock();
try {
return yield this._sync();
} finally {
unlock();
}
});
/**
* Connect and sync with the chain server (without a lock).
* @private
* @returns {Promise}
*/
WalletDB.prototype._sync = co(function* connect() {
WalletDB.prototype.sync = co(function* sync() {
var height = this.state.height;
var tip, entry;
@ -270,32 +387,6 @@ WalletDB.prototype._sync = co(function* connect() {
yield this.scan(height);
});
/**
* Force a rescan.
* @param {Number} height
* @returns {Promise}
*/
WalletDB.prototype.rescan = co(function* rescan(height) {
var unlock = yield this.txLock.lock();
try {
return yield this._rescan(height);
} finally {
unlock();
}
});
/**
* Force a rescan (without a lock).
* @private
* @param {Number} height
* @returns {Promise}
*/
WalletDB.prototype._rescan = co(function* rescan(height) {
return yield this.scan(height);
});
/**
* Rescan blockchain from a given height.
* @private
@ -332,6 +423,32 @@ WalletDB.prototype.scan = co(function* scan(height) {
}
});
/**
* Force a rescan.
* @param {Number} height
* @returns {Promise}
*/
WalletDB.prototype.rescan = co(function* rescan(height) {
var unlock = yield this.txLock.lock();
try {
return yield this._rescan(height);
} finally {
unlock();
}
});
/**
* Force a rescan (without a lock).
* @private
* @param {Number} height
* @returns {Promise}
*/
WalletDB.prototype._rescan = co(function* rescan(height) {
return yield this.scan(height);
});
/**
* Broadcast a transaction via chain server.
* @param {TX} tx
@ -596,7 +713,6 @@ WalletDB.prototype.testFilter = function testFilter(data) {
WalletDB.prototype.addHash = function addHash(hash) {
this.filter.add(hash, 'hex');
this.emit('watch', hash);
return this.addFilter(hash);
};
@ -608,10 +724,7 @@ WalletDB.prototype.addHash = function addHash(hash) {
WalletDB.prototype.addOutpoint = function addOutpoint(hash, i) {
var outpoint = new Outpoint(hash, i);
var data = outpoint.toRaw();
this.filter.add(outpoint.toRaw());
this.emit('watch', data);
};
/**
@ -1468,40 +1581,6 @@ WalletDB.prototype.getWalletsByTX = co(function* getWalletsByTX(tx) {
return result;
});
/**
* Write the genesis block as the best hash.
* @returns {Promise}
*/
WalletDB.prototype.init = co(function* init() {
var state = yield this.getState();
var tip;
if (state) {
this.state = state;
return;
}
if (this.client) {
if (this.options.startHeight != null) {
tip = yield this.client.getEntry(this.options.startHeight);
if (!tip)
throw new Error('WDB: Could not find start block.');
} else {
tip = yield this.client.getTip();
}
tip = BlockMeta.fromEntry(tip);
} else {
tip = BlockMeta.fromEntry(this.network.genesis);
}
this.logger.info(
'Initializing WalletDB chain state at %s (%d).',
util.revHex(tip.hash), tip.height);
yield this.resetState(tip, false);
});
/**
* Get the best block hash.
* @returns {Promise}
@ -1964,6 +2043,27 @@ WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
return block.txs.length;
});
/**
* Rescan a block.
* @private
* @param {ChainEntry} entry
* @returns {Promise}
*/
WalletDB.prototype.rescanBlock = co(function* rescanBlock(entry, txs) {
if (!this.rescanning) {
this.logger.warning('Unsolicited rescan block: %s.', entry.height);
return;
}
try {
yield this._addBlock(entry, txs);
} catch (e) {
this.emit('error', e);
throw e;
}
});
/**
* Add a transaction to the database, map addresses
* to wallet IDs, potentially store orphans, resolve

View File

@ -426,6 +426,11 @@ describe('Chain', function() {
assert.equal(err.reason, 'bad-txns-nonfinal');
}));
it('should rescan for transactions', cob(function* () {
yield walletdb.rescan(0);
assert.equal(wallet.state.confirmed, 1289250000000);
}));
it('should cleanup', cob(function* () {
constants.tx.COINBASE_MATURITY = 100;
yield node.close();