diff --git a/lib/bcoin/bloom.js b/lib/bcoin/bloom.js index 46ec7f40..638a8bb1 100644 --- a/lib/bcoin/bloom.js +++ b/lib/bcoin/bloom.js @@ -9,6 +9,13 @@ var utils = require('./utils'); var assert = utils.assert; var constants = require('./protocol/constants'); +/* + * Constants + */ + +var LN2SQUARED = 0.4804530139182014246671025263266649717305529515945455; +var LN2 = 0.6931471805599453094172321214581765680755001343602552; + /** * Bloom Filter * @exports Bloom @@ -36,6 +43,9 @@ function Bloom(size, n, tweak, update) { this.reset(); } + if (update == null) + update = constants.filterFlags.NONE; + this.n = n; this.tweak = tweak; this.update = update; @@ -136,6 +146,107 @@ Bloom.prototype.isWithinConstraints = function isWithinConstraints() { return true; }; +/** + * Create a filter from a false positive rate. + * @param {Number} items - Expeected number of items. + * @param {Number} rate - False positive rate (0.0-1.0). + * @param {Number} tweak + * @param {Number} update + * @example + * bcoin.bloom.fromRate(800000, 0.01, 0xdeadbeef); + * @returns {Boolean} + */ + +Bloom.fromRate = function fromRate(items, rate, tweak, update) { + var size, n; + + size = (-1 / LN2SQUARED * items * Math.log(rate)) | 0; + size = Math.min(size, constants.bloom.MAX_BLOOM_FILTER_SIZE * 8); + + n = (size / items * LN2) | 0; + n = Math.min(n, constants.bloom.MAX_HASH_FUNCS); + + if (tweak == null) + tweak = Math.random() * 0x100000000 | 0; + + return new Bloom(size, n, tweak, update); +}; + +/** + * A bloom filter that will reset itself + * once the max number of items is reached. + * @exports Bloom + * @constructor + * @param {Number} items - Expected number of items. + * @param {Number} rate - False positive rate. + * @property {Bloom} filter + * @property {Number} items + */ + +function RollingFilter(items, rate) { + if (!(this instanceof RollingFilter)) + return new RollingFilter(items, rate); + + this.count = 0; + this.items = items; + + this.filter = Bloom.fromRate(items, rate); +} + +/** + * Reset the filter. + */ + +RollingFilter.prototype.reset = function reset() { + this.count = 0; + return this.filter.reset(); +}; + +/** + * Add data to the filter. + * @param {Buffer|String} + * @param {String?} enc - Can be any of the Buffer object's encodings. + */ + +RollingFilter.prototype.add = function add(val, enc) { + if (this.count >= this.items) + this.reset(); + + this.count++; + + return this.filter.add(val, enc); +}; + +/** + * Test whether data is present in the filter. + * @param {Buffer|String} val + * @param {String?} enc - Can be any of the Buffer object's encodings. + * @returns {Boolean} + */ + +RollingFilter.prototype.test = function test(val, enc) { + return this.filter.test(val, enc); +}; + +/** + * Test whether data is present in the filter, add it if not. + * @param {Buffer|String} val + * @param {String?} enc - Can be any of the Buffer object's encodings. + * @returns {Boolean} + */ + +RollingFilter.prototype.added = function added(val, enc) { + if (typeof val === 'string') + val = new Buffer(val, enc); + + if (!this.filter.test(val)) { + this.filter.add(val); + return true; + } + + return false; +}; + /* * Murmur */ @@ -239,6 +350,8 @@ function murmur(data, seed) { Bloom.hash = murmur; +Bloom.rolling = RollingFilter; + /* * Expose */ diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 0beb38f9..7a5c915f 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -100,6 +100,8 @@ function Peer(pool, options) { this.relay = true; this.localNonce = utils.nonce(); this.filterRate = -1; + this.addrFilter = new bcoin.bloom.rolling(5000, 0.001); + this.invFilter = new bcoin.bloom.rolling(50000, 0.000001); this.challenge = null; this.lastPong = -1; @@ -279,6 +281,8 @@ Peer.prototype.createSocket = function createSocket(port, host) { */ Peer.prototype.sendInv = function sendInv(items) { + var self = this; + if (this.destroyed) return; @@ -313,6 +317,10 @@ Peer.prototype.sendInv = function sendInv(items) { }; }); + items = items.filter(function(msg) { + return self.invFilter.added(msg.hash, 'hex'); + }); + this.write(this.framer.inv(items)); }; @@ -1232,6 +1240,8 @@ Peer.prototype._handleAddr = function handleAddr(addrs) { if (ts <= 100000000 || ts > now + 10 * 60) ts = now - 5 * 24 * 60 * 60; + this.addrFilter.add(host, 'ascii'); + this.emit('addr', { version: addr.version, ts: ts, @@ -1308,6 +1318,9 @@ Peer.prototype._handleGetAddr = function handleGetAddr() { hosts[seed.host] = true; + if (!this.addrFilter.added(seed.host, 'ascii')) + continue; + items.push({ network: this.network, ts: seed.ts || ts, @@ -1342,6 +1355,9 @@ Peer.prototype._handleInv = function handleInv(items) { txs.push(item.hash); else if (item.type === constants.inv.BLOCK) blocks.push(item.hash); + else + continue; + this.invFilter.add(item.hash, 'hex'); } if (blocks.length > 0) diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index fbb1e2b6..856f6966 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -170,8 +170,9 @@ function Pool(options) { }; this.tx = { - state: {}, - count: 0, + filter: !this.mempool + ? new bcoin.bloom.rolling(50000, 0.000001) + : null, type: constants.inv.TX }; @@ -1016,8 +1017,7 @@ Pool.prototype._createPeer = function _createPeer(options) { for (i = 0; i < txs.length; i++) { hash = txs[i]; - if (self._markTX(hash, 0)) - self.getData(peer, self.tx.type, hash); + self.getData(peer, self.tx.type, hash); } }); @@ -1043,19 +1043,19 @@ Pool.prototype._createPeer = function _createPeer(options) { Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { var self = this; - var requested, updated; + var requested = this.fulfill(tx); - callback = utils.asyncify(callback); - - requested = this.fulfill(tx); - updated = this._markTX(tx, 1); + if (!requested && tx.ts === 0) { + bcoin.debug('Peer sent unrequested tx: %s (%s).', + tx.rhash, peer.hostname); + } function addMempool(tx, callback) { if (!self.mempool) - return callback(); + return utils.nextTick(callback); if (tx.ts !== 0) - return callback(); + return utils.nextTick(callback); self.mempool.addTX(tx, callback); } @@ -1069,11 +1069,7 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { } } - if (updated || tx.block) - self.emit('tx', tx, peer); - - if (self.options.spv && tx.block) - self.emit('watched', tx, peer); + self.emit('tx', tx, peer); return callback(); }); @@ -1208,29 +1204,6 @@ Pool.prototype._addPeer = function _addPeer() { }); }; -Pool.prototype._markTX = function(hash, state) { - if (hash.hash) - hash = hash.hash('hex'); - - if (this.tx.count >= 5000) { - this.tx.state = {}; - this.tx.count = 0; - } - - if (this.tx.state[hash] == null) { - this.tx.state[hash] = state; - this.tx.count++; - return true; - } - - if (this.tx.state[hash] < state) { - this.tx.state[hash] = state; - return true; - } - - return false; -}; - Pool.prototype.bestPeer = function bestPeer() { return this.peers.regular.reduce(function(best, peer) { if (!peer.version || !peer.socket) @@ -1567,8 +1540,12 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { } if (type === this.tx.type) { - if (!this.mempool) - return utils.asyncify(done)(null, false); + if (!this.mempool) { + done = utils.asyncify(done); + if (this.tx.filter.added(hash, 'hex')) + return done(null, false); + return done(null, true); + } return this.mempool.has(hash, done); }