use bloom filters to avoid double sending to peers.

This commit is contained in:
Christopher Jeffrey 2016-05-20 06:11:28 -07:00
parent d4f03f982e
commit d9d18f2be7
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 147 additions and 41 deletions

View File

@ -9,6 +9,13 @@ var utils = require('./utils');
var assert = utils.assert;
var constants = require('./protocol/constants');
/*
* Constants
*/
var LN2SQUARED = 0.4804530139182014246671025263266649717305529515945455;
var LN2 = 0.6931471805599453094172321214581765680755001343602552;
/**
* Bloom Filter
* @exports Bloom
@ -36,6 +43,9 @@ function Bloom(size, n, tweak, update) {
this.reset();
}
if (update == null)
update = constants.filterFlags.NONE;
this.n = n;
this.tweak = tweak;
this.update = update;
@ -136,6 +146,107 @@ Bloom.prototype.isWithinConstraints = function isWithinConstraints() {
return true;
};
/**
* Create a filter from a false positive rate.
* @param {Number} items - Expeected number of items.
* @param {Number} rate - False positive rate (0.0-1.0).
* @param {Number} tweak
* @param {Number} update
* @example
* bcoin.bloom.fromRate(800000, 0.01, 0xdeadbeef);
* @returns {Boolean}
*/
Bloom.fromRate = function fromRate(items, rate, tweak, update) {
var size, n;
size = (-1 / LN2SQUARED * items * Math.log(rate)) | 0;
size = Math.min(size, constants.bloom.MAX_BLOOM_FILTER_SIZE * 8);
n = (size / items * LN2) | 0;
n = Math.min(n, constants.bloom.MAX_HASH_FUNCS);
if (tweak == null)
tweak = Math.random() * 0x100000000 | 0;
return new Bloom(size, n, tweak, update);
};
/**
* A bloom filter that will reset itself
* once the max number of items is reached.
* @exports Bloom
* @constructor
* @param {Number} items - Expected number of items.
* @param {Number} rate - False positive rate.
* @property {Bloom} filter
* @property {Number} items
*/
function RollingFilter(items, rate) {
if (!(this instanceof RollingFilter))
return new RollingFilter(items, rate);
this.count = 0;
this.items = items;
this.filter = Bloom.fromRate(items, rate);
}
/**
* Reset the filter.
*/
RollingFilter.prototype.reset = function reset() {
this.count = 0;
return this.filter.reset();
};
/**
* Add data to the filter.
* @param {Buffer|String}
* @param {String?} enc - Can be any of the Buffer object's encodings.
*/
RollingFilter.prototype.add = function add(val, enc) {
if (this.count >= this.items)
this.reset();
this.count++;
return this.filter.add(val, enc);
};
/**
* Test whether data is present in the filter.
* @param {Buffer|String} val
* @param {String?} enc - Can be any of the Buffer object's encodings.
* @returns {Boolean}
*/
RollingFilter.prototype.test = function test(val, enc) {
return this.filter.test(val, enc);
};
/**
* Test whether data is present in the filter, add it if not.
* @param {Buffer|String} val
* @param {String?} enc - Can be any of the Buffer object's encodings.
* @returns {Boolean}
*/
RollingFilter.prototype.added = function added(val, enc) {
if (typeof val === 'string')
val = new Buffer(val, enc);
if (!this.filter.test(val)) {
this.filter.add(val);
return true;
}
return false;
};
/*
* Murmur
*/
@ -239,6 +350,8 @@ function murmur(data, seed) {
Bloom.hash = murmur;
Bloom.rolling = RollingFilter;
/*
* Expose
*/

View File

@ -100,6 +100,8 @@ function Peer(pool, options) {
this.relay = true;
this.localNonce = utils.nonce();
this.filterRate = -1;
this.addrFilter = new bcoin.bloom.rolling(5000, 0.001);
this.invFilter = new bcoin.bloom.rolling(50000, 0.000001);
this.challenge = null;
this.lastPong = -1;
@ -279,6 +281,8 @@ Peer.prototype.createSocket = function createSocket(port, host) {
*/
Peer.prototype.sendInv = function sendInv(items) {
var self = this;
if (this.destroyed)
return;
@ -313,6 +317,10 @@ Peer.prototype.sendInv = function sendInv(items) {
};
});
items = items.filter(function(msg) {
return self.invFilter.added(msg.hash, 'hex');
});
this.write(this.framer.inv(items));
};
@ -1232,6 +1240,8 @@ Peer.prototype._handleAddr = function handleAddr(addrs) {
if (ts <= 100000000 || ts > now + 10 * 60)
ts = now - 5 * 24 * 60 * 60;
this.addrFilter.add(host, 'ascii');
this.emit('addr', {
version: addr.version,
ts: ts,
@ -1308,6 +1318,9 @@ Peer.prototype._handleGetAddr = function handleGetAddr() {
hosts[seed.host] = true;
if (!this.addrFilter.added(seed.host, 'ascii'))
continue;
items.push({
network: this.network,
ts: seed.ts || ts,
@ -1342,6 +1355,9 @@ Peer.prototype._handleInv = function handleInv(items) {
txs.push(item.hash);
else if (item.type === constants.inv.BLOCK)
blocks.push(item.hash);
else
continue;
this.invFilter.add(item.hash, 'hex');
}
if (blocks.length > 0)

View File

@ -170,8 +170,9 @@ function Pool(options) {
};
this.tx = {
state: {},
count: 0,
filter: !this.mempool
? new bcoin.bloom.rolling(50000, 0.000001)
: null,
type: constants.inv.TX
};
@ -1016,8 +1017,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
for (i = 0; i < txs.length; i++) {
hash = txs[i];
if (self._markTX(hash, 0))
self.getData(peer, self.tx.type, hash);
self.getData(peer, self.tx.type, hash);
}
});
@ -1043,19 +1043,19 @@ Pool.prototype._createPeer = function _createPeer(options) {
Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
var self = this;
var requested, updated;
var requested = this.fulfill(tx);
callback = utils.asyncify(callback);
requested = this.fulfill(tx);
updated = this._markTX(tx, 1);
if (!requested && tx.ts === 0) {
bcoin.debug('Peer sent unrequested tx: %s (%s).',
tx.rhash, peer.hostname);
}
function addMempool(tx, callback) {
if (!self.mempool)
return callback();
return utils.nextTick(callback);
if (tx.ts !== 0)
return callback();
return utils.nextTick(callback);
self.mempool.addTX(tx, callback);
}
@ -1069,11 +1069,7 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
}
}
if (updated || tx.block)
self.emit('tx', tx, peer);
if (self.options.spv && tx.block)
self.emit('watched', tx, peer);
self.emit('tx', tx, peer);
return callback();
});
@ -1208,29 +1204,6 @@ Pool.prototype._addPeer = function _addPeer() {
});
};
Pool.prototype._markTX = function(hash, state) {
if (hash.hash)
hash = hash.hash('hex');
if (this.tx.count >= 5000) {
this.tx.state = {};
this.tx.count = 0;
}
if (this.tx.state[hash] == null) {
this.tx.state[hash] = state;
this.tx.count++;
return true;
}
if (this.tx.state[hash] < state) {
this.tx.state[hash] = state;
return true;
}
return false;
};
Pool.prototype.bestPeer = function bestPeer() {
return this.peers.regular.reduce(function(best, peer) {
if (!peer.version || !peer.socket)
@ -1567,8 +1540,12 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
}
if (type === this.tx.type) {
if (!this.mempool)
return utils.asyncify(done)(null, false);
if (!this.mempool) {
done = utils.asyncify(done);
if (this.tx.filter.added(hash, 'hex'))
return done(null, false);
return done(null, true);
}
return this.mempool.has(hash, done);
}