diff --git a/lib/http/server.js b/lib/http/server.js index 462b80cd..60189f11 100644 --- a/lib/http/server.js +++ b/lib/http/server.js @@ -1361,6 +1361,37 @@ HTTPServer.prototype._initIO = function _initIO() { callback(); }); + socket.on('get tip', function(args, callback) { + callback(null, socket.frameEntry(self.chain.tip)); + }); + + socket.on('get entry', co(function* (args, callback) { + var block = args[0]; + var entry; + + if (typeof block === 'string') { + if (!util.isHex256(block)) + return callback({ error: 'Invalid parameter.' }); + block = util.revHex(block); + } else { + if (!util.isUInt32(block)) + return callback({ error: 'Invalid parameter.' }); + } + + try { + entry = yield self.chain.db.get(block); + if (!(yield entry.isMainChain())) + entry = null; + } catch (e) { + return callback({ error: e.message }); + } + + if (!entry) + return callback(null, null); + + callback(null, socket.frameEntry(entry)); + })); + socket.on('add filter', function(args, callback) { var chunks = args[0]; @@ -1698,19 +1729,22 @@ ClientSocket.prototype.unbindAll = function unbindAll() { ClientSocket.prototype.watchChain = function watchChain() { var self = this; - var onError = this.onError.bind(this); var pool = this.mempool || this.pool; this.bind(this.chain, 'connect', function(entry, block) { - self.connectBlock(entry, block).catch(onError); + self.connectBlock(entry, block); }); this.bind(this.chain, 'disconnect', function(entry, block) { - self.disconnectBlock(entry, block).catch(onError); + self.disconnectBlock(entry, block); + }); + + this.bind(this.chain, 'reset', function(tip) { + self.emit('chain reset', self.frameEntry(tip)); }); this.bind(pool, 'tx', function(tx) { - self.sendTX(tx).catch(onError); + self.sendTX(tx); }); }; @@ -1723,93 +1757,55 @@ ClientSocket.prototype.unwatchChain = function unwatchChain() { var pool = this.mempool || this.pool; this.unbind(this.chain, 'connect'); this.unbind(this.chain, 'disconnect'); + this.unbind(this.chain, 'reset'); this.unbind(pool, 'tx'); }; -ClientSocket.prototype.connectBlock = co(function* _connectBlock(entry, block) { - var unlock = this.locker.lock(); - try { - return yield this._connectBlock(entry, block); - } finally { - unlock(); - } -}); +ClientSocket.prototype.connectBlock = function connectBlock(entry, block) { + var raw = this.frameEntry(entry); + var txs; -ClientSocket.prototype._connectBlock = function _connectBlock(entry, block) { - var self = this; - return new Promise(function(resolve, reject) { - var cb = co.wrap(resolve, reject); - var raw = self.frameEntry(entry); - var txs; + this.emit('entry connect', raw); - self.emit('entry connect', raw); + if (!this.filter) + return; - txs = self.testBlock(block); + txs = this.filterBlock(block); - if (!txs) - return cb(); - - self.emit('block connect', raw, txs, cb); - }); + this.emit('block connect', raw, txs); }; -ClientSocket.prototype.disconnectBlock = co(function* _disconnectBlock(entry, block) { - var unlock = this.locker.lock(); - try { - return yield this._disconnectBlock(entry, block); - } finally { - unlock(); - } -}); +ClientSocket.prototype.disconnectBlock = function disconnectBlock(entry, block) { + var raw = this.frameEntry(entry); -ClientSocket.prototype._disconnectBlock = function _disconnectBlock(entry, block) { - var self = this; - return new Promise(function(resolve, reject) { - var cb = co.wrap(resolve, reject); - var raw = self.frameEntry(entry); + this.emit('entry disconnect', raw); - self.emit('entry disconnect', raw); + if (!this.filter) + return; - if (!self.filter) - return cb(); - - self.emit('block disconnect', raw, cb); - }); + this.emit('block disconnect', raw); }; -ClientSocket.prototype.sendTX = co(function* sendTX(tx) { - var unlock = this.locker.lock(); - try { - return yield this._sendTX(tx); - } finally { - unlock(); - } -}); +ClientSocket.prototype.sendTX = function sendTX(tx) { + var raw; -ClientSocket.prototype._sendTX = function _sendTX(tx) { - var self = this; - return new Promise(function(resolve, reject) { - var cb = co.wrap(resolve, reject); - var raw; + if (!this.filterTX(tx)) + return; - if (!self.testFilter(tx)) - return cb(); + raw = this.frameTX(tx); - raw = self.frameTX(tx); - - self.emit('tx', raw, cb); - }); + this.emit('tx', raw); }; ClientSocket.prototype.rescanBlock = function rescanBlock(entry, txs) { var self = this; return new Promise(function(resolve, reject) { - var cb = co.wrap(resolve, reject); + var cb = TimedCB.wrap(resolve, reject); self.emit('block rescan', entry, txs, cb); }); }; -ClientSocket.prototype.testBlock = function testBlock(block) { +ClientSocket.prototype.filterBlock = function filterBlock(block) { var txs = []; var i, tx; @@ -1818,17 +1814,14 @@ ClientSocket.prototype.testBlock = function testBlock(block) { for (i = 0; i < block.txs.length; i++) { tx = block.txs[i]; - if (this.testFilter(tx)) + if (this.filterTX(tx)) txs.push(this.frameTX(tx)); } - if (txs.length === 0) - return; - return txs; }; -ClientSocket.prototype.testFilter = function testFilter(tx) { +ClientSocket.prototype.filterTX = function filterTX(tx) { var found = false; var i, hash, input, prevout, output; @@ -1999,6 +1992,46 @@ function getMemory() { }; } +/** + * TimedCB + * @constructor + */ + +function TimedCB(resolve, reject) { + this.resolve = resolve; + this.reject = reject; + this.done = false; +} + +TimedCB.wrap = function wrap(resolve, reject) { + return new TimedCB(resolve, reject).start(); +}; + +TimedCB.prototype.start = function start() { + var self = this; + var timeout; + + timeout = setTimeout(function() { + self.cleanup(timeout); + self.reject(new Error('Callback timed out.')); + }, 5000); + + return function(err) { + self.cleanup(timeout); + if (err) { + self.reject(err); + return; + } + self.resolve(); + }; +}; + +TimedCB.prototype.cleanup = function cleanup(timeout) { + assert(!this.done); + this.done = true; + clearTimeout(timeout); +}; + /* * Expose */ diff --git a/lib/net/pool.js b/lib/net/pool.js index e7ff90b0..03f77bd3 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1473,6 +1473,19 @@ Pool.prototype.removePeer = function removePeer(peer) { } }; +/** + * Set the spv filter. + * @param {Bloom} filter + * @param {String?} enc + */ + +Pool.prototype.setFilter = function setFilter(filter) { + if (!this.options.spv) + return; + this.spvFilter = filter; + this.updateWatch(); +}; + /** * Watch a an address hash (filterload, SPV-only). * @param {Buffer|Hash} data diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index 86ddedec..a6b40329 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -136,7 +136,7 @@ function FullNode(options) { this.walletdb = new WalletDB({ network: this.network, logger: this.logger, - client: this, + client: this.client, db: this.options.db, location: this.location('walletdb'), witness: false, @@ -195,7 +195,6 @@ FullNode.prototype._init = function _init() { this.mempool.on('tx', function(tx) { self.emit('tx', tx); - self.walletdb.addTX(tx).catch(onError); self.miner.notifyEntry(); }); @@ -204,21 +203,21 @@ FullNode.prototype._init = function _init() { }); this.chain.on('connect', function(entry, block) { - self.walletdb.addBlock(entry, block.txs).catch(onError); + self.emit('connect', entry, block); if (self.chain.synced) self.mempool.addBlock(block).catch(onError); }); this.chain.on('disconnect', function(entry, block) { - self.walletdb.removeBlock(entry).catch(onError); + self.emit('disconnect', entry, block); if (self.chain.synced) self.mempool.removeBlock(block).catch(onError); }); this.chain.on('reset', function(tip) { - self.walletdb.resetChain(tip).catch(onError); + self.emit('reset', tip); }); this.miner.on('block', function(block) { @@ -275,36 +274,6 @@ FullNode.prototype._close = co(function* close() { this.logger.info('Node is closed.'); }); -/** - * Set bloom filter. - * @param {Bloom} filter - * @returns {Promise} - */ - -FullNode.prototype.loadFilter = function loadFilter(filter) { - this.filter = filter; - return Promise.resolve(); -}; - -/** - * Add data to filter. - * @param {Buffer} data - * @returns {Promise} - */ - -FullNode.prototype.addFilter = function addFilter(data) { - return Promise.resolve(); -}; - -/** - * Reset filter. - * @returns {Promise} - */ - -FullNode.prototype.resetFilter = function resetFilter() { - return Promise.resolve(); -}; - /** * Rescan for any missed transactions. * @param {Number|Hash} start - Start block. @@ -317,30 +286,6 @@ FullNode.prototype.scan = function scan(start, filter, iter) { return this.chain.scan(start, filter, iter); }; -/** - * Rescan for any missed transactions. - * @param {Number|Hash} start - Start block. - * @param {Function} iter - Iterator. - * @returns {Promise} - */ - -FullNode.prototype.rescan = co(function* rescan(start, iter) { - if (!this.filter) - throw new Error('No filter set.'); - - yield this.scan(start, this.filter, iter); -}); - -/** - * Esimate smart fee. - * @param {Number?} blocks - * @returns {Promise} - */ - -FullNode.prototype.estimateFee = function estimateFee(blocks) { - return Promise.resolve(this.fees.estimateFee(blocks)); -}; - /** * Broadcast a transaction (note that this will _not_ be verified * by the mempool - use with care, lest you get banned from diff --git a/lib/node/index.js b/lib/node/index.js index 3edaae4d..a7df515a 100644 --- a/lib/node/index.js +++ b/lib/node/index.js @@ -4,4 +4,5 @@ exports.config = require('./config'); exports.FullNode = require('./fullnode'); exports.Logger = require('./logger'); exports.Node = require('./node'); +exports.NodeClient = require('./nodeclient'); exports.SPVNode = require('./spvnode'); diff --git a/lib/node/node.js b/lib/node/node.js index 0c23089d..d6f7bd3a 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -13,6 +13,7 @@ var co = require('../utils/co'); var assert = require('assert'); var Network = require('../protocol/network'); var Logger = require('./logger'); +var NodeClient = require('./nodeclient'); var time = require('../net/time'); var workerPool = require('../workers/workerpool').pool; @@ -49,7 +50,9 @@ function Node(options) { this.walletdb = null; this.wallet = null; this.http = null; - this.filter = null; + + // Local client for walletdb + this.client = new NodeClient(this); this._bound = []; @@ -259,47 +262,6 @@ Node.prototype.openWallet = co(function* openWallet() { this.wallet = wallet; }); -/** - * Get chain tip. - * @returns {Promise} - */ - -Node.prototype.getTip = function getTip() { - return Promise.resolve(this.chain.tip); -}; - -/** - * Get chain entry. - * @param {Hash} hash - * @returns {Promise} - */ - -Node.prototype.getEntry = co(function* getEntry(hash) { - var entry = yield this.chain.db.get(hash); - - if (!entry) - return; - - if (!(yield entry.isMainChain())) - return; - - return entry; -}); - -/** - * Send a transaction. Do not wait for promise. - * @param {TX} tx - * @returns {Promise} - */ - -Node.prototype.send = function send(tx) { - var onError = this._error.bind(this); - - this.sendTX(tx).catch(onError); - - return Promise.resolve(); -}; - /* * Expose */ diff --git a/lib/node/spvnode.js b/lib/node/spvnode.js index e0be2f99..17f4c841 100644 --- a/lib/node/spvnode.js +++ b/lib/node/spvnode.js @@ -79,7 +79,7 @@ function SPVNode(options) { this.walletdb = new WalletDB({ network: this.network, logger: this.logger, - client: this, + client: this.client, db: this.options.db, location: this.location('walletdb'), witness: false, @@ -87,7 +87,8 @@ function SPVNode(options) { startHeight: this.options.startHeight, wipeNoReally: this.options.wipeNoReally, resolution: true, - verify: true + verify: true, + spv: true }); if (!HTTPServer.unsupported) { @@ -155,19 +156,15 @@ SPVNode.prototype._init = function _init() { return; } - self.walletdb.addBlock(entry, block.txs).catch(onError); + self.emit('connect', entry, block); }); this.chain.on('disconnect', function(entry, block) { - self.walletdb.removeBlock(entry).catch(onError); + self.emit('disconnect', entry); }); this.chain.on('reset', function(tip) { - self.walletdb.resetChain(tip).catch(onError); - }); - - this.walletdb.on('watch', function(data) { - self.pool.watch(data); + self.emit('reset', tip); }); }; @@ -211,36 +208,6 @@ SPVNode.prototype._close = co(function* close() { yield this.chain.close(); }); -/** - * Set bloom filter. - * @param {Bloom} filter - * @returns {Promise} - */ - -SPVNode.prototype.loadFilter = function loadFilter(filter) { - this.filter = filter; - return Promise.resolve(); -}; - -/** - * Add data to filter. - * @param {Buffer} data - * @returns {Promise} - */ - -SPVNode.prototype.addFilter = function addFilter(data) { - return Promise.resolve(); -}; - -/** - * Reset filter. - * @returns {Promise} - */ - -SPVNode.prototype.resetFilter = function resetFilter() { - return Promise.resolve(); -}; - /** * Scan for any missed transactions. * Note that this will replay the blockchain sync. @@ -267,21 +234,6 @@ SPVNode.prototype.scan = co(function* scan(start, filter, iter) { } }); -/** - * Scan for any missed transactions. - * Note that this will replay the blockchain sync. - * @param {Number|Hash} start - Start block. - * @param {Function} iter - Iterator. - * @returns {Promise} - */ - -SPVNode.prototype.rescan = co(function* rescan(start, iter) { - if (!this.filter) - throw new Error('No filter set.'); - - return yield this.scan(start, this.filter, iter); -}); - /** * Watch the blockchain until a certain height. * @param {Number} height @@ -320,16 +272,6 @@ SPVNode.prototype.watchBlock = co(function* watchBlock(entry, block) { } }); -/** - * Estimate smart fee (returns network fee rate). - * @param {Number?} blocks - * @returns {Promise} - */ - -SPVNode.prototype.estimateFee = function estimateFee(blocks) { - return Promise.resolve(this.network.feeRate); -}; - /** * Broadcast a transaction (note that this will _not_ be verified * by the mempool - use with care, lest you get banned from diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index c522b5b3..741b5b7a 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -36,6 +36,7 @@ var BlockMeta = records.BlockMeta; var PathMapRecord = records.PathMapRecord; var OutpointMapRecord = records.OutpointMapRecord; var U32 = encoding.U32; +var cob = co.cob; var DUMMY = new Buffer([0]); /** @@ -63,13 +64,16 @@ function WalletDB(options) { this.options = options; this.network = Network.get(options.network); this.logger = options.logger || Logger.global; + this.spv = options.spv || false; this.client = options.client; + this.onError = this._onError.bind(this); this.state = new ChainState(); this.depth = 0; this.wallets = {}; this.keepBlocks = this.network.block.keepBlocks; this.rescanning = false; + this.bound = false; // We need one read lock for `get` and `create`. // It will hold locks specific to wallet ids. @@ -86,7 +90,7 @@ function WalletDB(options) { // lose membership, even if quality // degrades. // Memory used: 1.7mb - this.filter = Bloom.fromRate(1000000, 0.001, -1); + this.filter = Bloom.fromRate(1000000, 0.001, this.spv ? 1 : -1); this.db = LDB({ location: this.options.location, @@ -122,10 +126,7 @@ WalletDB.prototype._open = co(function* open() { if (this.options.wipeNoReally) yield this.wipe(); - yield this.init(); - yield this.watch(); - yield this.sync(); - yield this.resend(); + yield this.load(); this.logger.info( 'WalletDB loaded (depth=%d, height=%d, start=%d).', @@ -144,6 +145,8 @@ WalletDB.prototype._close = co(function* close() { var keys = Object.keys(this.wallets); var i, key, wallet; + yield this.disconnect(); + for (i = 0; i < keys.length; i++) { key = keys[i]; wallet = this.wallets[key]; @@ -153,6 +156,136 @@ WalletDB.prototype._close = co(function* close() { yield this.db.close(); }); +/** + * Write the genesis block as the best hash. + * @returns {Promise} + */ + +WalletDB.prototype._onError = function onError(err) { + this.emit('error', err); +}; + +/** + * Load the walletdb. + * @returns {Promise} + */ + +WalletDB.prototype.load = co(function* load() { + var unlock = yield this.txLock.lock(); + try { + yield this.connect(); + yield this.init(); + yield this.watch(); + yield this.sync(); + yield this.resend(); + } finally { + unlock(); + } +}); + +/** + * Write the genesis block as the best hash. + * @returns {Promise} + */ + +WalletDB.prototype.bind = function bind() { + var self = this; + + if (!this.client) + return; + + if (this.bound) + return; + + this.bound = true; + + this.client.on('error', function(err) { + self.emit('error', err); + }); + + this.client.on('block connect', function(entry, txs) { + self.addBlock(entry, txs).catch(self.onError); + }); + + this.client.on('block disconnect', function(entry) { + self.removeBlock(entry).catch(self.onError); + }); + + this.client.on('block rescan', cob(function* (entry, txs) { + yield self.rescanBlock(entry, txs); + })); + + this.client.on('tx', function(tx) { + self.addTX(tx).catch(self.onError); + }); + + this.client.on('chain reset', function(tip) { + self.resetChain(tip).catch(self.onError); + }); +}; + +/** + * Write the genesis block as the best hash. + * @returns {Promise} + */ + +WalletDB.prototype.connect = co(function* connect() { + if (!this.client) + return; + + this.bind(); + + yield this.client.open({ raw: true }); +}); + +/** + * Write the genesis block as the best hash. + * @returns {Promise} + */ + +WalletDB.prototype.disconnect = co(function* disconnect() { + if (!this.client) + return; + + yield this.client.close(); +}); + +/** + * Write the genesis block as the best hash. + * @returns {Promise} + */ + +WalletDB.prototype.init = co(function* init() { + var state = yield this.getState(); + var startHeight = this.options.startHeight; + var tip; + + if (state) { + this.state = state; + return; + } + + if (this.client) { + if (startHeight != null) { + tip = yield this.client.getEntry(startHeight); + if (!tip) + throw new Error('WDB: Could not find start block.'); + } else { + tip = yield this.client.getTip(); + } + tip = BlockMeta.fromEntry(tip); + } else { + tip = BlockMeta.fromEntry(this.network.genesis); + } + + this.logger.info( + 'Initializing WalletDB chain state at %s (%d).', + util.revHex(tip.hash), tip.height); + + yield this.resetState(tip, false); +}); + + /** * Watch addresses and outpoints. * @private @@ -178,7 +311,6 @@ WalletDB.prototype.watch = co(function* watch() { try { data = layout.pp(item.key); this.filter.add(data, 'hex'); - this.emit('watch', data); } catch (e) { yield iter.end(); throw e; @@ -203,7 +335,6 @@ WalletDB.prototype.watch = co(function* watch() { outpoint = new Outpoint(items[0], items[1]); data = outpoint.toRaw(); this.filter.add(data); - this.emit('watch', data); } catch (e) { yield iter.end(); throw e; @@ -212,33 +343,19 @@ WalletDB.prototype.watch = co(function* watch() { outpoints++; } - this.logger.info('Adding %d hashes to WalletDB filter.', hashes); - this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints); + this.logger.info('Added %d hashes to WalletDB filter.', hashes); + this.logger.info('Added %d outpoints to WalletDB filter.', outpoints); yield this.loadFilter(); }); /** * Connect and sync with the chain server. - * @returns {Promise} - */ - -WalletDB.prototype.sync = co(function* sync() { - var unlock = yield this.txLock.lock(); - try { - return yield this._sync(); - } finally { - unlock(); - } -}); - -/** - * Connect and sync with the chain server (without a lock). * @private * @returns {Promise} */ -WalletDB.prototype._sync = co(function* connect() { +WalletDB.prototype.sync = co(function* sync() { var height = this.state.height; var tip, entry; @@ -270,32 +387,6 @@ WalletDB.prototype._sync = co(function* connect() { yield this.scan(height); }); -/** - * Force a rescan. - * @param {Number} height - * @returns {Promise} - */ - -WalletDB.prototype.rescan = co(function* rescan(height) { - var unlock = yield this.txLock.lock(); - try { - return yield this._rescan(height); - } finally { - unlock(); - } -}); - -/** - * Force a rescan (without a lock). - * @private - * @param {Number} height - * @returns {Promise} - */ - -WalletDB.prototype._rescan = co(function* rescan(height) { - return yield this.scan(height); -}); - /** * Rescan blockchain from a given height. * @private @@ -332,6 +423,32 @@ WalletDB.prototype.scan = co(function* scan(height) { } }); +/** + * Force a rescan. + * @param {Number} height + * @returns {Promise} + */ + +WalletDB.prototype.rescan = co(function* rescan(height) { + var unlock = yield this.txLock.lock(); + try { + return yield this._rescan(height); + } finally { + unlock(); + } +}); + +/** + * Force a rescan (without a lock). + * @private + * @param {Number} height + * @returns {Promise} + */ + +WalletDB.prototype._rescan = co(function* rescan(height) { + return yield this.scan(height); +}); + /** * Broadcast a transaction via chain server. * @param {TX} tx @@ -596,7 +713,6 @@ WalletDB.prototype.testFilter = function testFilter(data) { WalletDB.prototype.addHash = function addHash(hash) { this.filter.add(hash, 'hex'); - this.emit('watch', hash); return this.addFilter(hash); }; @@ -608,10 +724,7 @@ WalletDB.prototype.addHash = function addHash(hash) { WalletDB.prototype.addOutpoint = function addOutpoint(hash, i) { var outpoint = new Outpoint(hash, i); - var data = outpoint.toRaw(); - this.filter.add(outpoint.toRaw()); - this.emit('watch', data); }; /** @@ -1468,40 +1581,6 @@ WalletDB.prototype.getWalletsByTX = co(function* getWalletsByTX(tx) { return result; }); -/** - * Write the genesis block as the best hash. - * @returns {Promise} - */ - -WalletDB.prototype.init = co(function* init() { - var state = yield this.getState(); - var tip; - - if (state) { - this.state = state; - return; - } - - if (this.client) { - if (this.options.startHeight != null) { - tip = yield this.client.getEntry(this.options.startHeight); - if (!tip) - throw new Error('WDB: Could not find start block.'); - } else { - tip = yield this.client.getTip(); - } - tip = BlockMeta.fromEntry(tip); - } else { - tip = BlockMeta.fromEntry(this.network.genesis); - } - - this.logger.info( - 'Initializing WalletDB chain state at %s (%d).', - util.revHex(tip.hash), tip.height); - - yield this.resetState(tip, false); -}); - /** * Get the best block hash. * @returns {Promise} @@ -1964,6 +2043,27 @@ WalletDB.prototype._removeBlock = co(function* removeBlock(entry) { return block.txs.length; }); +/** + * Rescan a block. + * @private + * @param {ChainEntry} entry + * @returns {Promise} + */ + +WalletDB.prototype.rescanBlock = co(function* rescanBlock(entry, txs) { + if (!this.rescanning) { + this.logger.warning('Unsolicited rescan block: %s.', entry.height); + return; + } + + try { + yield this._addBlock(entry, txs); + } catch (e) { + this.emit('error', e); + throw e; + } +}); + /** * Add a transaction to the database, map addresses * to wallet IDs, potentially store orphans, resolve diff --git a/test/chain-test.js b/test/chain-test.js index 20a38d6d..be92f4ed 100644 --- a/test/chain-test.js +++ b/test/chain-test.js @@ -426,6 +426,11 @@ describe('Chain', function() { assert.equal(err.reason, 'bad-txns-nonfinal'); })); + it('should rescan for transactions', cob(function* () { + yield walletdb.rescan(0); + assert.equal(wallet.state.confirmed, 1289250000000); + })); + it('should cleanup', cob(function* () { constants.tx.COINBASE_MATURITY = 100; yield node.close();