net: use Map object for requested items.

This commit is contained in:
Christopher Jeffrey 2017-01-07 00:19:40 -08:00
parent bea77b0ec7
commit fb0b2b53d7
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 91 additions and 167 deletions

View File

@ -14,6 +14,7 @@ var NetAddress = require('../primitives/netaddress');
var List = require('../utils/list');
var murmur3 = require('../utils/murmur3');
var StaticWriter = require('../utils/staticwriter');
var Map = require('../utils/map');
/**
* Host List
@ -71,7 +72,7 @@ HostList.prototype._init = function init() {
var i;
for (i = 0; i < this.maxBuckets; i++)
this.fresh.push(new MapBucket());
this.fresh.push(new Map());
for (i = 0; i < this.maxBuckets; i++)
this.used.push(new List());
@ -219,7 +220,7 @@ HostList.prototype.getHost = function getHost() {
* Get fresh bucket for host.
* @private
* @param {HostEntry} entry
* @returns {MapBucket}
* @returns {Map}
*/
HostList.prototype.freshBucket = function freshBucket(entry) {
@ -343,7 +344,7 @@ HostList.prototype.add = function add(addr, src) {
/**
* Evict a host from fresh bucket.
* @param {MapBucket} bucket
* @param {Map} bucket
*/
HostList.prototype.evictFresh = function evictFresh(bucket) {
@ -806,97 +807,6 @@ HostList.fromJSON = function fromJSON(options, json) {
return new HostEntry(options).fromJSON(json);
};
/**
* MapBucket
* @constructor
*/
function MapBucket() {
if (!(this instanceof MapBucket))
return new MapBucket();
this.map = {};
this.size = 0;
}
/**
* Get map keys.
* @returns {String[]}
*/
MapBucket.prototype.keys = function keys() {
return Object.keys(this.map);
};
/**
* Get item from map.
* @param {String} key
* @returns {Object|null}
*/
MapBucket.prototype.get = function get(key) {
return this.map[key];
};
/**
* Test whether map has an item.
* @param {String} key
* @returns {Boolean}
*/
MapBucket.prototype.has = function has(key) {
return this.map[key] !== undefined;
};
/**
* Set a key to value in map.
* @param {String} key
* @param {Object} value
* @returns {Boolean}
*/
MapBucket.prototype.set = function set(key, value) {
var item = this.map[key];
assert(value !== undefined);
this.map[key] = value;
if (item === undefined) {
this.size++;
return true;
}
return false;
};
/**
* Remove an item from map.
* @param {String} key
* @returns {Object|null}
*/
MapBucket.prototype.remove = function remove(key) {
var item = this.map[key];
if (item === undefined)
return;
delete this.map[key];
this.size--;
return item;
};
/**
* Reset the map.
*/
MapBucket.prototype.reset = function reset() {
this.map = {};
this.size = 0;
};
/**
* HostEntry
* @constructor

View File

@ -11,6 +11,7 @@ var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var util = require('../utils/util');
var co = require('../utils/co');
var Map = require('../utils/map');
var Parser = require('./parser');
var Framer = require('./framer');
var packets = require('./packets');
@ -113,8 +114,6 @@ function Peer(pool) {
this.bip150 = null;
this.compactMode = -1;
this.compactWitness = false;
this.compactBlocks = {};
this.compactAmount = 0;
this.lastMerkle = null;
this.waitingTX = 0;
this.syncSent = false;
@ -134,10 +133,10 @@ function Peer(pool) {
this.addrFilter = new Bloom.Rolling(5000, 0.001);
this.invFilter = new Bloom.Rolling(50000, 0.000001);
this.requestMap = {};
this.queueMap = {};
this.responseMap = {};
this.requestMap = new Map();
this.blockQueue = [];
this.compactBlocks = new Map();
this.responseMap = new Map();
if (this.options.bip151) {
this.bip151 = new BIP151();
@ -696,12 +695,12 @@ Peer.prototype.announceTX = function announceTX(txs) {
Peer.prototype.announceList = function announceList() {
var blocks = [];
var txs = [];
var hashes = Object.keys(this.pool.invMap);
var hashes = this.pool.invMap.keys();
var i, hash, item;
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
item = this.pool.invMap[hash];
item = this.pool.invMap.get(hash);
switch (item.type) {
case invTypes.BLOCK:
@ -944,17 +943,16 @@ Peer.prototype.destroy = function destroy() {
this.connectTimeout = null;
}
keys = Object.keys(this.responseMap);
keys = this.responseMap.keys();
for (i = 0; i < keys.length; i++) {
cmd = keys[i];
entry = this.responseMap[cmd];
delete this.responseMap[cmd];
entry = this.responseMap.get(cmd);
this.responseMap.remove(cmd);
entry.reject(new Error('Peer was destroyed.'));
}
this.compactBlocks = {};
this.compactAmount = 0;
this.compactBlocks.reset();
this.locker.destroy();
@ -1033,9 +1031,12 @@ Peer.prototype.addTimeout = function addTimeout(packet) {
switch (packet.type) {
case packetTypes.MEMPOOL:
case packetTypes.GETBLOCKS:
this.request(packetTypes.INV, timeout);
break;
case packetTypes.GETBLOCKS:
if (!this.chain.synced)
this.request(packetTypes.INV, timeout);
break;
case packetTypes.GETHEADERS:
this.request(packetTypes.HEADERS, timeout * 2);
break;
@ -1077,13 +1078,13 @@ Peer.prototype.fulfill = function fulfill(packet) {
*/
Peer.prototype.maybeTimeout = function maybeTimeout() {
var keys = Object.keys(this.responseMap);
var keys = this.responseMap.keys();
var now = util.ms();
var i, key, entry, name;
for (i = 0; i < keys.length; i++) {
key = keys[i];
entry = this.responseMap[key];
entry = this.responseMap.get(key);
if (now > entry.timeout) {
name = packets.typesByVal[key];
this.error('Peer is stalling (%s).', name.toLowerCase());
@ -1113,14 +1114,14 @@ Peer.prototype.maybeTimeout = function maybeTimeout() {
*/
Peer.prototype.request = function request(type, timeout) {
var entry = this.responseMap[type];
var entry = this.responseMap.get(type);
if (this.destroyed)
return;
if (!entry) {
entry = new RequestEntry();
this.responseMap[type] = entry;
this.responseMap.set(type, entry);
}
entry.setTimeout(timeout);
@ -1136,12 +1137,12 @@ Peer.prototype.request = function request(type, timeout) {
*/
Peer.prototype.response = function response(type, payload) {
var entry = this.responseMap[type];
var entry = this.responseMap.get(type);
if (!entry)
return;
delete this.responseMap[type];
this.responseMap.remove(type);
return entry;
};
@ -1880,7 +1881,7 @@ Peer.prototype.handleMempool = co(function* handleMempool(packet) {
Peer.prototype.getBroadcasted = function getBroadcasted(item) {
var type = item.isTX() ? invTypes.TX : invTypes.BLOCK;
var entry = this.pool.invMap[item.hash];
var entry = this.pool.invMap.get(item.hash);
if (!entry)
return;
@ -2420,7 +2421,7 @@ Peer.prototype.handleReject = co(function* handleReject(reject) {
if (!reject.hash)
return;
entry = this.pool.invMap[reject.hash];
entry = this.pool.invMap.get(reject.hash);
if (!entry)
return;
@ -2598,7 +2599,7 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) {
return;
}
if (this.compactBlocks[hash]) {
if (this.compactBlocks.has(hash)) {
this.logger.debug(
'Peer sent us a duplicate compact block (%s).',
this.hostname);
@ -2624,14 +2625,13 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) {
return;
}
if (this.compactAmount >= 10) {
if (this.compactBlocks.size >= 10) {
this.logger.warning('Compact block DoS attempt (%s).', this.hostname);
this.destroy();
return;
}
this.compactBlocks[hash] = block;
this.compactAmount++;
this.compactBlocks.set(hash, block);
this.send(new packets.GetBlockTxnPacket(block.toRequest()));
@ -2695,17 +2695,15 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) {
Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) {
var res = packet.response;
var block = this.compactBlocks[res.hash];
var block = this.compactBlocks.get(res.hash);
if (!block) {
this.logger.debug('Peer sent unsolicited blocktxn (%s).', this.hostname);
this.compactBlocks = {};
this.compactAmount = 0;
this.compactBlocks.reset();
return;
}
delete this.compactBlocks[res.hash];
this.compactAmount--;
this.compactBlocks.remove(res.hash);
if (!block.fillMissing(res)) {
this.increaseBan(100);

View File

@ -29,6 +29,7 @@ var tcp = require('./tcp');
var dns = require('./dns');
var HostList = require('./hostlist');
var InvItem = require('../primitives/invitem');
var Map = require('../utils/map');
var invTypes = InvItem.types;
var VerifyError = errors.VerifyError;
var VerifyResult = errors.VerifyResult;
@ -137,11 +138,11 @@ function Pool(options) {
this.txFilter = null;
// Requested objects.
this.requestMap = {};
this.activeRequests = 0;
this.requestMap = new Map();
this.queueMap = new Map();
// Currently broadcasted objects.
this.invMap = {};
this.invMap = new Map();
this.invTimeout = 60000;
this.scheduled = false;
@ -382,18 +383,17 @@ Pool.prototype._close = co(function* close() {
this.stopSync();
hashes = Object.keys(this.invMap);
hashes = this.invMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
item = this.invMap[hash];
item = this.invMap.get(hash);
item.resolve();
}
this.peers.destroy();
this.requestMap = {};
this.activeRequests = 0;
this.requestMap.reset();
this.hosts.reset();
@ -1012,8 +1012,8 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
(this.chain.getProgress() * 100).toFixed(2) + '%',
this.chain.total,
this.chain.orphanCount,
this.activeRequests,
0,
this.requestMap.size,
this.queueMap.size,
block.bits,
this.peers.size(),
this.chain.locker.pending,
@ -1520,12 +1520,22 @@ Pool.prototype.removePeer = function removePeer(peer) {
this.peers.remove(peer);
hashes = Object.keys(peer.requestMap);
hashes = peer.requestMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
this.fulfill(peer, hash);
}
hashes = peer.blockQueue;
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
assert(this.queueMap.has(hash));
this.queueMap.remove(hash);
}
peer.blockQueue.length = 0;
};
/**
@ -1650,10 +1660,14 @@ Pool.prototype.getBlock = function getBlock(peer, hash) {
if (peer.destroyed)
throw new Error('Peer is destroyed (getdata).');
if (this.requestMap[hash])
if (this.requestMap.has(hash))
return;
peer.queueMap[hash] = true;
if (this.queueMap.has(hash))
return;
this.queueMap.insert(hash);
peer.blockQueue.push(hash);
};
/**
@ -1668,7 +1682,11 @@ Pool.prototype.hasBlock = co(function* hasBlock(hash) {
return true;
// Check the pending requests.
if (this.requestMap[hash])
if (this.requestMap.has(hash))
return true;
// Check the queued requests.
if (this.queueMap.has(hash))
return true;
return false;
@ -1701,12 +1719,10 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
if (this.hasTX(hash))
continue;
assert(!this.requestMap[hash]);
assert(!this.requestMap.has(hash));
this.requestMap[hash] = true;
peer.requestMap[hash] = true;
this.activeRequests++;
this.requestMap.insert(hash);
peer.requestMap.insert(hash);
items.push(hash);
}
@ -1739,7 +1755,7 @@ Pool.prototype.hasTX = function hasTX(hash) {
}
// Check the pending requests.
if (this.requestMap[hash])
if (this.requestMap.has(hash))
return true;
return false;
@ -1771,39 +1787,39 @@ Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) {
*/
Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) {
var queue = Object.keys(peer.queueMap);
var queue = peer.blockQueue;
var hashes = [];
var i, size, hash;
if (peer.destroyed)
return;
if (queue.length === 0)
return;
if (this.options.spv) {
if (this.activeRequests >= 2000)
if (this.requestMap.size >= 2000)
return;
size = Math.min(queue.length, 50000);
} else {
size = this.network.getBatchSize(this.chain.height);
if (this.activeRequests >= size)
if (this.requestMap.size >= size)
return;
}
for (i = 0; i < queue.length; i++) {
hash = queue[i];
delete peer.queueMap[hash];
assert(this.queueMap.has(hash));
if (this.requestMap[hash])
continue;
this.queueMap.remove(hash);
assert(!this.requestMap[hash]);
assert(!this.requestMap.has(hash));
this.requestMap[hash] = true;
peer.requestMap[hash] = true;
this.activeRequests++;
this.requestMap.insert(hash);
peer.requestMap.insert(hash);
hashes.push(hash);
@ -1811,10 +1827,12 @@ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) {
break;
}
peer.blockQueue = queue.slice(hashes.length);
this.logger.debug(
'Requesting %d/%d blocks from peer with getdata (%s).',
hashes.length,
this.activeRequests,
this.requestMap.size,
peer.hostname);
peer.getBlock(hashes);
@ -1828,15 +1846,13 @@ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) {
*/
Pool.prototype.fulfill = function fulfill(peer, hash) {
if (!peer.requestMap[hash])
if (!peer.requestMap.has(hash))
return false;
delete peer.requestMap[hash];
peer.requestMap.remove(hash);
assert(this.requestMap[hash]);
delete this.requestMap[hash];
this.activeRequests--;
assert(this.requestMap.has(hash));
this.requestMap.remove(hash);
return true;
};
@ -1849,7 +1865,7 @@ Pool.prototype.fulfill = function fulfill(peer, hash) {
Pool.prototype.broadcast = function broadcast(msg) {
var hash = msg.hash('hex');
var item = this.invMap[hash];
var item = this.invMap.get(hash);
if (item) {
item.refresh();
@ -2148,9 +2164,9 @@ BroadcastItem.prototype.addJob = function addJob(resolve, reject) {
BroadcastItem.prototype.start = function start() {
assert(!this.timeout, 'Already started.');
assert(!this.pool.invMap[this.hash], 'Already started.');
assert(!this.pool.invMap.has(this.hash), 'Already started.');
this.pool.invMap[this.hash] = this;
this.pool.invMap.set(this.hash, this);
this.refresh();
@ -2199,12 +2215,12 @@ BroadcastItem.prototype.announce = function announce() {
BroadcastItem.prototype.cleanup = function cleanup() {
assert(this.timeout != null, 'Already finished.');
assert(this.pool.invMap[this.hash], 'Already finished.');
assert(this.pool.invMap.has(this.hash), 'Already finished.');
clearTimeout(this.timeout);
this.timeout = null;
delete this.pool.invMap[this.hash];
this.pool.invMap.remove(this.hash);
};
/**