add rejects filter.

This commit is contained in:
Christopher Jeffrey 2016-05-20 06:49:38 -07:00
parent d9d18f2be7
commit 8e5cdbdfcd
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 102 additions and 32 deletions

View File

@ -43,6 +43,9 @@ function Bloom(size, n, tweak, update) {
this.reset();
}
if (tweak == null)
tweak = (Math.random() * 0x100000000) >>> 0;
if (update == null)
update = constants.filterFlags.NONE;
@ -166,16 +169,13 @@ Bloom.fromRate = function fromRate(items, rate, tweak, update) {
n = (size / items * LN2) | 0;
n = Math.min(n, constants.bloom.MAX_HASH_FUNCS);
if (tweak == null)
tweak = Math.random() * 0x100000000 | 0;
return new Bloom(size, n, tweak, update);
};
/**
* A bloom filter that will reset itself
* once the max number of items is reached.
* @exports Bloom
* @exports RollingFilter
* @constructor
* @param {Number} items - Expected number of items.
* @param {Number} rate - False positive rate.
@ -198,7 +198,11 @@ function RollingFilter(items, rate) {
*/
RollingFilter.prototype.reset = function reset() {
if (this.count === 0)
return;
this.count = 0;
return this.filter.reset();
};
@ -225,6 +229,9 @@ RollingFilter.prototype.add = function add(val, enc) {
*/
RollingFilter.prototype.test = function test(val, enc) {
if (this.count === 0)
return false;
return this.filter.test(val, enc);
};

View File

@ -1345,18 +1345,20 @@ Peer.prototype._handleGetAddr = function handleGetAddr() {
Peer.prototype._handleInv = function handleInv(items) {
var blocks = [];
var txs = [];
var item, i;
var i, item, unknown;
this.emit('inv', items);
for (i = 0; i < items.length; i++) {
item = items[i];
if (item.type === constants.inv.TX)
if (item.type === constants.inv.TX) {
txs.push(item.hash);
else if (item.type === constants.inv.BLOCK)
} else if (item.type === constants.inv.BLOCK) {
blocks.push(item.hash);
else
} else {
unknown = item.type;
continue;
}
this.invFilter.add(item.hash, 'hex');
}
@ -1365,6 +1367,9 @@ Peer.prototype._handleInv = function handleInv(items) {
if (txs.length > 0)
this.emit('txs', txs);
if (unknown != null)
bcoin.debug('Peer sent an unknown inv type: %d (%s).', unknown);
};
Peer.prototype._handleHeaders = function handleHeaders(headers) {

View File

@ -138,11 +138,7 @@ function Pool(options) {
this.watchMap = {};
this.bloom = new bcoin.bloom(
8 * 1024,
10,
(Math.random() * 0xffffffff) | 0
);
this.bloom = new bcoin.bloom(8 * 1024, 10);
this.peers = {
// Peers that are loading blocks themselves
@ -176,6 +172,8 @@ function Pool(options) {
type: constants.inv.TX
};
this.rejects = new bcoin.bloom.rolling(120000, 0.000001);
if (this.options.witness) {
this.block.type |= constants.WITNESS_MASK;
this.tx.type |= constants.WITNESS_MASK;
@ -823,12 +821,13 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
callback = utils.ensure(callback);
// Fulfill our request.
requested = self.fulfill(block);
// Fulfill the load request.
requested = this.fulfill(block);
// Someone is sending us blocks without
// us requesting them.
if (!requested) {
peer.invFilter.add(block.hash());
bcoin.debug(
'Received unrequested block: %s (%s).',
block.rhash, peer.hostname);
@ -845,6 +844,8 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (err.score >= 0)
peer.sendReject(block, err.code, err.reason, err.score);
self.rejects.add(block.hash());
if (err.reason === 'bad-prevblk' && peer === self.peers.load) {
self.resolveOrphan(peer, null, block.hash('hex'), function(e) {
self.scheduleRequests(peer);
@ -1043,11 +1044,28 @@ Pool.prototype._createPeer = function _createPeer(options) {
Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
var self = this;
var requested = this.fulfill(tx);
var requested;
callback = utils.ensure(callback);
// Fulfill the load request.
requested = this.fulfill(tx);
if (!requested && tx.ts === 0) {
peer.invFilter.add(tx.hash());
if (!this.mempool)
this.tx.filter.add(tx.hash());
bcoin.debug('Peer sent unrequested tx: %s (%s).',
tx.rhash, peer.hostname);
if (this.rejects.test(tx.hash())) {
return callback(new VerifyError(tx,
'alreadyknown',
'txn-already-known',
0));
}
}
function addMempool(tx, callback) {
@ -1065,7 +1083,8 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
if (err.type === 'VerifyError') {
if (err.score >= 0)
peer.sendReject(tx, err.code, err.reason, err.score);
return callback();
self.rejects.add(tx.hash());
return callback(err);
}
}
@ -1494,16 +1513,13 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
if (typeof options === 'boolean')
options = { force: options };
function done(err, exists) {
this.has(type, hash, options.force, function(err, exists) {
if (err)
return callback(err);
if (exists)
return callback(null, true);
if (self.request.map[hash])
return callback(null, true);
item = new LoadRequest(self, peer, type, hash);
if (options.noQueue)
@ -1531,25 +1547,65 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
peer.queue.block.push(item);
return callback(null, false);
});
};
/**
* Test whether the pool has or has seen an item.
* @param {InvType} type
* @param {Hash} hash
* @param {Function} callback - Returns [Error, Boolean].
*/
Pool.prototype.has = function has(type, hash, force, callback) {
if (!callback) {
callback = force;
force = false;
}
if (options.force) {
return utils.nextTick(function() {
return done(null, false);
});
if (force) {
callback = utils.asyncify(callback);
return callback(null, false);
}
// Check the pending requests.
if (this.request.map[hash]) {
callback = utils.asyncify(callback);
return callback(null, true);
}
// We need to reset the rejects filter periodically.
// There may be a locktime in a TX that is now valid.
if (this.rejects.tip !== this.chain.tip.hash) {
this.rejects.tip = this.chain.tip.hash;
this.rejects.reset();
} else {
// If we recently rejected this item. Ignore.
if (this.rejects.test(hash, 'hex')) {
callback = utils.asyncify(callback);
bcoin.debug(
'Peer sent a known reject: %s (%s).',
hash, peer.hostname);
return callback(null, true);
}
}
if (type === this.tx.type) {
// Check the TX filter if
// we don't have a mempool.
if (!this.mempool) {
done = utils.asyncify(done);
callback = utils.asyncify(callback);
if (this.tx.filter.added(hash, 'hex'))
return done(null, false);
return done(null, true);
return callback(null, false);
return callback(null, true);
}
return this.mempool.has(hash, done);
// Check the mempool.
return this.mempool.has(hash, callback);
}
return this.chain.has(hash, done);
// Check the chain.
return this.chain.has(hash, callback);
};
/**
@ -1976,6 +2032,8 @@ Pool.prototype.reject = function reject(peer, obj, code, reason, score) {
reason: reason,
data: obj.hash()
});
this.rejects.add(obj.hash());
} else {
bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.',
peer.hostname, code, reason);

View File

@ -1340,8 +1340,8 @@ utils.U64 = new bn('ffffffffffffffff', 'hex');
utils.nonce = function _nonce() {
var nonce = new Buffer(8);
nonce.writeUInt32LE(Math.random() * 0x100000000 | 0, 0, true);
nonce.writeUInt32LE(Math.random() * 0x100000000 | 0, 4, true);
nonce.writeUInt32LE((Math.random() * 0x100000000) >>> 0, 0, true);
nonce.writeUInt32LE((Math.random() * 0x100000000) >>> 0, 4, true);
return new bn(nonce);
};