pool/peer: split request map into two maps.

This commit is contained in:
Christopher Jeffrey 2017-03-07 19:35:02 -08:00
parent 2bbeb40ac5
commit 871225bbe4
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 125 additions and 47 deletions

View File

@ -543,7 +543,7 @@ RPC.prototype.getpeerinfo = co(function* getpeerinfo(args, help) {
besthash: peer.bestHash ? util.revHex(peer.bestHash) : null,
bestheight: peer.bestHeight,
banscore: peer.banScore,
inflight: peer.requestMap.keys().map(util.revHex),
inflight: peer.blockMap.keys().map(util.revHex),
whitelisted: false
});
}

View File

@ -137,7 +137,8 @@ function Peer(options) {
this.addrFilter = new Bloom.Rolling(5000, 0.001);
this.invFilter = new Bloom.Rolling(50000, 0.000001);
this.requestMap = new Map();
this.blockMap = new Map();
this.txMap = new Map();
this.responseMap = new Map();
this.compactBlocks = new Map();
@ -195,8 +196,7 @@ Peer.RESPONSE_TIMEOUT = 30000;
/**
* Required time for loader to
* respond with block/merkleblock
* during initial sync.
* respond with block/merkleblock.
* @const {Number}
* @default
*/
@ -204,12 +204,13 @@ Peer.RESPONSE_TIMEOUT = 30000;
Peer.BLOCK_TIMEOUT = 120000;
/**
* Max number of requested items.
* Required time for loader to
* respond with a tx.
* @const {Number}
* @default
*/
Peer.MAX_REQUESTS = 5000;
Peer.TX_TIMEOUT = 120000;
/**
* Generic timeout interval.
@ -1317,24 +1318,30 @@ Peer.prototype.maybeTimeout = function maybeTimeout() {
}
}
if (this.options.isFull()) {
if (this.requestMap.size > Peer.MAX_REQUESTS) {
this.error('Peer is stalling (data).');
this.destroy();
return;
}
keys = this.requestMap.keys();
if (this.options.isFull() || !this.syncing) {
keys = this.blockMap.keys();
for (i = 0; i < keys.length; i++) {
key = keys[i];
ts = this.requestMap.get(key);
ts = this.blockMap.get(key);
if (now > ts + Peer.BLOCK_TIMEOUT) {
this.error('Peer is stalling (block).');
this.destroy();
return;
}
}
keys = this.txMap.keys();
for (i = 0; i < keys.length; i++) {
key = keys[i];
ts = this.txMap.get(key);
if (now > ts + Peer.TX_TIMEOUT) {
this.error('Peer is stalling (tx).');
this.destroy();
return;
}
}
}
if (now > this.ts + 60000) {

View File

@ -93,7 +93,8 @@ function Pool(options) {
this.syncing = false;
this.spvFilter = null;
this.txFilter = null;
this.requestMap = new Map();
this.blockMap = new Map();
this.txMap = new Map();
this.compactBlocks = new Map();
this.invMap = new Map();
this.pendingFilter = null;
@ -173,7 +174,7 @@ Pool.prototype._init = function _init() {
});
this.chain.on('full', function() {
self.resync();
self.sync();
self.emit('full');
self.logger.info('Chain is fully synced (height=%d).', self.chain.height);
});
@ -356,7 +357,8 @@ Pool.prototype._disconnect = co(function* disconnect() {
this.peers.destroy();
this.requestMap.reset();
this.blockMap.reset();
this.txMap.reset();
if (this.pendingFilter != null) {
clearTimeout(this.pendingFilter);
@ -702,6 +704,14 @@ Pool.prototype.forceSync = function forceSync() {
this.resync(true);
};
/**
* Send a sync to each peer.
*/
Pool.prototype.sync = function* sync(force) {
this.resync(false);
};
/**
* Stop the sync.
* @private
@ -718,8 +728,14 @@ Pool.prototype.stopSync = function stopSync() {
for (peer = this.peers.head(); peer; peer = peer.next) {
if (!peer.outbound)
continue;
peer.syncing = false;
peer.blockMap.reset();
peer.compactBlocks.reset();
}
this.blockMap.reset();
this.compactBlocks.reset();
};
/**
@ -1375,7 +1391,7 @@ Pool.prototype.handleOpen = co(function* handleOpen(peer) {
Pool.prototype.handleClose = co(function* handleClose(peer, connected) {
var outbound = peer.outbound;
var loader = peer.loader;
var size = peer.requestMap.size;
var size = peer.blockMap.size;
this.removePeer(peer);
@ -1395,9 +1411,9 @@ Pool.prototype.handleClose = co(function* handleClose(peer, connected) {
if (this.disconnecting)
return;
if (this.chain.synced && !loader && size > 0) {
this.logger.debug('Peer with requested blocks disconnected.');
this.logger.debug('Resending sync...');
if (this.chain.synced && size > 0) {
this.logger.warning('Peer disconnected with requested blocks.');
this.logger.warning('Resending sync...');
this.forceSync();
}
@ -1918,7 +1934,14 @@ Pool.prototype.handleNotFound = co(function* handleNotFound(peer, packet) {
for (i = 0; i < items.length; i++) {
item = items[i];
this.fulfill(peer, item.hash);
if (!this.resolveItem(peer, item)) {
this.logger.warning(
'Peer sent notfound for unrequested item: %s (%s).',
item.hash, peer.hostname());
peer.destroy();
return;
}
}
});
@ -2222,7 +2245,7 @@ Pool.prototype._addBlock = co(function* addBlock(peer, block, flags) {
if (!this.syncing)
return;
if (!this.fulfill(peer, hash)) {
if (!this.resolveBlock(peer, hash)) {
this.logger.warning(
'Received unrequested block: %s (%s).',
block.rhash(), peer.hostname());
@ -2380,7 +2403,7 @@ Pool.prototype.logStatus = function logStatus(block) {
(this.chain.getProgress() * 100).toFixed(2) + '%',
this.chain.total,
this.chain.orphanCount,
this.requestMap.size,
this.blockMap.size,
block.bits,
this.peers.size(),
this.locker.jobs.length);
@ -2442,7 +2465,7 @@ Pool.prototype._handleTX = co(function* handleTX(peer, packet) {
}
}
if (!this.fulfill(peer, hash)) {
if (!this.resolveTX(peer, hash)) {
this.logger.warning(
'Peer sent unrequested tx: %s (%s).',
tx.txid(), peer.hostname());
@ -2627,7 +2650,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet)
return;
}
if (!peer.requestMap.has(hash)) {
if (!peer.blockMap.has(hash)) {
this.logger.warning(
'Peer sent an unrequested merkleblock (%s).',
peer.hostname());
@ -2733,7 +2756,7 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) {
return;
}
if (!peer.requestMap.has(hash)) {
if (!peer.blockMap.has(hash)) {
if (this.options.blockMode !== 1) {
this.logger.warning(
'Peer sent us an unrequested compact block (%s).',
@ -2741,9 +2764,9 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) {
peer.destroy();
return;
}
peer.requestMap.set(hash, util.ms());
assert(!this.requestMap.has(hash));
this.requestMap.insert(hash);
peer.blockMap.set(hash, util.ms());
assert(!this.blockMap.has(hash));
this.blockMap.insert(hash);
}
if (!this.mempool) {
@ -3132,11 +3155,18 @@ Pool.prototype.removePeer = function removePeer(peer) {
this.peers.remove(peer);
hashes = peer.requestMap.keys();
hashes = peer.blockMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
this.fulfill(peer, hash);
this.resolveBlock(peer, hash);
}
hashes = peer.txMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
this.resolveTX(peer, hash);
}
hashes = peer.compactBlocks.keys();
@ -3340,11 +3370,14 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.requestMap.has(hash))
if (this.blockMap.has(hash))
continue;
this.requestMap.insert(hash);
peer.requestMap.set(hash, now);
this.blockMap.insert(hash);
peer.blockMap.set(hash, now);
if (this.chain.synced)
now += 100;
items.push(hash);
}
@ -3355,7 +3388,7 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) {
this.logger.debug(
'Requesting %d/%d blocks from peer with getdata (%s).',
items.length,
this.requestMap.size,
this.blockMap.size,
peer.hostname());
peer.getBlock(items);
@ -3384,11 +3417,13 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.requestMap.has(hash))
if (this.txMap.has(hash))
continue;
this.requestMap.insert(hash);
peer.requestMap.set(hash, now);
this.txMap.insert(hash);
peer.txMap.set(hash, now);
now += 50;
items.push(hash);
}
@ -3399,7 +3434,7 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
this.logger.debug(
'Requesting %d/%d txs from peer with getdata (%s).',
items.length,
this.requestMap.size,
this.txMap.size,
peer.hostname());
peer.getTX(items);
@ -3479,24 +3514,60 @@ Pool.prototype.ensureTX = function ensureTX(peer, hashes) {
};
/**
* Fulfill a requested item.
* Fulfill a requested tx.
* @param {Peer} peer
* @param {Hash} hash
* @returns {Boolean}
*/
Pool.prototype.fulfill = function fulfill(peer, hash) {
if (!peer.requestMap.has(hash))
Pool.prototype.resolveTX = function resolveTX(peer, hash) {
if (!peer.txMap.has(hash))
return false;
peer.requestMap.remove(hash);
peer.txMap.remove(hash);
assert(this.requestMap.has(hash));
this.requestMap.remove(hash);
assert(this.txMap.has(hash));
this.txMap.remove(hash);
return true;
};
/**
* Fulfill a requested block.
* @param {Peer} peer
* @param {Hash} hash
* @returns {Boolean}
*/
Pool.prototype.resolveBlock = function resolveBlock(peer, hash) {
if (!peer.blockMap.has(hash))
return false;
peer.blockMap.remove(hash);
assert(this.blockMap.has(hash));
this.blockMap.remove(hash);
return true;
};
/**
* Fulfill a requested item.
* @param {Peer} peer
* @param {InvItem} item
* @returns {Boolean}
*/
Pool.prototype.resolveItem = function resolveItem(peer, item) {
if (item.isBlock())
return this.resolveBlock(peer, item.hash);
if (item.isTX())
return this.resolveTX(peer, item.hash);
return false;
};
/**
* Broadcast a transaction or block.
* @param {TX|Block} msg