diff --git a/lib/http/server.js b/lib/http/server.js index 49085850..462b80cd 100644 --- a/lib/http/server.js +++ b/lib/http/server.js @@ -1313,6 +1313,18 @@ HTTPServer.prototype._initIO = function _initIO() { callback(); }); + socket.on('options', function(args, callback) { + var options = args[0]; + + try { + socket.setOptions(options); + } catch (e) { + return callback({ error: e.message }); + } + + callback(); + }); + socket.on('watch chain', function(args, callback) { if (!socket.api) return callback({ error: 'Not authorized.' }); @@ -1329,7 +1341,27 @@ HTTPServer.prototype._initIO = function _initIO() { callback(); }); - socket.on('watch data', function(args, callback) { + socket.on('set filter', function(args, callback) { + var data = args[0]; + var filter; + + if (!util.isHex(data)) + return callback({ error: 'Invalid parameter.' }); + + if (!socket.api) + return callback({ error: 'Not authorized.' }); + + try { + filter = Bloom.fromRaw(data, 'hex'); + socket.setFilter(filter); + } catch (e) { + return callback({ error: e.message }); + } + + callback(); + }); + + socket.on('add filter', function(args, callback) { var chunks = args[0]; if (!Array.isArray(chunks)) @@ -1347,6 +1379,19 @@ HTTPServer.prototype._initIO = function _initIO() { callback(); }); + socket.on('reset filter', function(args, callback) { + if (!socket.api) + return callback({ error: 'Not authorized.' }); + + try { + socket.resetFilter(); + } catch (e) { + return callback({ error: e.message }); + } + + callback(); + }); + socket.on('estimate fee', function(args, callback) { var blocks = args[0]; var rate; @@ -1354,9 +1399,6 @@ HTTPServer.prototype._initIO = function _initIO() { if (blocks != null && !util.isNumber(blocks)) return callback({ error: 'Invalid parameter.' }); - if (!socket.api) - return callback({ error: 'Not authorized.' }); - if (!self.fees) { rate = self.network.feeRate; rate = Amount.btc(rate); @@ -1376,9 +1418,6 @@ HTTPServer.prototype._initIO = function _initIO() { if (!util.isHex(data)) return callback({ error: 'Invalid parameter.' }); - if (!socket.api) - return callback({ error: 'Not authorized.' }); - try { tx = TX.fromRaw(data, 'hex'); } catch (e) { @@ -1388,7 +1427,7 @@ HTTPServer.prototype._initIO = function _initIO() { self.node.send(tx); }); - socket.on('scan', function(args, callback) { + socket.on('rescan', function(args, callback) { var start = args[0]; if (!util.isHex256(start) && !util.isUInt32(start)) @@ -1544,6 +1583,7 @@ function ClientSocket(server, socket) { this.auth = false; this.filter = null; this.api = false; + this.raw = false; this.network = this.server.network; this.node = this.server.node; @@ -1589,11 +1629,24 @@ ClientSocket.prototype._init = function _init() { }); }; +ClientSocket.prototype.setOptions = function setOptions(options) { + assert(options && typeof options === 'object', 'Invalid parameter.'); + + if (options.raw != null) { + assert(typeof options.raw === 'boolean', 'Invalid parameter.'); + this.raw = options.raw; + } +}; + +ClientSocket.prototype.setFilter = function setFilter(filter) { + this.filter = filter; +}; + ClientSocket.prototype.addFilter = function addFilter(chunks) { var i, data; if (!this.filter) - this.filter = Bloom.fromRate(100000, 0.001, -1); + throw new Error('No filter set.'); for (i = 0; i < chunks.length; i++) { data = chunks[i]; @@ -1608,6 +1661,13 @@ ClientSocket.prototype.addFilter = function addFilter(chunks) { } }; +ClientSocket.prototype.resetFilter = function resetFilter() { + if (!this.filter) + throw new Error('No filter set.'); + + this.filter.reset(); +}; + ClientSocket.prototype.bind = function bind(obj, event, listener) { this.events.push([obj, event, listener]); obj.on(event, listener); @@ -1638,29 +1698,27 @@ ClientSocket.prototype.unbindAll = function unbindAll() { ClientSocket.prototype.watchChain = function watchChain() { var self = this; + var onError = this.onError.bind(this); var pool = this.mempool || this.pool; this.bind(this.chain, 'connect', function(entry, block) { - var txs; - - self.emit('block connect', entry.toJSON()); - - txs = self.testBlock(block); - - if (txs) - self.emit('block tx', entry.toJSON(), txs); + self.connectBlock(entry, block).catch(onError); }); this.bind(this.chain, 'disconnect', function(entry, block) { - self.emit('block disconnect', entry.toJSON()); + self.disconnectBlock(entry, block).catch(onError); }); this.bind(pool, 'tx', function(tx) { - if (self.testFilter(tx)) - self.emit('mempool tx', tx.toJSON(self.network)); + self.sendTX(tx).catch(onError); }); }; +ClientSocket.prototype.onError = function onError(err) { + var emit = EventEmitter.prototype.emit; + emit.call(this, 'error', err); +}; + ClientSocket.prototype.unwatchChain = function unwatchChain() { var pool = this.mempool || this.pool; this.unbind(this.chain, 'connect'); @@ -1668,6 +1726,89 @@ ClientSocket.prototype.unwatchChain = function unwatchChain() { this.unbind(pool, 'tx'); }; +ClientSocket.prototype.connectBlock = co(function* _connectBlock(entry, block) { + var unlock = this.locker.lock(); + try { + return yield this._connectBlock(entry, block); + } finally { + unlock(); + } +}); + +ClientSocket.prototype._connectBlock = function _connectBlock(entry, block) { + var self = this; + return new Promise(function(resolve, reject) { + var cb = co.wrap(resolve, reject); + var raw = self.frameEntry(entry); + var txs; + + self.emit('entry connect', raw); + + txs = self.testBlock(block); + + if (!txs) + return cb(); + + self.emit('block connect', raw, txs, cb); + }); +}; + +ClientSocket.prototype.disconnectBlock = co(function* _disconnectBlock(entry, block) { + var unlock = this.locker.lock(); + try { + return yield this._disconnectBlock(entry, block); + } finally { + unlock(); + } +}); + +ClientSocket.prototype._disconnectBlock = function _disconnectBlock(entry, block) { + var self = this; + return new Promise(function(resolve, reject) { + var cb = co.wrap(resolve, reject); + var raw = self.frameEntry(entry); + + self.emit('entry disconnect', raw); + + if (!self.filter) + return cb(); + + self.emit('block disconnect', raw, cb); + }); +}; + +ClientSocket.prototype.sendTX = co(function* sendTX(tx) { + var unlock = this.locker.lock(); + try { + return yield this._sendTX(tx); + } finally { + unlock(); + } +}); + +ClientSocket.prototype._sendTX = function _sendTX(tx) { + var self = this; + return new Promise(function(resolve, reject) { + var cb = co.wrap(resolve, reject); + var raw; + + if (!self.testFilter(tx)) + return cb(); + + raw = self.frameTX(tx); + + self.emit('tx', raw, cb); + }); +}; + +ClientSocket.prototype.rescanBlock = function rescanBlock(entry, txs) { + var self = this; + return new Promise(function(resolve, reject) { + var cb = co.wrap(resolve, reject); + self.emit('block rescan', entry, txs, cb); + }); +}; + ClientSocket.prototype.testBlock = function testBlock(block) { var txs = []; var i, tx; @@ -1678,7 +1819,7 @@ ClientSocket.prototype.testBlock = function testBlock(block) { for (i = 0; i < block.txs.length; i++) { tx = block.txs[i]; if (this.testFilter(tx)) - txs.push(tx.toJSON(this.network)); + txs.push(this.frameTX(tx)); } if (txs.length === 0) @@ -1688,31 +1829,8 @@ ClientSocket.prototype.testBlock = function testBlock(block) { }; ClientSocket.prototype.testFilter = function testFilter(tx) { - if (this.chain.db.options.spv) - return this.testFilterSPV(tx); - return this.testFilterFull(tx); -}; - -ClientSocket.prototype.testFilterFull = function testFilterFull(tx) { - var i, hashes, hash; - - if (!this.filter) - return false; - - hashes = tx.getHashes(); - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - if (this.filter.test(hash)) - return true; - } - - return false; -}; - -ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) { var found = false; - var i, hash, input, prevout, output, outpoint; + var i, hash, input, prevout, output; if (!this.filter) return false; @@ -1725,8 +1843,8 @@ ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) { continue; if (this.filter.test(hash)) { - outpoint = Outpoint.fromTX(tx, i); - this.filter.add(outpoint.toRaw()); + prevout = Outpoint.fromTX(tx, i); + this.filter.add(prevout.toRaw()); found = true; } } @@ -1752,15 +1870,26 @@ ClientSocket.prototype.scan = co(function* scan(start) { }); ClientSocket.prototype.scanner = function scanner(entry, txs) { - var json = new Array(txs.length); + var block = this.frameEntry(entry); + var raw = new Array(txs.length); var i; for (i = 0; i < txs.length; i++) - json[i] = txs[i].toJSON(this.network); + raw[i] = this.frameTX(txs[i]); - this.emit('block tx', entry.toJSON(), json); + return this.rescanBlock(block, raw); +}; - return Promise.resolve(); +ClientSocket.prototype.frameEntry = function frameEntry(entry) { + if (this.raw) + return entry.toRaw().toString('hex'); + return entry.toJSON(); +}; + +ClientSocket.prototype.frameTX = function frameTX(tx) { + if (this.raw) + return tx.toRaw().toString('hex'); + return tx.toJSON(this.network); }; ClientSocket.prototype.join = function join(id) { diff --git a/lib/net/packets.js b/lib/net/packets.js index abf63e70..04398fa9 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -1636,30 +1636,16 @@ MempoolPacket.fromRaw = function fromRaw(data, enc) { * Represents a `filterload` packet. * @exports FilterLoadPacket * @constructor - * @param {Buffer|null} filter - * @param {Number|null} n - * @param {Number|null} tweak - * @param {Number|null} update - * @property {Buffer} filter - * @property {Number} n - * @property {Number} tweak - * @property {Number} update + * @param {Bloom|null} filter */ -function FilterLoadPacket(filter, n, tweak, update) { +function FilterLoadPacket(filter) { if (!(this instanceof FilterLoadPacket)) - return new FilterLoadPacket(filter, n, tweak, update); + return new FilterLoadPacket(filter); Packet.call(this); - if (filter instanceof Bloom) { - this.fromFilter(filter); - } else { - this.filter = filter || DUMMY; - this.n = n || 0; - this.tweak = tweak || 0; - this.update = update || 0; - } + this.filter = filter || new Bloom(0, 0, 0, -1); } util.inherits(FilterLoadPacket, Packet); @@ -1673,26 +1659,7 @@ FilterLoadPacket.prototype.type = exports.types.FILTERLOAD; */ FilterLoadPacket.prototype.toRaw = function toRaw(writer) { - var p = BufferWriter(writer); - - p.writeVarBytes(this.filter); - p.writeU32(this.n); - p.writeU32(this.tweak); - p.writeU8(this.update); - - if (!writer) - p = p.render(); - - return p; -}; - -/** - * Serialize filterload packet. - * @returns {Buffer} - */ - -FilterLoadPacket.prototype.toFilter = function toFilter() { - return new Bloom(this.filter, this.n, this.tweak, this.update); + return this.filter.toRaw(writer); }; /** @@ -1702,29 +1669,7 @@ FilterLoadPacket.prototype.toFilter = function toFilter() { */ FilterLoadPacket.prototype.fromRaw = function fromRaw(data) { - var p = BufferReader(data); - - this.filter = p.readVarBytes(); - this.n = p.readU32(); - this.tweak = p.readU32(); - this.update = p.readU8(); - - assert(constants.filterFlagsByVal[this.update] != null, 'Bad filter flag.'); - - return this; -}; - -/** - * Inject properties from bloom filter. - * @private - * @param {Bloom} filter - */ - -FilterLoadPacket.prototype.fromFilter = function fromFilter(filter) { - this.filter = filter.filter; - this.n = filter.n; - this.tweak = filter.tweak; - this.update = filter.update; + this.filter.fromRaw(data); return this; }; @@ -1741,27 +1686,16 @@ FilterLoadPacket.fromRaw = function fromRaw(data, enc) { return new FilterLoadPacket().fromRaw(data); }; -/** - * Instantiate filterload packet from serialized data. - * @param {Buffer} data - * @param {String?} enc - * @returns {FilterLoadPacket} - */ - -FilterLoadPacket.fromFilter = function fromFilter(filter) { - return new FilterLoadPacket().fromFilter(filter); -}; - /** * Ensure the filter is within the size limits. * @returns {Boolean} */ FilterLoadPacket.prototype.isWithinConstraints = function isWithinConstraints() { - if (this.filter.length > constants.bloom.MAX_BLOOM_FILTER_SIZE) + if (this.filter.size > constants.bloom.MAX_BLOOM_FILTER_SIZE * 8) return false; - if (this.n > constants.bloom.MAX_HASH_FUNCS) + if (this.filter.n > constants.bloom.MAX_HASH_FUNCS) return false; return true; diff --git a/lib/net/peer.js b/lib/net/peer.js index 7198acdd..62f60cf8 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -728,7 +728,7 @@ Peer.prototype.destroy = function destroy() { if (this.destroyed) return; - this.finishDrain(new Error('Destroyed.')); + this.finishDrain(); this.destroyed = true; this.connected = false; @@ -812,7 +812,7 @@ Peer.prototype.onDrain = function onDrain(size) { util.mb(this.drainSize), this.hostname); this.error('Peer stalled.'); - return Promise.reject(new Error('Peer stalled.')); + return Promise.resolve(); } return new Promise(function(resolve, reject) { @@ -832,7 +832,7 @@ Peer.prototype.maybeStall = function maybeStall() { if (util.now() < this.drainStart + 10) return false; - this.finishDrain(new Error('Peer stalled.')); + this.finishDrain(); this.error('Peer stalled.'); return true; @@ -1180,7 +1180,7 @@ Peer.prototype._handleFilterLoad = function _handleFilterLoad(packet) { return; } - this.spvFilter = packet.toFilter(); + this.spvFilter = packet.filter; this.relay = true; }; @@ -2690,11 +2690,6 @@ function compare(a, b) { return a.id - b.id; } -function Drainer(resolve, reject) { - this.resolve = resolve; - this.reject = reject; -} - /* * Expose */ diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index 05a57fb3..86ddedec 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -276,19 +276,39 @@ FullNode.prototype._close = co(function* close() { }); /** - * Watch address or outpoints (nop). - * @param {Hash[]} chunks + * Set bloom filter. + * @param {Bloom} filter * @returns {Promise} */ -FullNode.prototype.watchData = function watchData(chunks) { +FullNode.prototype.loadFilter = function loadFilter(filter) { + this.filter = filter; + return Promise.resolve(); +}; + +/** + * Add data to filter. + * @param {Buffer} data + * @returns {Promise} + */ + +FullNode.prototype.addFilter = function addFilter(data) { + return Promise.resolve(); +}; + +/** + * Reset filter. + * @returns {Promise} + */ + +FullNode.prototype.resetFilter = function resetFilter() { return Promise.resolve(); }; /** * Rescan for any missed transactions. - * @param {Hash} start - Block hash to start at. - * @param {Hash[]} hashes - Address hashes. + * @param {Number|Hash} start - Start block. + * @param {Bloom} filter * @param {Function} iter - Iterator. * @returns {Promise} */ @@ -297,6 +317,20 @@ FullNode.prototype.scan = function scan(start, filter, iter) { return this.chain.scan(start, filter, iter); }; +/** + * Rescan for any missed transactions. + * @param {Number|Hash} start - Start block. + * @param {Function} iter - Iterator. + * @returns {Promise} + */ + +FullNode.prototype.rescan = co(function* rescan(start, iter) { + if (!this.filter) + throw new Error('No filter set.'); + + yield this.scan(start, this.filter, iter); +}); + /** * Esimate smart fee. * @param {Number?} blocks diff --git a/lib/node/node.js b/lib/node/node.js index 12220b7f..0c23089d 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -49,6 +49,7 @@ function Node(options) { this.walletdb = null; this.wallet = null; this.http = null; + this.filter = null; this._bound = []; diff --git a/lib/node/spvnode.js b/lib/node/spvnode.js index 67304589..e0be2f99 100644 --- a/lib/node/spvnode.js +++ b/lib/node/spvnode.js @@ -9,6 +9,7 @@ var util = require('../utils/util'); var co = require('../utils/co'); +var Locker = require('../utils/locker'); var Node = require('./node'); var Chain = require('../blockchain/chain'); var Pool = require('../net/pool'); @@ -105,6 +106,10 @@ function SPVNode(options) { }); } + this.rescanJob = null; + this.scanLock = new Locker(); + this.watchLock = new Locker(); + this._init(); } @@ -133,6 +138,10 @@ SPVNode.prototype._init = function _init() { this.pool.on('tx', function(tx) { self.emit('tx', tx); + + if (self.rescanJob) + return; + self.walletdb.addTX(tx).catch(onError); }); @@ -141,6 +150,11 @@ SPVNode.prototype._init = function _init() { }); this.chain.on('connect', function(entry, block) { + if (self.rescanJob) { + self.watchBlock(entry, block).catch(onError); + return; + } + self.walletdb.addBlock(entry, block.txs).catch(onError); }); @@ -151,6 +165,10 @@ SPVNode.prototype._init = function _init() { this.chain.on('reset', function(tip) { self.walletdb.resetChain(tip).catch(onError); }); + + this.walletdb.on('watch', function(data) { + self.pool.watch(data); + }); }; /** @@ -194,31 +212,114 @@ SPVNode.prototype._close = co(function* close() { }); /** - * Watch address hashes or outpoints. - * @param {Hash[]} chunks + * Set bloom filter. + * @param {Bloom} filter * @returns {Promise} */ -SPVNode.prototype.watchData = function watchData(chunks) { - var i; +SPVNode.prototype.loadFilter = function loadFilter(filter) { + this.filter = filter; + return Promise.resolve(); +}; - for (i = 0; i < chunks.length; i++) - this.pool.watch(chunks[i], 'hex'); +/** + * Add data to filter. + * @param {Buffer} data + * @returns {Promise} + */ +SPVNode.prototype.addFilter = function addFilter(data) { + return Promise.resolve(); +}; + +/** + * Reset filter. + * @returns {Promise} + */ + +SPVNode.prototype.resetFilter = function resetFilter() { return Promise.resolve(); }; /** * Scan for any missed transactions. * Note that this will replay the blockchain sync. - * @param {Number|Hash} start + * @param {Number|Hash} start - Start block. + * @param {Bloom} filter + * @param {Function} iter - Iterator. * @returns {Promise} */ -SPVNode.prototype.scan = function rescan(start) { - return this.chain.replay(start); +SPVNode.prototype.scan = co(function* scan(start, filter, iter) { + var unlock = yield this.scanLock.lock(); + var height = this.chain.height; + + try { + yield this.chain.replay(start); + + if (this.chain.height < height) { + // We need to somehow defer this. + // yield this.pool.startSync(); + // yield this.watchUntil(height, iter); + } + } finally { + unlock(); + } +}); + +/** + * Scan for any missed transactions. + * Note that this will replay the blockchain sync. + * @param {Number|Hash} start - Start block. + * @param {Function} iter - Iterator. + * @returns {Promise} + */ + +SPVNode.prototype.rescan = co(function* rescan(start, iter) { + if (!this.filter) + throw new Error('No filter set.'); + + return yield this.scan(start, this.filter, iter); +}); + +/** + * Watch the blockchain until a certain height. + * @param {Number} height + * @param {Function} iter + * @returns {Promise} + */ + +SPVNode.prototype.watchUntil = function watchUntil(height, iter) { + var self = this; + return new Promise(function(resolve, reject) { + self.rescanJob = new RescanJob(resolve, reject, height, iter); + }); }; +/** + * Handled watched block. + * @param {ChainEntry} entry + * @param {MerkleBlock} block + * @returns {Promise} + */ + +SPVNode.prototype.watchBlock = co(function* watchBlock(entry, block) { + var unlock = yield this.watchLock.lock(); + try { + if (entry.height < this.rescanJob.height) { + yield this.rescanJob.iter(entry, block.txs); + return; + } + this.rescanJob.resolve(); + this.rescanJob = null; + } catch (e) { + this.rescanJob.reject(e); + this.rescanJob = null; + } finally { + unlock(); + } +}); + /** * Estimate smart fee (returns network fee rate). * @param {Number?} blocks @@ -277,6 +378,17 @@ SPVNode.prototype.stopSync = function stopSync() { return this.pool.stopSync(); }; +/* + * Helpers + */ + +function RescanJob(resolve, reject, height, iter) { + this.resolve = resolve; + this.reject = reject; + this.height = height; + this.iter = iter; +} + /* * Expose */ diff --git a/lib/primitives/tx.js b/lib/primitives/tx.js index 42294e38..eba52316 100644 --- a/lib/primitives/tx.js +++ b/lib/primitives/tx.js @@ -1938,7 +1938,7 @@ TX.prototype.getPrevout = function getPrevout() { TX.prototype.isWatched = function isWatched(filter) { var found = false; - var i, input, output, outpoint; + var i, input, output, prevout; if (!filter) return false; @@ -1954,12 +1954,12 @@ TX.prototype.isWatched = function isWatched(filter) { // Test the output script if (output.script.test(filter)) { if (filter.update === constants.filterFlags.ALL) { - outpoint = Outpoint.fromTX(this, i); - filter.add(outpoint.toRaw()); + prevout = Outpoint.fromTX(this, i); + filter.add(prevout.toRaw()); } else if (filter.update === constants.filterFlags.PUBKEY_ONLY) { if (output.script.isPubkey() || output.script.isMultisig()) { - outpoint = Outpoint.fromTX(this, i); - filter.add(outpoint.toRaw()); + prevout = Outpoint.fromTX(this, i); + filter.add(prevout.toRaw()); } } found = true; @@ -1973,10 +1973,10 @@ TX.prototype.isWatched = function isWatched(filter) { // 4. Test data elements in input scripts for (i = 0; i < this.inputs.length; i++) { input = this.inputs[i]; - outpoint = input.prevout.toRaw(); + prevout = input.prevout; // Test the COutPoint structure - if (filter.test(outpoint)) + if (filter.test(prevout.toRaw())) return true; // Test the input script @@ -1984,8 +1984,8 @@ TX.prototype.isWatched = function isWatched(filter) { return true; // Test the witness - // if (input.witness.test(filter)) - // return true; + if (input.witness.test(filter)) + return true; } // 5. No match diff --git a/lib/utils/bloom.js b/lib/utils/bloom.js index d36627f8..32ba1ca2 100644 --- a/lib/utils/bloom.js +++ b/lib/utils/bloom.js @@ -7,8 +7,11 @@ 'use strict'; +var assert = require('assert'); var constants = require('../protocol/constants'); var murmur3 = require('./murmur3'); +var BufferWriter = require('./writer'); +var BufferReader = require('./reader'); var sum32 = murmur3.sum32; var mul32 = murmur3.mul32; @@ -170,6 +173,57 @@ Bloom.fromRate = function fromRate(items, rate, update) { return new Bloom(size, n, -1, update); }; +/** + * Serialize bloom filter. + * @returns {Buffer} + */ + +Bloom.prototype.toRaw = function toRaw(writer) { + var p = BufferWriter(writer); + + p.writeVarBytes(this.filter); + p.writeU32(this.n); + p.writeU32(this.tweak); + p.writeU8(this.update); + + if (!writer) + p = p.render(); + + return p; +}; + +/** + * Inject properties from serialized data. + * @private + * @param {Buffer} data + */ + +Bloom.prototype.fromRaw = function fromRaw(data) { + var p = BufferReader(data); + + this.filter = p.readVarBytes(); + this.n = p.readU32(); + this.tweak = p.readU32(); + this.update = p.readU8(); + + assert(constants.filterFlagsByVal[this.update] != null, 'Bad filter flag.'); + + return this; +}; + +/** + * Instantiate bloom filter from serialized data. + * @param {Buffer} data + * @param {String?} enc + * @returns {Bloom} + */ + +Bloom.fromRaw = function fromRaw(data, enc) { + if (typeof data === 'string') + data = new Buffer(data, enc); + return new Bloom().fromRaw(data); +}; + /** * A rolling bloom filter used internally * (do not relay this on the p2p network). diff --git a/lib/utils/co.js b/lib/utils/co.js index 80794f38..b35013ed 100644 --- a/lib/utils/co.js +++ b/lib/utils/co.js @@ -301,7 +301,7 @@ if (typeof window !== 'undefined') { throw event.reason; }; } else { - process.on('unhandledRejection', function(err, promise) { + process.on('unhandledRejection', function(err, promise) { throw err; }); } diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index c076714d..c522b5b3 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -160,25 +160,62 @@ WalletDB.prototype._close = co(function* close() { */ WalletDB.prototype.watch = co(function* watch() { - var hashes = yield this.getHashes(); - var outpoints = yield this.getOutpoints(); - var chunks = []; - var i, hash, outpoint; + var hashes = 0; + var outpoints = 0; + var iter, item, data, outpoint, items; - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - chunks.push(hash); + iter = this.db.iterator({ + gte: layout.p(constants.NULL_HASH), + lte: layout.p(constants.HIGH_HASH) + }); + + for (;;) { + item = yield iter.next(); + + if (!item) + break; + + try { + data = layout.pp(item.key); + this.filter.add(data, 'hex'); + this.emit('watch', data); + } catch (e) { + yield iter.end(); + throw e; + } + + hashes++; } - for (i = 0; i < outpoints.length; i++) { - outpoint = outpoints[i]; - chunks.push(outpoint.toRaw()); + iter = this.db.iterator({ + gte: layout.o(constants.NULL_HASH, 0), + lte: layout.o(constants.HIGH_HASH, 0xffffffff) + }); + + for (;;) { + item = yield iter.next(); + + if (!item) + break; + + try { + items = layout.oo(item.key); + outpoint = new Outpoint(items[0], items[1]); + data = outpoint.toRaw(); + this.filter.add(data); + this.emit('watch', data); + } catch (e) { + yield iter.end(); + throw e; + } + + outpoints++; } - this.logger.info('Adding %d hashes to WalletDB filter.', hashes.length); - this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints.length); + this.logger.info('Adding %d hashes to WalletDB filter.', hashes); + this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints); - this.addFilter(chunks); + yield this.loadFilter(); }); /** @@ -289,27 +326,12 @@ WalletDB.prototype.scan = co(function* scan(height) { this.rescanning = true; try { - yield this.client.scan(tip.hash, this.filter, iter); + yield this.client.rescan(tip.hash, iter); } finally { this.rescanning = false; } }); -/** - * Add address or outpoints to chain server filter. - * @param {Hashes[]} chunks - * @returns {Promise} - */ - -WalletDB.prototype.watchData = co(function* watchData(chunks) { - if (!this.client) { - this.emit('watch data', chunks); - return; - } - - yield this.client.watchData(chunks); -}); - /** * Broadcast a transaction via chain server. * @param {TX} tx @@ -338,6 +360,52 @@ WalletDB.prototype.estimateFee = co(function* estimateFee(blocks) { return yield this.client.estimateFee(blocks); }); +/** + * Send filter to the remote node. + * @private + * @returns {Promise} + */ + +WalletDB.prototype.loadFilter = function loadFilter() { + if (!this.client) { + this.emit('load filter', this.filter); + return Promise.resolve(); + } + + return this.client.loadFilter(this.filter); +}; + +/** + * Add data to remote filter. + * @private + * @param {Buffer} data + * @returns {Promise} + */ + +WalletDB.prototype.addFilter = function addFilter(data) { + if (!this.client) { + this.emit('add filter', data); + return Promise.resolve(); + } + + return this.client.addFilter(data); +}; + +/** + * Reset remote filter. + * @private + * @returns {Promise} + */ + +WalletDB.prototype.resetFilter = function resetFilter() { + if (!this.client) { + this.emit('reset filter'); + return Promise.resolve(); + } + + return this.client.resetFilter(); +}; + /** * Backup the wallet db. * @param {String} path @@ -521,24 +589,29 @@ WalletDB.prototype.testFilter = function testFilter(data) { }; /** - * Add hash to filter. + * Add hash to local and remote filters. * @private * @param {Hash} hash */ -WalletDB.prototype.addFilter = function addFilter(chunks) { - var i, data; +WalletDB.prototype.addHash = function addHash(hash) { + this.filter.add(hash, 'hex'); + this.emit('watch', hash); + return this.addFilter(hash); +}; - if (!Array.isArray(chunks)) - chunks = [chunks]; +/** + * Add outpoint to local filter. + * @private + * @param {Buffer} data + */ - if (this.client) - this.client.watchData(chunks); +WalletDB.prototype.addOutpoint = function addOutpoint(hash, i) { + var outpoint = new Outpoint(hash, i); + var data = outpoint.toRaw(); - for (i = 0; i < chunks.length; i++) { - data = chunks[i]; - this.filter.add(data, 'hex'); - } + this.filter.add(outpoint.toRaw()); + this.emit('watch', data); }; /** @@ -991,7 +1064,7 @@ WalletDB.prototype.savePath = co(function* savePath(wallet, path) { var batch = this.batch(wallet); var map; - this.addFilter(hash); + yield this.addHash(hash); map = yield this.getPathMap(hash); @@ -1613,7 +1686,7 @@ WalletDB.prototype.getOutpointMap = co(function* getOutpointMap(hash, i) { WalletDB.prototype.writeOutpointMap = function writeOutpointMap(wallet, hash, i, map) { var batch = this.batch(wallet); - this.addFilter(new Outpoint(hash, i).toRaw()); + this.addOutpoint(hash, i); batch.put(layout.o(hash, i), map.toRaw()); };