walletdb: new client api.

This commit is contained in:
Christopher Jeffrey 2016-11-19 18:00:13 -08:00
parent 8f743b6e7d
commit 1827b945dd
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
10 changed files with 532 additions and 200 deletions

View File

@ -1313,6 +1313,18 @@ HTTPServer.prototype._initIO = function _initIO() {
callback();
});
socket.on('options', function(args, callback) {
var options = args[0];
try {
socket.setOptions(options);
} catch (e) {
return callback({ error: e.message });
}
callback();
});
socket.on('watch chain', function(args, callback) {
if (!socket.api)
return callback({ error: 'Not authorized.' });
@ -1329,7 +1341,27 @@ HTTPServer.prototype._initIO = function _initIO() {
callback();
});
socket.on('watch data', function(args, callback) {
socket.on('set filter', function(args, callback) {
var data = args[0];
var filter;
if (!util.isHex(data))
return callback({ error: 'Invalid parameter.' });
if (!socket.api)
return callback({ error: 'Not authorized.' });
try {
filter = Bloom.fromRaw(data, 'hex');
socket.setFilter(filter);
} catch (e) {
return callback({ error: e.message });
}
callback();
});
socket.on('add filter', function(args, callback) {
var chunks = args[0];
if (!Array.isArray(chunks))
@ -1347,6 +1379,19 @@ HTTPServer.prototype._initIO = function _initIO() {
callback();
});
socket.on('reset filter', function(args, callback) {
if (!socket.api)
return callback({ error: 'Not authorized.' });
try {
socket.resetFilter();
} catch (e) {
return callback({ error: e.message });
}
callback();
});
socket.on('estimate fee', function(args, callback) {
var blocks = args[0];
var rate;
@ -1354,9 +1399,6 @@ HTTPServer.prototype._initIO = function _initIO() {
if (blocks != null && !util.isNumber(blocks))
return callback({ error: 'Invalid parameter.' });
if (!socket.api)
return callback({ error: 'Not authorized.' });
if (!self.fees) {
rate = self.network.feeRate;
rate = Amount.btc(rate);
@ -1376,9 +1418,6 @@ HTTPServer.prototype._initIO = function _initIO() {
if (!util.isHex(data))
return callback({ error: 'Invalid parameter.' });
if (!socket.api)
return callback({ error: 'Not authorized.' });
try {
tx = TX.fromRaw(data, 'hex');
} catch (e) {
@ -1388,7 +1427,7 @@ HTTPServer.prototype._initIO = function _initIO() {
self.node.send(tx);
});
socket.on('scan', function(args, callback) {
socket.on('rescan', function(args, callback) {
var start = args[0];
if (!util.isHex256(start) && !util.isUInt32(start))
@ -1544,6 +1583,7 @@ function ClientSocket(server, socket) {
this.auth = false;
this.filter = null;
this.api = false;
this.raw = false;
this.network = this.server.network;
this.node = this.server.node;
@ -1589,11 +1629,24 @@ ClientSocket.prototype._init = function _init() {
});
};
ClientSocket.prototype.setOptions = function setOptions(options) {
assert(options && typeof options === 'object', 'Invalid parameter.');
if (options.raw != null) {
assert(typeof options.raw === 'boolean', 'Invalid parameter.');
this.raw = options.raw;
}
};
ClientSocket.prototype.setFilter = function setFilter(filter) {
this.filter = filter;
};
ClientSocket.prototype.addFilter = function addFilter(chunks) {
var i, data;
if (!this.filter)
this.filter = Bloom.fromRate(100000, 0.001, -1);
throw new Error('No filter set.');
for (i = 0; i < chunks.length; i++) {
data = chunks[i];
@ -1608,6 +1661,13 @@ ClientSocket.prototype.addFilter = function addFilter(chunks) {
}
};
ClientSocket.prototype.resetFilter = function resetFilter() {
if (!this.filter)
throw new Error('No filter set.');
this.filter.reset();
};
ClientSocket.prototype.bind = function bind(obj, event, listener) {
this.events.push([obj, event, listener]);
obj.on(event, listener);
@ -1638,29 +1698,27 @@ ClientSocket.prototype.unbindAll = function unbindAll() {
ClientSocket.prototype.watchChain = function watchChain() {
var self = this;
var onError = this.onError.bind(this);
var pool = this.mempool || this.pool;
this.bind(this.chain, 'connect', function(entry, block) {
var txs;
self.emit('block connect', entry.toJSON());
txs = self.testBlock(block);
if (txs)
self.emit('block tx', entry.toJSON(), txs);
self.connectBlock(entry, block).catch(onError);
});
this.bind(this.chain, 'disconnect', function(entry, block) {
self.emit('block disconnect', entry.toJSON());
self.disconnectBlock(entry, block).catch(onError);
});
this.bind(pool, 'tx', function(tx) {
if (self.testFilter(tx))
self.emit('mempool tx', tx.toJSON(self.network));
self.sendTX(tx).catch(onError);
});
};
ClientSocket.prototype.onError = function onError(err) {
var emit = EventEmitter.prototype.emit;
emit.call(this, 'error', err);
};
ClientSocket.prototype.unwatchChain = function unwatchChain() {
var pool = this.mempool || this.pool;
this.unbind(this.chain, 'connect');
@ -1668,6 +1726,89 @@ ClientSocket.prototype.unwatchChain = function unwatchChain() {
this.unbind(pool, 'tx');
};
ClientSocket.prototype.connectBlock = co(function* _connectBlock(entry, block) {
var unlock = this.locker.lock();
try {
return yield this._connectBlock(entry, block);
} finally {
unlock();
}
});
ClientSocket.prototype._connectBlock = function _connectBlock(entry, block) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw = self.frameEntry(entry);
var txs;
self.emit('entry connect', raw);
txs = self.testBlock(block);
if (!txs)
return cb();
self.emit('block connect', raw, txs, cb);
});
};
ClientSocket.prototype.disconnectBlock = co(function* _disconnectBlock(entry, block) {
var unlock = this.locker.lock();
try {
return yield this._disconnectBlock(entry, block);
} finally {
unlock();
}
});
ClientSocket.prototype._disconnectBlock = function _disconnectBlock(entry, block) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw = self.frameEntry(entry);
self.emit('entry disconnect', raw);
if (!self.filter)
return cb();
self.emit('block disconnect', raw, cb);
});
};
ClientSocket.prototype.sendTX = co(function* sendTX(tx) {
var unlock = this.locker.lock();
try {
return yield this._sendTX(tx);
} finally {
unlock();
}
});
ClientSocket.prototype._sendTX = function _sendTX(tx) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
var raw;
if (!self.testFilter(tx))
return cb();
raw = self.frameTX(tx);
self.emit('tx', raw, cb);
});
};
ClientSocket.prototype.rescanBlock = function rescanBlock(entry, txs) {
var self = this;
return new Promise(function(resolve, reject) {
var cb = co.wrap(resolve, reject);
self.emit('block rescan', entry, txs, cb);
});
};
ClientSocket.prototype.testBlock = function testBlock(block) {
var txs = [];
var i, tx;
@ -1678,7 +1819,7 @@ ClientSocket.prototype.testBlock = function testBlock(block) {
for (i = 0; i < block.txs.length; i++) {
tx = block.txs[i];
if (this.testFilter(tx))
txs.push(tx.toJSON(this.network));
txs.push(this.frameTX(tx));
}
if (txs.length === 0)
@ -1688,31 +1829,8 @@ ClientSocket.prototype.testBlock = function testBlock(block) {
};
ClientSocket.prototype.testFilter = function testFilter(tx) {
if (this.chain.db.options.spv)
return this.testFilterSPV(tx);
return this.testFilterFull(tx);
};
ClientSocket.prototype.testFilterFull = function testFilterFull(tx) {
var i, hashes, hash;
if (!this.filter)
return false;
hashes = tx.getHashes();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.filter.test(hash))
return true;
}
return false;
};
ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) {
var found = false;
var i, hash, input, prevout, output, outpoint;
var i, hash, input, prevout, output;
if (!this.filter)
return false;
@ -1725,8 +1843,8 @@ ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) {
continue;
if (this.filter.test(hash)) {
outpoint = Outpoint.fromTX(tx, i);
this.filter.add(outpoint.toRaw());
prevout = Outpoint.fromTX(tx, i);
this.filter.add(prevout.toRaw());
found = true;
}
}
@ -1752,15 +1870,26 @@ ClientSocket.prototype.scan = co(function* scan(start) {
});
ClientSocket.prototype.scanner = function scanner(entry, txs) {
var json = new Array(txs.length);
var block = this.frameEntry(entry);
var raw = new Array(txs.length);
var i;
for (i = 0; i < txs.length; i++)
json[i] = txs[i].toJSON(this.network);
raw[i] = this.frameTX(txs[i]);
this.emit('block tx', entry.toJSON(), json);
return this.rescanBlock(block, raw);
};
return Promise.resolve();
ClientSocket.prototype.frameEntry = function frameEntry(entry) {
if (this.raw)
return entry.toRaw().toString('hex');
return entry.toJSON();
};
ClientSocket.prototype.frameTX = function frameTX(tx) {
if (this.raw)
return tx.toRaw().toString('hex');
return tx.toJSON(this.network);
};
ClientSocket.prototype.join = function join(id) {

View File

@ -1636,30 +1636,16 @@ MempoolPacket.fromRaw = function fromRaw(data, enc) {
* Represents a `filterload` packet.
* @exports FilterLoadPacket
* @constructor
* @param {Buffer|null} filter
* @param {Number|null} n
* @param {Number|null} tweak
* @param {Number|null} update
* @property {Buffer} filter
* @property {Number} n
* @property {Number} tweak
* @property {Number} update
* @param {Bloom|null} filter
*/
function FilterLoadPacket(filter, n, tweak, update) {
function FilterLoadPacket(filter) {
if (!(this instanceof FilterLoadPacket))
return new FilterLoadPacket(filter, n, tweak, update);
return new FilterLoadPacket(filter);
Packet.call(this);
if (filter instanceof Bloom) {
this.fromFilter(filter);
} else {
this.filter = filter || DUMMY;
this.n = n || 0;
this.tweak = tweak || 0;
this.update = update || 0;
}
this.filter = filter || new Bloom(0, 0, 0, -1);
}
util.inherits(FilterLoadPacket, Packet);
@ -1673,26 +1659,7 @@ FilterLoadPacket.prototype.type = exports.types.FILTERLOAD;
*/
FilterLoadPacket.prototype.toRaw = function toRaw(writer) {
var p = BufferWriter(writer);
p.writeVarBytes(this.filter);
p.writeU32(this.n);
p.writeU32(this.tweak);
p.writeU8(this.update);
if (!writer)
p = p.render();
return p;
};
/**
* Serialize filterload packet.
* @returns {Buffer}
*/
FilterLoadPacket.prototype.toFilter = function toFilter() {
return new Bloom(this.filter, this.n, this.tweak, this.update);
return this.filter.toRaw(writer);
};
/**
@ -1702,29 +1669,7 @@ FilterLoadPacket.prototype.toFilter = function toFilter() {
*/
FilterLoadPacket.prototype.fromRaw = function fromRaw(data) {
var p = BufferReader(data);
this.filter = p.readVarBytes();
this.n = p.readU32();
this.tweak = p.readU32();
this.update = p.readU8();
assert(constants.filterFlagsByVal[this.update] != null, 'Bad filter flag.');
return this;
};
/**
* Inject properties from bloom filter.
* @private
* @param {Bloom} filter
*/
FilterLoadPacket.prototype.fromFilter = function fromFilter(filter) {
this.filter = filter.filter;
this.n = filter.n;
this.tweak = filter.tweak;
this.update = filter.update;
this.filter.fromRaw(data);
return this;
};
@ -1741,27 +1686,16 @@ FilterLoadPacket.fromRaw = function fromRaw(data, enc) {
return new FilterLoadPacket().fromRaw(data);
};
/**
* Instantiate filterload packet from serialized data.
* @param {Buffer} data
* @param {String?} enc
* @returns {FilterLoadPacket}
*/
FilterLoadPacket.fromFilter = function fromFilter(filter) {
return new FilterLoadPacket().fromFilter(filter);
};
/**
* Ensure the filter is within the size limits.
* @returns {Boolean}
*/
FilterLoadPacket.prototype.isWithinConstraints = function isWithinConstraints() {
if (this.filter.length > constants.bloom.MAX_BLOOM_FILTER_SIZE)
if (this.filter.size > constants.bloom.MAX_BLOOM_FILTER_SIZE * 8)
return false;
if (this.n > constants.bloom.MAX_HASH_FUNCS)
if (this.filter.n > constants.bloom.MAX_HASH_FUNCS)
return false;
return true;

View File

@ -728,7 +728,7 @@ Peer.prototype.destroy = function destroy() {
if (this.destroyed)
return;
this.finishDrain(new Error('Destroyed.'));
this.finishDrain();
this.destroyed = true;
this.connected = false;
@ -812,7 +812,7 @@ Peer.prototype.onDrain = function onDrain(size) {
util.mb(this.drainSize),
this.hostname);
this.error('Peer stalled.');
return Promise.reject(new Error('Peer stalled.'));
return Promise.resolve();
}
return new Promise(function(resolve, reject) {
@ -832,7 +832,7 @@ Peer.prototype.maybeStall = function maybeStall() {
if (util.now() < this.drainStart + 10)
return false;
this.finishDrain(new Error('Peer stalled.'));
this.finishDrain();
this.error('Peer stalled.');
return true;
@ -1180,7 +1180,7 @@ Peer.prototype._handleFilterLoad = function _handleFilterLoad(packet) {
return;
}
this.spvFilter = packet.toFilter();
this.spvFilter = packet.filter;
this.relay = true;
};
@ -2690,11 +2690,6 @@ function compare(a, b) {
return a.id - b.id;
}
function Drainer(resolve, reject) {
this.resolve = resolve;
this.reject = reject;
}
/*
* Expose
*/

View File

@ -276,19 +276,39 @@ FullNode.prototype._close = co(function* close() {
});
/**
* Watch address or outpoints (nop).
* @param {Hash[]} chunks
* Set bloom filter.
* @param {Bloom} filter
* @returns {Promise}
*/
FullNode.prototype.watchData = function watchData(chunks) {
FullNode.prototype.loadFilter = function loadFilter(filter) {
this.filter = filter;
return Promise.resolve();
};
/**
* Add data to filter.
* @param {Buffer} data
* @returns {Promise}
*/
FullNode.prototype.addFilter = function addFilter(data) {
return Promise.resolve();
};
/**
* Reset filter.
* @returns {Promise}
*/
FullNode.prototype.resetFilter = function resetFilter() {
return Promise.resolve();
};
/**
* Rescan for any missed transactions.
* @param {Hash} start - Block hash to start at.
* @param {Hash[]} hashes - Address hashes.
* @param {Number|Hash} start - Start block.
* @param {Bloom} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
@ -297,6 +317,20 @@ FullNode.prototype.scan = function scan(start, filter, iter) {
return this.chain.scan(start, filter, iter);
};
/**
* Rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
FullNode.prototype.rescan = co(function* rescan(start, iter) {
if (!this.filter)
throw new Error('No filter set.');
yield this.scan(start, this.filter, iter);
});
/**
* Esimate smart fee.
* @param {Number?} blocks

View File

@ -49,6 +49,7 @@ function Node(options) {
this.walletdb = null;
this.wallet = null;
this.http = null;
this.filter = null;
this._bound = [];

View File

@ -9,6 +9,7 @@
var util = require('../utils/util');
var co = require('../utils/co');
var Locker = require('../utils/locker');
var Node = require('./node');
var Chain = require('../blockchain/chain');
var Pool = require('../net/pool');
@ -105,6 +106,10 @@ function SPVNode(options) {
});
}
this.rescanJob = null;
this.scanLock = new Locker();
this.watchLock = new Locker();
this._init();
}
@ -133,6 +138,10 @@ SPVNode.prototype._init = function _init() {
this.pool.on('tx', function(tx) {
self.emit('tx', tx);
if (self.rescanJob)
return;
self.walletdb.addTX(tx).catch(onError);
});
@ -141,6 +150,11 @@ SPVNode.prototype._init = function _init() {
});
this.chain.on('connect', function(entry, block) {
if (self.rescanJob) {
self.watchBlock(entry, block).catch(onError);
return;
}
self.walletdb.addBlock(entry, block.txs).catch(onError);
});
@ -151,6 +165,10 @@ SPVNode.prototype._init = function _init() {
this.chain.on('reset', function(tip) {
self.walletdb.resetChain(tip).catch(onError);
});
this.walletdb.on('watch', function(data) {
self.pool.watch(data);
});
};
/**
@ -194,31 +212,114 @@ SPVNode.prototype._close = co(function* close() {
});
/**
* Watch address hashes or outpoints.
* @param {Hash[]} chunks
* Set bloom filter.
* @param {Bloom} filter
* @returns {Promise}
*/
SPVNode.prototype.watchData = function watchData(chunks) {
var i;
SPVNode.prototype.loadFilter = function loadFilter(filter) {
this.filter = filter;
return Promise.resolve();
};
for (i = 0; i < chunks.length; i++)
this.pool.watch(chunks[i], 'hex');
/**
* Add data to filter.
* @param {Buffer} data
* @returns {Promise}
*/
SPVNode.prototype.addFilter = function addFilter(data) {
return Promise.resolve();
};
/**
* Reset filter.
* @returns {Promise}
*/
SPVNode.prototype.resetFilter = function resetFilter() {
return Promise.resolve();
};
/**
* Scan for any missed transactions.
* Note that this will replay the blockchain sync.
* @param {Number|Hash} start
* @param {Number|Hash} start - Start block.
* @param {Bloom} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
SPVNode.prototype.scan = function rescan(start) {
return this.chain.replay(start);
SPVNode.prototype.scan = co(function* scan(start, filter, iter) {
var unlock = yield this.scanLock.lock();
var height = this.chain.height;
try {
yield this.chain.replay(start);
if (this.chain.height < height) {
// We need to somehow defer this.
// yield this.pool.startSync();
// yield this.watchUntil(height, iter);
}
} finally {
unlock();
}
});
/**
* Scan for any missed transactions.
* Note that this will replay the blockchain sync.
* @param {Number|Hash} start - Start block.
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
SPVNode.prototype.rescan = co(function* rescan(start, iter) {
if (!this.filter)
throw new Error('No filter set.');
return yield this.scan(start, this.filter, iter);
});
/**
* Watch the blockchain until a certain height.
* @param {Number} height
* @param {Function} iter
* @returns {Promise}
*/
SPVNode.prototype.watchUntil = function watchUntil(height, iter) {
var self = this;
return new Promise(function(resolve, reject) {
self.rescanJob = new RescanJob(resolve, reject, height, iter);
});
};
/**
* Handled watched block.
* @param {ChainEntry} entry
* @param {MerkleBlock} block
* @returns {Promise}
*/
SPVNode.prototype.watchBlock = co(function* watchBlock(entry, block) {
var unlock = yield this.watchLock.lock();
try {
if (entry.height < this.rescanJob.height) {
yield this.rescanJob.iter(entry, block.txs);
return;
}
this.rescanJob.resolve();
this.rescanJob = null;
} catch (e) {
this.rescanJob.reject(e);
this.rescanJob = null;
} finally {
unlock();
}
});
/**
* Estimate smart fee (returns network fee rate).
* @param {Number?} blocks
@ -277,6 +378,17 @@ SPVNode.prototype.stopSync = function stopSync() {
return this.pool.stopSync();
};
/*
* Helpers
*/
function RescanJob(resolve, reject, height, iter) {
this.resolve = resolve;
this.reject = reject;
this.height = height;
this.iter = iter;
}
/*
* Expose
*/

View File

@ -1938,7 +1938,7 @@ TX.prototype.getPrevout = function getPrevout() {
TX.prototype.isWatched = function isWatched(filter) {
var found = false;
var i, input, output, outpoint;
var i, input, output, prevout;
if (!filter)
return false;
@ -1954,12 +1954,12 @@ TX.prototype.isWatched = function isWatched(filter) {
// Test the output script
if (output.script.test(filter)) {
if (filter.update === constants.filterFlags.ALL) {
outpoint = Outpoint.fromTX(this, i);
filter.add(outpoint.toRaw());
prevout = Outpoint.fromTX(this, i);
filter.add(prevout.toRaw());
} else if (filter.update === constants.filterFlags.PUBKEY_ONLY) {
if (output.script.isPubkey() || output.script.isMultisig()) {
outpoint = Outpoint.fromTX(this, i);
filter.add(outpoint.toRaw());
prevout = Outpoint.fromTX(this, i);
filter.add(prevout.toRaw());
}
}
found = true;
@ -1973,10 +1973,10 @@ TX.prototype.isWatched = function isWatched(filter) {
// 4. Test data elements in input scripts
for (i = 0; i < this.inputs.length; i++) {
input = this.inputs[i];
outpoint = input.prevout.toRaw();
prevout = input.prevout;
// Test the COutPoint structure
if (filter.test(outpoint))
if (filter.test(prevout.toRaw()))
return true;
// Test the input script
@ -1984,8 +1984,8 @@ TX.prototype.isWatched = function isWatched(filter) {
return true;
// Test the witness
// if (input.witness.test(filter))
// return true;
if (input.witness.test(filter))
return true;
}
// 5. No match

View File

@ -7,8 +7,11 @@
'use strict';
var assert = require('assert');
var constants = require('../protocol/constants');
var murmur3 = require('./murmur3');
var BufferWriter = require('./writer');
var BufferReader = require('./reader');
var sum32 = murmur3.sum32;
var mul32 = murmur3.mul32;
@ -170,6 +173,57 @@ Bloom.fromRate = function fromRate(items, rate, update) {
return new Bloom(size, n, -1, update);
};
/**
* Serialize bloom filter.
* @returns {Buffer}
*/
Bloom.prototype.toRaw = function toRaw(writer) {
var p = BufferWriter(writer);
p.writeVarBytes(this.filter);
p.writeU32(this.n);
p.writeU32(this.tweak);
p.writeU8(this.update);
if (!writer)
p = p.render();
return p;
};
/**
* Inject properties from serialized data.
* @private
* @param {Buffer} data
*/
Bloom.prototype.fromRaw = function fromRaw(data) {
var p = BufferReader(data);
this.filter = p.readVarBytes();
this.n = p.readU32();
this.tweak = p.readU32();
this.update = p.readU8();
assert(constants.filterFlagsByVal[this.update] != null, 'Bad filter flag.');
return this;
};
/**
* Instantiate bloom filter from serialized data.
* @param {Buffer} data
* @param {String?} enc
* @returns {Bloom}
*/
Bloom.fromRaw = function fromRaw(data, enc) {
if (typeof data === 'string')
data = new Buffer(data, enc);
return new Bloom().fromRaw(data);
};
/**
* A rolling bloom filter used internally
* (do not relay this on the p2p network).

View File

@ -301,7 +301,7 @@ if (typeof window !== 'undefined') {
throw event.reason;
};
} else {
process.on('unhandledRejection', function(err, promise) {
process.on('unhandledRejection', function(err, promise) {
throw err;
});
}

View File

@ -160,25 +160,62 @@ WalletDB.prototype._close = co(function* close() {
*/
WalletDB.prototype.watch = co(function* watch() {
var hashes = yield this.getHashes();
var outpoints = yield this.getOutpoints();
var chunks = [];
var i, hash, outpoint;
var hashes = 0;
var outpoints = 0;
var iter, item, data, outpoint, items;
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
chunks.push(hash);
iter = this.db.iterator({
gte: layout.p(constants.NULL_HASH),
lte: layout.p(constants.HIGH_HASH)
});
for (;;) {
item = yield iter.next();
if (!item)
break;
try {
data = layout.pp(item.key);
this.filter.add(data, 'hex');
this.emit('watch', data);
} catch (e) {
yield iter.end();
throw e;
}
hashes++;
}
for (i = 0; i < outpoints.length; i++) {
outpoint = outpoints[i];
chunks.push(outpoint.toRaw());
iter = this.db.iterator({
gte: layout.o(constants.NULL_HASH, 0),
lte: layout.o(constants.HIGH_HASH, 0xffffffff)
});
for (;;) {
item = yield iter.next();
if (!item)
break;
try {
items = layout.oo(item.key);
outpoint = new Outpoint(items[0], items[1]);
data = outpoint.toRaw();
this.filter.add(data);
this.emit('watch', data);
} catch (e) {
yield iter.end();
throw e;
}
outpoints++;
}
this.logger.info('Adding %d hashes to WalletDB filter.', hashes.length);
this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints.length);
this.logger.info('Adding %d hashes to WalletDB filter.', hashes);
this.logger.info('Adding %d outpoints to WalletDB filter.', outpoints);
this.addFilter(chunks);
yield this.loadFilter();
});
/**
@ -289,27 +326,12 @@ WalletDB.prototype.scan = co(function* scan(height) {
this.rescanning = true;
try {
yield this.client.scan(tip.hash, this.filter, iter);
yield this.client.rescan(tip.hash, iter);
} finally {
this.rescanning = false;
}
});
/**
* Add address or outpoints to chain server filter.
* @param {Hashes[]} chunks
* @returns {Promise}
*/
WalletDB.prototype.watchData = co(function* watchData(chunks) {
if (!this.client) {
this.emit('watch data', chunks);
return;
}
yield this.client.watchData(chunks);
});
/**
* Broadcast a transaction via chain server.
* @param {TX} tx
@ -338,6 +360,52 @@ WalletDB.prototype.estimateFee = co(function* estimateFee(blocks) {
return yield this.client.estimateFee(blocks);
});
/**
* Send filter to the remote node.
* @private
* @returns {Promise}
*/
WalletDB.prototype.loadFilter = function loadFilter() {
if (!this.client) {
this.emit('load filter', this.filter);
return Promise.resolve();
}
return this.client.loadFilter(this.filter);
};
/**
* Add data to remote filter.
* @private
* @param {Buffer} data
* @returns {Promise}
*/
WalletDB.prototype.addFilter = function addFilter(data) {
if (!this.client) {
this.emit('add filter', data);
return Promise.resolve();
}
return this.client.addFilter(data);
};
/**
* Reset remote filter.
* @private
* @returns {Promise}
*/
WalletDB.prototype.resetFilter = function resetFilter() {
if (!this.client) {
this.emit('reset filter');
return Promise.resolve();
}
return this.client.resetFilter();
};
/**
* Backup the wallet db.
* @param {String} path
@ -521,24 +589,29 @@ WalletDB.prototype.testFilter = function testFilter(data) {
};
/**
* Add hash to filter.
* Add hash to local and remote filters.
* @private
* @param {Hash} hash
*/
WalletDB.prototype.addFilter = function addFilter(chunks) {
var i, data;
WalletDB.prototype.addHash = function addHash(hash) {
this.filter.add(hash, 'hex');
this.emit('watch', hash);
return this.addFilter(hash);
};
if (!Array.isArray(chunks))
chunks = [chunks];
/**
* Add outpoint to local filter.
* @private
* @param {Buffer} data
*/
if (this.client)
this.client.watchData(chunks);
WalletDB.prototype.addOutpoint = function addOutpoint(hash, i) {
var outpoint = new Outpoint(hash, i);
var data = outpoint.toRaw();
for (i = 0; i < chunks.length; i++) {
data = chunks[i];
this.filter.add(data, 'hex');
}
this.filter.add(outpoint.toRaw());
this.emit('watch', data);
};
/**
@ -991,7 +1064,7 @@ WalletDB.prototype.savePath = co(function* savePath(wallet, path) {
var batch = this.batch(wallet);
var map;
this.addFilter(hash);
yield this.addHash(hash);
map = yield this.getPathMap(hash);
@ -1613,7 +1686,7 @@ WalletDB.prototype.getOutpointMap = co(function* getOutpointMap(hash, i) {
WalletDB.prototype.writeOutpointMap = function writeOutpointMap(wallet, hash, i, map) {
var batch = this.batch(wallet);
this.addFilter(new Outpoint(hash, i).toRaw());
this.addOutpoint(hash, i);
batch.put(layout.o(hash, i), map.toRaw());
};