From fb0b2b53d7729162a8395c59ed1cd0380858919a Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sat, 7 Jan 2017 00:19:40 -0800 Subject: [PATCH] net: use Map object for requested items. --- lib/net/hostlist.js | 98 ++----------------------------------------- lib/net/peer.js | 60 +++++++++++++------------- lib/net/pool.js | 100 +++++++++++++++++++++++++------------------- 3 files changed, 91 insertions(+), 167 deletions(-) diff --git a/lib/net/hostlist.js b/lib/net/hostlist.js index 6e2a8dbe..2b5a2900 100644 --- a/lib/net/hostlist.js +++ b/lib/net/hostlist.js @@ -14,6 +14,7 @@ var NetAddress = require('../primitives/netaddress'); var List = require('../utils/list'); var murmur3 = require('../utils/murmur3'); var StaticWriter = require('../utils/staticwriter'); +var Map = require('../utils/map'); /** * Host List @@ -71,7 +72,7 @@ HostList.prototype._init = function init() { var i; for (i = 0; i < this.maxBuckets; i++) - this.fresh.push(new MapBucket()); + this.fresh.push(new Map()); for (i = 0; i < this.maxBuckets; i++) this.used.push(new List()); @@ -219,7 +220,7 @@ HostList.prototype.getHost = function getHost() { * Get fresh bucket for host. * @private * @param {HostEntry} entry - * @returns {MapBucket} + * @returns {Map} */ HostList.prototype.freshBucket = function freshBucket(entry) { @@ -343,7 +344,7 @@ HostList.prototype.add = function add(addr, src) { /** * Evict a host from fresh bucket. - * @param {MapBucket} bucket + * @param {Map} bucket */ HostList.prototype.evictFresh = function evictFresh(bucket) { @@ -806,97 +807,6 @@ HostList.fromJSON = function fromJSON(options, json) { return new HostEntry(options).fromJSON(json); }; -/** - * MapBucket - * @constructor - */ - -function MapBucket() { - if (!(this instanceof MapBucket)) - return new MapBucket(); - - this.map = {}; - this.size = 0; -} - -/** - * Get map keys. - * @returns {String[]} - */ - -MapBucket.prototype.keys = function keys() { - return Object.keys(this.map); -}; - -/** - * Get item from map. - * @param {String} key - * @returns {Object|null} - */ - -MapBucket.prototype.get = function get(key) { - return this.map[key]; -}; - -/** - * Test whether map has an item. - * @param {String} key - * @returns {Boolean} - */ - -MapBucket.prototype.has = function has(key) { - return this.map[key] !== undefined; -}; - -/** - * Set a key to value in map. - * @param {String} key - * @param {Object} value - * @returns {Boolean} - */ - -MapBucket.prototype.set = function set(key, value) { - var item = this.map[key]; - - assert(value !== undefined); - - this.map[key] = value; - - if (item === undefined) { - this.size++; - return true; - } - - return false; -}; - -/** - * Remove an item from map. - * @param {String} key - * @returns {Object|null} - */ - -MapBucket.prototype.remove = function remove(key) { - var item = this.map[key]; - - if (item === undefined) - return; - - delete this.map[key]; - this.size--; - - return item; -}; - -/** - * Reset the map. - */ - -MapBucket.prototype.reset = function reset() { - this.map = {}; - this.size = 0; -}; - /** * HostEntry * @constructor diff --git a/lib/net/peer.js b/lib/net/peer.js index f20773c1..c60f2d16 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -11,6 +11,7 @@ var assert = require('assert'); var EventEmitter = require('events').EventEmitter; var util = require('../utils/util'); var co = require('../utils/co'); +var Map = require('../utils/map'); var Parser = require('./parser'); var Framer = require('./framer'); var packets = require('./packets'); @@ -113,8 +114,6 @@ function Peer(pool) { this.bip150 = null; this.compactMode = -1; this.compactWitness = false; - this.compactBlocks = {}; - this.compactAmount = 0; this.lastMerkle = null; this.waitingTX = 0; this.syncSent = false; @@ -134,10 +133,10 @@ function Peer(pool) { this.addrFilter = new Bloom.Rolling(5000, 0.001); this.invFilter = new Bloom.Rolling(50000, 0.000001); - this.requestMap = {}; - this.queueMap = {}; - - this.responseMap = {}; + this.requestMap = new Map(); + this.blockQueue = []; + this.compactBlocks = new Map(); + this.responseMap = new Map(); if (this.options.bip151) { this.bip151 = new BIP151(); @@ -696,12 +695,12 @@ Peer.prototype.announceTX = function announceTX(txs) { Peer.prototype.announceList = function announceList() { var blocks = []; var txs = []; - var hashes = Object.keys(this.pool.invMap); + var hashes = this.pool.invMap.keys(); var i, hash, item; for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - item = this.pool.invMap[hash]; + item = this.pool.invMap.get(hash); switch (item.type) { case invTypes.BLOCK: @@ -944,17 +943,16 @@ Peer.prototype.destroy = function destroy() { this.connectTimeout = null; } - keys = Object.keys(this.responseMap); + keys = this.responseMap.keys(); for (i = 0; i < keys.length; i++) { cmd = keys[i]; - entry = this.responseMap[cmd]; - delete this.responseMap[cmd]; + entry = this.responseMap.get(cmd); + this.responseMap.remove(cmd); entry.reject(new Error('Peer was destroyed.')); } - this.compactBlocks = {}; - this.compactAmount = 0; + this.compactBlocks.reset(); this.locker.destroy(); @@ -1033,9 +1031,12 @@ Peer.prototype.addTimeout = function addTimeout(packet) { switch (packet.type) { case packetTypes.MEMPOOL: - case packetTypes.GETBLOCKS: this.request(packetTypes.INV, timeout); break; + case packetTypes.GETBLOCKS: + if (!this.chain.synced) + this.request(packetTypes.INV, timeout); + break; case packetTypes.GETHEADERS: this.request(packetTypes.HEADERS, timeout * 2); break; @@ -1077,13 +1078,13 @@ Peer.prototype.fulfill = function fulfill(packet) { */ Peer.prototype.maybeTimeout = function maybeTimeout() { - var keys = Object.keys(this.responseMap); + var keys = this.responseMap.keys(); var now = util.ms(); var i, key, entry, name; for (i = 0; i < keys.length; i++) { key = keys[i]; - entry = this.responseMap[key]; + entry = this.responseMap.get(key); if (now > entry.timeout) { name = packets.typesByVal[key]; this.error('Peer is stalling (%s).', name.toLowerCase()); @@ -1113,14 +1114,14 @@ Peer.prototype.maybeTimeout = function maybeTimeout() { */ Peer.prototype.request = function request(type, timeout) { - var entry = this.responseMap[type]; + var entry = this.responseMap.get(type); if (this.destroyed) return; if (!entry) { entry = new RequestEntry(); - this.responseMap[type] = entry; + this.responseMap.set(type, entry); } entry.setTimeout(timeout); @@ -1136,12 +1137,12 @@ Peer.prototype.request = function request(type, timeout) { */ Peer.prototype.response = function response(type, payload) { - var entry = this.responseMap[type]; + var entry = this.responseMap.get(type); if (!entry) return; - delete this.responseMap[type]; + this.responseMap.remove(type); return entry; }; @@ -1880,7 +1881,7 @@ Peer.prototype.handleMempool = co(function* handleMempool(packet) { Peer.prototype.getBroadcasted = function getBroadcasted(item) { var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; - var entry = this.pool.invMap[item.hash]; + var entry = this.pool.invMap.get(item.hash); if (!entry) return; @@ -2420,7 +2421,7 @@ Peer.prototype.handleReject = co(function* handleReject(reject) { if (!reject.hash) return; - entry = this.pool.invMap[reject.hash]; + entry = this.pool.invMap.get(reject.hash); if (!entry) return; @@ -2598,7 +2599,7 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { return; } - if (this.compactBlocks[hash]) { + if (this.compactBlocks.has(hash)) { this.logger.debug( 'Peer sent us a duplicate compact block (%s).', this.hostname); @@ -2624,14 +2625,13 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { return; } - if (this.compactAmount >= 10) { + if (this.compactBlocks.size >= 10) { this.logger.warning('Compact block DoS attempt (%s).', this.hostname); this.destroy(); return; } - this.compactBlocks[hash] = block; - this.compactAmount++; + this.compactBlocks.set(hash, block); this.send(new packets.GetBlockTxnPacket(block.toRequest())); @@ -2695,17 +2695,15 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { var res = packet.response; - var block = this.compactBlocks[res.hash]; + var block = this.compactBlocks.get(res.hash); if (!block) { this.logger.debug('Peer sent unsolicited blocktxn (%s).', this.hostname); - this.compactBlocks = {}; - this.compactAmount = 0; + this.compactBlocks.reset(); return; } - delete this.compactBlocks[res.hash]; - this.compactAmount--; + this.compactBlocks.remove(res.hash); if (!block.fillMissing(res)) { this.increaseBan(100); diff --git a/lib/net/pool.js b/lib/net/pool.js index 959bc385..c60d2aca 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -29,6 +29,7 @@ var tcp = require('./tcp'); var dns = require('./dns'); var HostList = require('./hostlist'); var InvItem = require('../primitives/invitem'); +var Map = require('../utils/map'); var invTypes = InvItem.types; var VerifyError = errors.VerifyError; var VerifyResult = errors.VerifyResult; @@ -137,11 +138,11 @@ function Pool(options) { this.txFilter = null; // Requested objects. - this.requestMap = {}; - this.activeRequests = 0; + this.requestMap = new Map(); + this.queueMap = new Map(); // Currently broadcasted objects. - this.invMap = {}; + this.invMap = new Map(); this.invTimeout = 60000; this.scheduled = false; @@ -382,18 +383,17 @@ Pool.prototype._close = co(function* close() { this.stopSync(); - hashes = Object.keys(this.invMap); + hashes = this.invMap.keys(); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - item = this.invMap[hash]; + item = this.invMap.get(hash); item.resolve(); } this.peers.destroy(); - this.requestMap = {}; - this.activeRequests = 0; + this.requestMap.reset(); this.hosts.reset(); @@ -1012,8 +1012,8 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { (this.chain.getProgress() * 100).toFixed(2) + '%', this.chain.total, this.chain.orphanCount, - this.activeRequests, - 0, + this.requestMap.size, + this.queueMap.size, block.bits, this.peers.size(), this.chain.locker.pending, @@ -1520,12 +1520,22 @@ Pool.prototype.removePeer = function removePeer(peer) { this.peers.remove(peer); - hashes = Object.keys(peer.requestMap); + hashes = peer.requestMap.keys(); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; this.fulfill(peer, hash); } + + hashes = peer.blockQueue; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + assert(this.queueMap.has(hash)); + this.queueMap.remove(hash); + } + + peer.blockQueue.length = 0; }; /** @@ -1650,10 +1660,14 @@ Pool.prototype.getBlock = function getBlock(peer, hash) { if (peer.destroyed) throw new Error('Peer is destroyed (getdata).'); - if (this.requestMap[hash]) + if (this.requestMap.has(hash)) return; - peer.queueMap[hash] = true; + if (this.queueMap.has(hash)) + return; + + this.queueMap.insert(hash); + peer.blockQueue.push(hash); }; /** @@ -1668,7 +1682,11 @@ Pool.prototype.hasBlock = co(function* hasBlock(hash) { return true; // Check the pending requests. - if (this.requestMap[hash]) + if (this.requestMap.has(hash)) + return true; + + // Check the queued requests. + if (this.queueMap.has(hash)) return true; return false; @@ -1701,12 +1719,10 @@ Pool.prototype.getTX = function getTX(peer, hashes) { if (this.hasTX(hash)) continue; - assert(!this.requestMap[hash]); + assert(!this.requestMap.has(hash)); - this.requestMap[hash] = true; - peer.requestMap[hash] = true; - - this.activeRequests++; + this.requestMap.insert(hash); + peer.requestMap.insert(hash); items.push(hash); } @@ -1739,7 +1755,7 @@ Pool.prototype.hasTX = function hasTX(hash) { } // Check the pending requests. - if (this.requestMap[hash]) + if (this.requestMap.has(hash)) return true; return false; @@ -1771,39 +1787,39 @@ Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) { */ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) { - var queue = Object.keys(peer.queueMap); + var queue = peer.blockQueue; var hashes = []; var i, size, hash; + if (peer.destroyed) + return; + if (queue.length === 0) return; if (this.options.spv) { - if (this.activeRequests >= 2000) + if (this.requestMap.size >= 2000) return; size = Math.min(queue.length, 50000); } else { size = this.network.getBatchSize(this.chain.height); - if (this.activeRequests >= size) + if (this.requestMap.size >= size) return; } for (i = 0; i < queue.length; i++) { hash = queue[i]; - delete peer.queueMap[hash]; + assert(this.queueMap.has(hash)); - if (this.requestMap[hash]) - continue; + this.queueMap.remove(hash); - assert(!this.requestMap[hash]); + assert(!this.requestMap.has(hash)); - this.requestMap[hash] = true; - peer.requestMap[hash] = true; - - this.activeRequests++; + this.requestMap.insert(hash); + peer.requestMap.insert(hash); hashes.push(hash); @@ -1811,10 +1827,12 @@ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) { break; } + peer.blockQueue = queue.slice(hashes.length); + this.logger.debug( 'Requesting %d/%d blocks from peer with getdata (%s).', hashes.length, - this.activeRequests, + this.requestMap.size, peer.hostname); peer.getBlock(hashes); @@ -1828,15 +1846,13 @@ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) { */ Pool.prototype.fulfill = function fulfill(peer, hash) { - if (!peer.requestMap[hash]) + if (!peer.requestMap.has(hash)) return false; - delete peer.requestMap[hash]; + peer.requestMap.remove(hash); - assert(this.requestMap[hash]); - delete this.requestMap[hash]; - - this.activeRequests--; + assert(this.requestMap.has(hash)); + this.requestMap.remove(hash); return true; }; @@ -1849,7 +1865,7 @@ Pool.prototype.fulfill = function fulfill(peer, hash) { Pool.prototype.broadcast = function broadcast(msg) { var hash = msg.hash('hex'); - var item = this.invMap[hash]; + var item = this.invMap.get(hash); if (item) { item.refresh(); @@ -2148,9 +2164,9 @@ BroadcastItem.prototype.addJob = function addJob(resolve, reject) { BroadcastItem.prototype.start = function start() { assert(!this.timeout, 'Already started.'); - assert(!this.pool.invMap[this.hash], 'Already started.'); + assert(!this.pool.invMap.has(this.hash), 'Already started.'); - this.pool.invMap[this.hash] = this; + this.pool.invMap.set(this.hash, this); this.refresh(); @@ -2199,12 +2215,12 @@ BroadcastItem.prototype.announce = function announce() { BroadcastItem.prototype.cleanup = function cleanup() { assert(this.timeout != null, 'Already finished.'); - assert(this.pool.invMap[this.hash], 'Already finished.'); + assert(this.pool.invMap.has(this.hash), 'Already finished.'); clearTimeout(this.timeout); this.timeout = null; - delete this.pool.invMap[this.hash]; + this.pool.invMap.remove(this.hash); }; /**