diff --git a/lib/bcoin/bip150.js b/lib/bcoin/bip150.js index 25fa7a85..e99e42bc 100644 --- a/lib/bcoin/bip150.js +++ b/lib/bcoin/bip150.js @@ -122,6 +122,7 @@ BIP150.prototype.reply = function reply(payload) { sig = bcoin.ec.toDER(data); msg = this.hash(this.output.sid, type, this.peerIdentity); + result = bcoin.ec.verify(msg, sig, this.peerIdentity); if (!result) @@ -279,10 +280,15 @@ BIP150.prototype.wait = function wait(timeout, callback) { }; BIP150.prototype.getAddress = function getAddress() { + assert(this.peerIdentity, 'Cannot serialize address.'); + return BIP150.address(this.peerIdentity); +}; + +BIP150.address = function address(key) { var p = new bcoin.writer(); p.writeU8(0x0f); p.writeU16BE(0xff01); - p.writeBytes(utils.hash160(this.peerIdentity)); + p.writeBytes(utils.hash160(key)); p.writeChecksum(); return utils.toBase58(p.render()); }; diff --git a/lib/bcoin/http/rpc.js b/lib/bcoin/http/rpc.js index 4023205d..cf62a94c 100644 --- a/lib/bcoin/http/rpc.js +++ b/lib/bcoin/http/rpc.js @@ -414,7 +414,7 @@ RPC.prototype.disconnectnode = function disconnectnode(args, callback) { node = toString(args[0]); node = IP.normalize(node); - peer = this.pool.getPeer(node); + peer = this.pool.peers.get(node); if (peer) peer.destroy(); @@ -430,7 +430,7 @@ RPC.prototype.getaddednodeinfo = function getaddednodeinfo(args, callback) { if (args.length === 2) { host = toString(args[1]); - peer = this.pool.getPeer(host); + peer = this.pool.peers.get(host); if (!peer) return callback(new RPCError('Node has not been added.')); peers = [peer]; @@ -533,7 +533,7 @@ RPC.prototype.ping = function ping(args, callback) { }; RPC.prototype.setban = function setban(args, callback) { - var host, peer, ip; + var host, ip; if (args.help || args.length < 2 @@ -547,14 +547,10 @@ RPC.prototype.setban = function setban(args, callback) { switch (args[1]) { case 'add': - peer = this.pool.getPeer(ip); - if (peer) - peer.setMisbehavior(100); - else - this.pool.peers.misbehaving[ip] = utils.now(); + this.pool.ban(ip); break; case 'remove': - delete this.pool.peers.misbehaving[ip]; + this.pool.unban(ip); break; } @@ -568,11 +564,11 @@ RPC.prototype.listbanned = function listbanned(args, callback) { return callback(new RPCError('listbanned')); banned = []; - keys = Object.keys(this.pool.peers.misbehaving); + keys = Object.keys(this.pool.hosts.misbehaving); for (i = 0; i < keys.length; i++) { host = keys[i]; - time = this.pool.peers.misbehaving[host]; + time = this.pool.hosts.misbehaving[host]; banned.push({ address: host, banned_until: time + constants.BAN_TIME, @@ -588,8 +584,7 @@ RPC.prototype.clearbanned = function clearbanned(args, callback) { if (args.help || args.length !== 0) return callback(new RPCError('clearbanned')); - this.pool.peers.ignored = {}; - this.pool.peers.misbehaving = {}; + this.pool.hosts.clear(); callback(null, null); }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 2965a613..1ced7e83 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -74,6 +74,8 @@ function Peer(pool, options) { if (!options) options = {}; + assert(typeof options.type === 'number', 'Peer must have a type.'); + this.options = options; this.pool = pool; this.logger = pool.logger; @@ -119,7 +121,19 @@ function Peer(pool, options) { this.banScore = 0; - assert(typeof this.type === 'number', 'Peer must have a type.'); + this.pingTimer = null; + this.pingInterval = options.pingInterval || 120000; + + this.requestTimeout = options.requestTimeout || 10000; + this.requestMap = {}; + + this.queueBlock = []; + this.queueTX = []; + + this.uid = 0; + this.id = Peer.uid++; + + this.setMaxListeners(10000); if (options.socket) { this.socket = options.socket; @@ -156,27 +170,6 @@ function Peer(pool, options) { this.parser = new bcoin.protocol.parser(this); this.framer = new bcoin.protocol.framer(this); - this.requests = { - timeout: options.requestTimeout || 10000, - skip: {}, - map: {} - }; - - this.ping = { - timer: null, - interval: options.pingInterval || 120000 - }; - - this.queue = { - block: [], - tx: [] - }; - - this.uid = 0; - this.id = Peer.uid++; - - this.setMaxListeners(10000); - this._init(); } @@ -396,9 +389,9 @@ Peer.prototype._onAck = function _onAck(err) { this.ts = utils.now(); // Setup the ping interval. - this.ping.timer = setInterval(function() { + this.pingTimer = setInterval(function() { self.sendPing(); - }, this.ping.interval); + }, this.pingInterval); // Ask for headers-only. if (this.pool.options.headers) { @@ -427,7 +420,7 @@ Peer.prototype._onAck = function _onAck(err) { this.updateWatch(); // Announce our currently broadcasted items. - this.announce(this.pool.inv.items); + this.announce(this.pool.invItems); // Set a fee rate filter. if (this.pool.feeRate !== -1) @@ -496,7 +489,7 @@ Peer.prototype.createSocket = function createSocket(port, host) { Peer.prototype.announce = function announce(items) { var inv = []; var headers = []; - var i, item; + var i, item, entry; if (this.destroyed) return; @@ -507,9 +500,13 @@ Peer.prototype.announce = function announce(items) { for (i = 0; i < items.length; i++) { item = items[i]; + // Check the peer's bloom + // filter if they're using spv. if (!this.isWatched(item)) continue; + // Convert item to block headers + // for peers that request it. if (this.preferHeaders && item.toHeaders) { item = item.toHeaders(); if (this.invFilter.test(item.hash())) @@ -521,11 +518,23 @@ Peer.prototype.announce = function announce(items) { if (item.toInv) item = item.toInv(); + // Do not send txs to spv clients + // that have relay unset. if (!this.relay) { if (item.type === constants.inv.TX) continue; } + // Filter according to peer's fee filter. + if (this.feeRate !== -1 && this.mempool) { + if (item.type === constants.inv.TX) { + entry = this.mempool.getEntry(item.hash); + if (entry && entry.getRate() < this.feeRate) + continue; + } + } + + // Don't send if they already have it. if (this.invFilter.test(item.hash, 'hex')) continue; @@ -710,19 +719,19 @@ Peer.prototype.destroy = function destroy() { this.emit('close'); - if (this.ping.timer) { - clearInterval(this.ping.timer); - this.ping.timer = null; + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = null; } - keys = Object.keys(this.requests.map); + keys = Object.keys(this.requestMap); for (i = 0; i < keys.length; i++) { cmd = keys[i]; - queue = this.requests.map[cmd]; + queue = this.requestMap[cmd]; for (j = 0; j < queue.length; j++) - clearTimeout(queue[j].timer); + queue[j].destroy(); } }; @@ -771,37 +780,17 @@ Peer.prototype._error = function error(err, keep) { */ Peer.prototype.request = function request(cmd, callback) { - var self = this; var entry; if (this.destroyed) return utils.asyncify(callback)(new Error('Destroyed, sorry')); - entry = { - cmd: cmd, - callback: callback, - id: this.uid++, - ontimeout: function() { - var queue = self.requests.map[cmd]; + entry = new RequestEntry(this, cmd, callback); - if (!queue) - return; + if (!this.requestMap[cmd]) + this.requestMap[cmd] = []; - if (utils.binaryRemove(queue, entry, compare)) { - if (queue.length === 0) - delete self.requests.map[cmd]; - callback(new Error('Timed out: ' + cmd)); - } - }, - timer: null - }; - - entry.timer = setTimeout(entry.ontimeout, this.requests.timeout); - - if (!this.requests.map[cmd]) - this.requests.map[cmd] = []; - - this.requests.map[cmd].push(entry); + this.requestMap[cmd].push(entry); return entry; }; @@ -814,7 +803,7 @@ Peer.prototype.request = function request(cmd, callback) { */ Peer.prototype.response = function response(cmd, payload) { - var queue = this.requests.map[cmd]; + var queue = this.requestMap[cmd]; var entry, res; if (!queue) @@ -827,16 +816,15 @@ Peer.prototype.response = function response(cmd, payload) { res = entry.callback(null, payload, cmd); - if (res === this.requests.skip) + if (res === false) return false; queue.shift(); if (queue.length === 0) - delete this.requests.map[cmd]; + delete this.requestMap[cmd]; - clearTimeout(entry.timer); - entry.timer = null; + entry.destroy(); return true; }; @@ -1465,7 +1453,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { if (version.hasWitness()) this.haveWitness = true; - if (version.relay === false) + if (!version.relay) this.relay = false; // ACK @@ -1524,7 +1512,7 @@ Peer.prototype._handleMempool = function _handleMempool() { */ Peer.prototype._getItem = function _getItem(item, callback) { - var entry = this.pool.inv.map[item.hash]; + var entry = this.pool.invMap[item.hash]; if (entry) { this.logger.debug( @@ -1610,9 +1598,7 @@ Peer.prototype._handleGetData = function _handleGetData(items) { return next(); } - // We should technically calculate this in - // the `mempool` handler, but it would be - // too slow. + // Fallback in case fee filter fails in `announce`. if (self.feeRate !== -1) { if (entry.getRate() < self.feeRate) return next(); @@ -1725,7 +1711,7 @@ Peer.prototype._handleAddr = function _handleAddr(addrs) { this.logger.info( 'Received %d addrs (hosts=%d, peers=%d) (%s).', addrs.length, - this.pool.hosts.length, + this.pool.hosts.items.length, this.pool.peers.all.length, this.hostname); @@ -1801,8 +1787,8 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() { this.sentAddr = true; - for (i = 0; i < this.pool.hosts.length; i++) { - host = this.pool.hosts[i]; + for (i = 0; i < this.pool.hosts.items.length; i++) { + host = this.pool.hosts.items[i]; if (!host.isIP()) continue; @@ -1900,7 +1886,7 @@ Peer.prototype._handleReject = function _handleReject(payload) { return; hash = payload.data; - entry = this.pool.inv.map[hash]; + entry = this.pool.invMap[hash]; if (!entry) return; @@ -2317,7 +2303,7 @@ Peer.prototype.sendCompact = function sendCompact() { */ Peer.prototype.isMisbehaving = function isMisbehaving() { - return this.pool.isMisbehaving(this.host); + return this.pool.hosts.isMisbehaving(this.host); }; /** @@ -2326,7 +2312,7 @@ Peer.prototype.isMisbehaving = function isMisbehaving() { */ Peer.prototype.isIgnored = function isIgnored() { - return this.pool.isIgnored(this.host); + return this.pool.hosts.isIgnored(this.host); }; /** @@ -2490,6 +2476,40 @@ Peer.prototype.inspect = function inspect() { + '>'; }; +/** + * RequestEntry + * @constructor + */ + +function RequestEntry(peer, cmd, callback) { + this.peer = peer; + this.cmd = cmd; + this.callback = callback; + this.id = peer.uid++; + this.onTimeout = this._onTimeout.bind(this); + this.timeout = setTimeout(this.onTimeout, this.peer.requestTimeout); +} + +RequestEntry.prototype._onTimeout = function _onTimeout() { + var queue = this.peer.requestMap[this.cmd]; + + if (!queue) + return; + + if (utils.binaryRemove(queue, this, compare)) { + if (queue.length === 0) + delete this.peer.requestMap[this.cmd]; + this.callback(new Error('Timed out: ' + this.cmd)); + } +}; + +RequestEntry.prototype.destroy = function destroy() { + if (this.timeout != null) { + clearTimeout(this.timeout); + this.timeout = null; + } +}; + /* * Helpers */ diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 60f7082a..266e3e16 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -34,8 +34,6 @@ var InvItem = bcoin.packets.InvItem; * @param {Number?} [options.feeRate] - Fee filter rate. * @param {Number?} [options.loadTimeout=120000] - Sync timeout before * finding a new loader peer. - * @param {Number?} [options.loadInterval=20000] - Timeout before attempting to - * send another getblocks request. * @param {Number?} [options.requestTimeout=120000] - Timeout for in-flight * blocks. * @param {Number?} [options.invTimeout=60000] - Timeout for broadcasted @@ -80,179 +78,163 @@ var InvItem = bcoin.packets.InvItem; */ function Pool(options) { - var i, seeds, hostname, seed; - if (!(this instanceof Pool)) return new Pool(options); AsyncObject.call(this); - if (!options) - options = {}; + assert(options && options.chain, 'Pool requires a blockchain.'); this.options = options; this.chain = options.chain; this.logger = options.logger || this.chain.logger; this.mempool = options.mempool; - - assert(this.chain, 'Pool requires a blockchain.'); - this.network = this.chain.network; - if (options.relay == null) - options.relay = !options.spv; - - if (options.headers == null) - options.headers = options.spv; - - seeds = options.seeds || this.network.seeds; - - if (options.preferredSeed) { - seeds = seeds.slice(); - seeds.unshift(options.preferredSeed); - } - - this.seeds = []; - this.hosts = []; - this.hostMap = {}; - - for (i = 0; i < seeds.length; i++) { - hostname = seeds[i]; - seed = NetworkAddress.fromHostname(hostname, this.network); - this.seeds.push(seed); - } - this.services = constants.LOCAL_SERVICES; + this.port = this.network.port; + + this.server = null; + this.maxPeers = 8; + this.maxLeeches = 8; + this.connected = false; + this.uid = 0; + this.createServer = null; + this.locker = new bcoin.locker(this); + this.proxyServer = null; + this.auth = null; + this.identityKey = null; + + this.syncing = false; + this.synced = false; + + this.loadTimeout = 120000; + + this.feeRate = -1; + + this.address = new NetworkAddress(); + + this.peers = new PeerList(this); + this.hosts = new HostList(this); + + this.blockType = constants.inv.BLOCK; + this.txType = constants.inv.TX; + + this.localNonce = utils.nonce(); + + this.spvFilter = null; + this.txFilter = null; + this.rejectsFilter = new bcoin.bloom.rolling(120000, 0.000001); + this.rejectsTip = null; + + // Requested objects. + this.requestMap = {}; + this.requestTimeout = 20 * 60000; + this.activeRequest = 0; + this.activeBlocks = 0; + this.activeTX = 0; + + // Currently broadcasted objects. + this.invMap = {}; + this.invItems = []; + this.invTimeout = 60000; + + this.scheduled = false; + this.pendingWatch = null; + this.timer = null; + this.interval = null; + + this._initOptions(); + this._init(); +}; + +utils.inherits(Pool, AsyncObject); + +/** + * Initialize options. + * @private + */ + +Pool.prototype._initOptions = function _initOptions() { + if (this.options.relay == null) + this.options.relay = !this.options.spv; + + if (this.options.headers == null) + this.options.headers = this.options.spv; if (!this.options.witness) this.services &= ~constants.services.WITNESS; - this.port = this.options.port != null - ? this.options.port - : this.network.port; + if (this.options.port != null) + this.port = this.options.port; - this.address = new NetworkAddress({ - ts: utils.now(), - services: this.services, - host: '0.0.0.0', - port: this.port - }); + this.address.ts = utils.now(); + this.address.services = this.services; + this.address.port = this.port; - this.server = null; - this.maxPeers = options.maxPeers || 8; - this.maxLeeches = options.maxLeeches || 8; - this.connected = false; - this.uid = 0; - this._createServer = options.createServer; - this.locker = new bcoin.locker(this); - this.proxyServer = options.proxyServer; - this.auth = null; - this.identityKey = null; + if (this.options.maxPeers != null) + this.maxPeers = this.options.maxPeers; + + if (this.options.maxLeeches != null) + this.maxLeeches = this.options.maxLeeches; + + this.createServer = this.options.createServer; + this.proxyServer = this.options.proxyServer; if (this.options.bip150) { this.options.bip151 = true; this.auth = new bcoin.bip150.AuthDB(); - if (options.authPeers) - this.auth.setAuthorized(options.authPeers); + if (this.options.authPeers) + this.auth.setAuthorized(this.options.authPeers); - if (options.knownPeers) - this.auth.setKnown(options.knownPeers); + if (this.options.knownPeers) + this.auth.setKnown(this.options.knownPeers); - this.identityKey = options.identityKey || bcoin.ec.generatePrivateKey(); + this.identityKey = this.options.identityKey || bcoin.ec.generatePrivateKey(); assert(Buffer.isBuffer(this.identityKey), 'Identity key must be a buffer.'); assert(bcoin.ec.privateKeyVerify(this.identityKey), 'Invalid identity key.'); } - this.syncing = false; - this.synced = false; - this._scheduled = false; - this._pendingWatch = null; - this._timer = null; - this._interval = null; + if (this.options.loadTimeout != null) + this.loadTimeout = this.options.loadTimeout; - this.load = { - timeout: options.loadTimeout || 120000, - interval: options.loadInterval || 20000 - }; + if (this.options.feeRate != null) + this.feeRate = this.options.feeRate; - this.requestTimeout = options.requestTimeout || 20 * 60000; + if (this.options.seeds) + this.hosts.setSeeds(this.options.seeds); - this.feeRate = options.feeRate != null ? options.feeRate : -1; + if (this.options.preferredSeed) + this.hosts.addSeed(this.options.preferredSeed); - this.spvFilter = options.spv - ? bcoin.bloom.fromRate(10000, 0.001, constants.bloom.NONE) - : null; - - this.localNonce = utils.nonce(); - - this.peers = { - // Peers that are loading blocks themselves - regular: [], - // Peers that are still connecting - pending: [], - // Peers that connected to us - leeches: [], - // Peers that are loading block ids - load: null, - // All peers - all: [], - // Misbehaving hosts - misbehaving: {}, - // Ignored hosts - ignored: {}, - // Map of hosts - map: {} - }; - - this.block = { - versionHeight: 0, - bestHash: null, - type: !options.spv - ? constants.inv.BLOCK - : constants.inv.FILTERED_BLOCK - }; - - this.tx = { - filter: !this.mempool - ? new bcoin.bloom.rolling(50000, 0.000001) - : null, - type: constants.inv.TX - }; - - this.rejects = new bcoin.bloom.rolling(120000, 0.000001); + if (this.options.spv) + this.blockType = constants.inv.FILTERED_BLOCK; if (this.options.witness) { - this.block.type |= constants.WITNESS_MASK; - this.tx.type |= constants.WITNESS_MASK; + this.blockType |= constants.WITNESS_MASK; + this.txType |= constants.WITNESS_MASK; if (this.options.compact) { this.logger.warning('Disabling compact blocks due to segwit.'); this.options.compact = false; } } - this.request = { - map: {}, - active: 0, - activeBlocks: 0, - activeTX: 0 - }; + if (this.options.spv) + this.spvFilter = bcoin.bloom.fromRate(10000, 0.001, constants.bloom.NONE); - // Currently broadcasted objects - this.inv = { - items: [], - map: {}, - timeout: options.invTimeout || 60000, - interval: options.invInterval || 3000 - }; + if (!this.options.mempool) + this.txFilter = new bcoin.bloom.rolling(50000, 0.000001); - this._init(); + if (this.options.requestTimeout != null) + this.requestTimeout = this.options.requestTimeout; + + if (this.options.invTimeout != null) + this.invTimeout = this.options.invTimeout; }; -utils.inherits(Pool, AsyncObject); - /** * Initialize the pool. * @private @@ -263,7 +245,7 @@ Pool.prototype._init = function _init() { if (this.mempool) { this.mempool.on('bad orphan', function(tx) { - self.rejects.add(tx.hash()); + self.rejectsFilter.add(tx.hash()); }); } @@ -292,8 +274,8 @@ Pool.prototype._init = function _init() { }); this.chain.on('full', function() { - self._stopTimer(); - self._stopInterval(); + self.stopTimer(); + self.stopInterval(); if (!self.synced) { // Ask loader for a mempool snapshot. @@ -330,6 +312,8 @@ Pool.prototype._lock = function _lock(func, args, force) { Pool.prototype._open = function _open(callback) { var self = this; + var key; + this.getIP(function(err, ip) { if (err) self.logger.error(err); @@ -353,8 +337,9 @@ Pool.prototype._open = function _open(callback) { self.logger.info('Pool loaded (maxpeers=%d).', self.maxPeers); if (self.identityKey) { - self.logger.info('Identity public key: %s', - bcoin.ec.publicKeyCreate(self.identityKey, true).toString('hex')); + key = bcoin.ec.publicKeyCreate(self.identityKey, true); + self.logger.info('Identity public key: %s.', key.toString('hex')); + self.logger.info('Identity address: %s.', bcoin.bip150.address(key)); } if (!self.options.listen) @@ -372,39 +357,26 @@ Pool.prototype._open = function _open(callback) { */ Pool.prototype._close = function close(callback) { - var i, items, peers, hashes, hash; + var i, items, hashes, hash; this.stopSync(); - items = this.inv.items.slice(); + items = this.invItems.slice(); for (i = 0; i < items.length; i++) items[i].finish(); - hashes = Object.keys(this.request.map); + hashes = Object.keys(this.requestMap); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - this.request.map[hash].finish(new Error('Pool closed.')); + this.requestMap[hash].finish(new Error('Pool closed.')); } - if (this.peers.load) - this.peers.load.destroy(); + this.peers.destroy(); - peers = this.peers.regular.slice(); - - for (i = 0; i < peers.length; i++) - peers[i].destroy(); - - peers = this.peers.pending.slice(); - - for (i = 0; i < peers.length; i++) - peers[i].destroy(); - - peers = this.peers.leeches.slice(); - - for (i = 0; i < peers.length; i++) - peers[i].destroy(); + this.stopInterval(); + this.stopTimer(); this.unlisten(callback); }; @@ -415,7 +387,6 @@ Pool.prototype._close = function close(callback) { Pool.prototype.connect = function connect() { var self = this; - var i; assert(this.loaded, 'Pool is not loaded.'); @@ -442,12 +413,9 @@ Pool.prototype.connect = function connect() { }); } - assert(this.seeds.length !== 0, 'No seeds available.'); + assert(this.hosts.seeds.length !== 0, 'No seeds available.'); - this._addLoader(); - - for (i = 0; i < this.maxPeers - 1; i++) - this._addPeer(); + this.addLoader(); this.connected = true; }; @@ -465,8 +433,8 @@ Pool.prototype.listen = function listen(callback) { assert(!this.server, 'Server already listening.'); - if (this._createServer) { - this.server = this._createServer(); + if (this.createServer) { + this.server = this.createServer(); } else { if (utils.isBrowser) return utils.nextTick(callback); @@ -475,38 +443,7 @@ Pool.prototype.listen = function listen(callback) { } this.server.on('connection', function(socket) { - var hostname, host; - - if (!socket.remoteAddress) { - self.logger.debug('Ignoring disconnected leech.'); - socket.destroy(); - return; - } - - host = IP.normalize(socket.remoteAddress); - - if (self.peers.leeches.length >= self.maxLeeches) { - hostname = IP.hostname(host, socket.remotePort); - self.logger.debug('Ignoring leech: too many leeches (%s).', hostname); - socket.destroy(); - return; - } - - if (self.isMisbehaving(host)) { - hostname = IP.hostname(host, socket.remotePort); - self.logger.debug('Ignoring misbehaving leech (%s).', hostname); - socket.destroy(); - return; - } - - if (self.isIgnored(host)) { - hostname = IP.hostname(host, socket.remotePort); - self.logger.debug('Ignoring leech (%s).', hostname); - socket.destroy(); - return; - } - - self._addLeech(socket); + self._handleLeech(socket); }); this.server.on('listening', function() { @@ -537,21 +474,61 @@ Pool.prototype.unlisten = function unlisten(callback) { this.server = null; }; +/** + * Handle incoming connection. + * @private + * @param {net.Socket} socket + */ + +Pool.prototype._handleLeech = function _handleLeech(socket) { + var hostname, host; + + if (!socket.remoteAddress) { + this.logger.debug('Ignoring disconnected leech.'); + socket.destroy(); + return; + } + + host = IP.normalize(socket.remoteAddress); + + if (this.peers.leeches.length >= this.maxLeeches) { + hostname = IP.hostname(host, socket.remotePort); + this.logger.debug('Ignoring leech: too many leeches (%s).', hostname); + socket.destroy(); + return; + } + + if (this.hosts.isMisbehaving(host)) { + hostname = IP.hostname(host, socket.remotePort); + this.logger.debug('Ignoring misbehaving leech (%s).', hostname); + socket.destroy(); + return; + } + + if (this.hosts.isIgnored(host)) { + hostname = IP.hostname(host, socket.remotePort); + this.logger.debug('Ignoring leech (%s).', hostname); + socket.destroy(); + return; + } + + this.addLeech(socket); +}; + /** * Start timer to detect stalling. * @private */ -Pool.prototype._startTimer = function _startTimer() { +Pool.prototype.startTimer = function startTimer() { var self = this; - this._stopTimer(); + this.stopTimer(); function destroy() { if (!self.syncing) return; - // Chain is full and up-to-date if (self.chain.isFull()) return; @@ -561,7 +538,7 @@ Pool.prototype._startTimer = function _startTimer() { } } - this._timer = setTimeout(destroy, this.load.timeout); + this.timer = setTimeout(destroy, this.loadTimeout); }; /** @@ -569,12 +546,11 @@ Pool.prototype._startTimer = function _startTimer() { * @private */ -Pool.prototype._stopTimer = function _stopTimer() { - if (this._timer == null) - return; - - clearTimeout(this._timer); - this._timer = null; +Pool.prototype.stopTimer = function stopTimer() { + if (this.timer != null) { + clearTimeout(this.timer); + this.timer = null; + } }; /** @@ -585,26 +561,25 @@ Pool.prototype._stopTimer = function _stopTimer() { * @private */ -Pool.prototype._startInterval = function _startInterval() { +Pool.prototype.startInterval = function startInterval() { var self = this; - this._stopInterval(); + this.stopInterval(); function load() { if (!self.syncing) return; - // Chain is full and up-to-date if (self.chain.isFull()) - return; + return self.stopInterval(); if (self.chain.isBusy()) - return self._startTimer(); + return self.startTimer(); self.logger.warning('Stalling.'); } - this._interval = setInterval(load, this.load.interval); + this.interval = setInterval(load, this.loadTimeout / 6 | 0); }; /** @@ -612,12 +587,11 @@ Pool.prototype._startInterval = function _startInterval() { * @private */ -Pool.prototype._stopInterval = function _stopInterval() { - if (this._interval == null) - return; - - clearInterval(this._interval); - this._interval = null; +Pool.prototype.stopInterval = function stopInterval() { + if (this.interval != null) { + clearInterval(this.interval); + this.interval = null; + } }; /** @@ -626,26 +600,27 @@ Pool.prototype._stopInterval = function _stopInterval() { * @private */ -Pool.prototype._addLoader = function _addLoader() { +Pool.prototype.addLoader = function addLoader() { var self = this; var peer; if (!this.loaded) return; - if (this.peers.load) + if (this.peers.load) { + this.fillPeers(); return; + } - peer = this._createPeer({ + peer = this.createPeer({ host: this.getLoaderHost(), type: bcoin.peer.types.LOADER }); this.logger.info('Added loader peer (%s).', peer.hostname); - this.peers.load = peer; - this.peers.all.push(peer); - this.peers.map[peer.host] = peer; + this.peers.addLoader(peer); + this.fillPeers(); utils.nextTick(function() { self.emit('loader', peer); @@ -659,13 +634,13 @@ Pool.prototype._addLoader = function _addLoader() { Pool.prototype.startSync = function startSync() { this.syncing = true; - this._startInterval(); - this._startTimer(); + this.startInterval(); + this.startTimer(); this.connect(); if (!this.peers.load) { - this._addLoader(); + this.addLoader(); return; } @@ -702,8 +677,8 @@ Pool.prototype.stopSync = function stopSync() { if (!this.loaded) return; - this._stopInterval(); - this._stopTimer(); + this.stopInterval(); + this.stopTimer(); if (this.peers.load) this.peers.load.syncSent = false; @@ -742,9 +717,9 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) if (peer === this.peers.load) { // Reset interval to avoid stall behavior. - this._startInterval(); + this.startInterval(); // Reset timeout to avoid killing the loader. - this._startTimer(); + this.startTimer(); } utils.forEachSerial(headers, function(header, next) { @@ -761,13 +736,13 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) if (!header.verify(ret)) { peer.reject(header, 'invalid', ret.reason, 100); - self.rejects.add(header.hash()); + self.rejectsFilter.add(header.hash()); return next(new Error('Invalid header.')); } last = hash; - self.getData(peer, self.block.type, hash, next); + self.getData(peer, self.blockType, hash, next); }, function(err) { if (err) return callback(err); @@ -811,9 +786,9 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { if (peer === this.peers.load) { // Reset interval to avoid stall behavior. - this._startInterval(); + this.startInterval(); // Reset timeout to avoid killing the loader. - this._startTimer(); + this.startTimer(); } utils.forEachSerial(hashes, function(hash, next, i) { @@ -829,7 +804,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { return peer.resolveOrphan(null, hash, next); } - self.getData(peer, self.block.type, hash, function(err, exists) { + self.getData(peer, self.blockType, hash, function(err, exists) { if (err) return next(err); @@ -842,7 +817,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { // the hashContinue on the remote node). if (exists && i === hashes.length - 1) { // Make sure we _actually_ have this block. - if (!self.request.map[hash]) { + if (!self.requestMap[hash]) { self.logger.debug('Received existing hash (%s).', peer.hostname); return peer.getBlocks(hash, null, next); } @@ -945,7 +920,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { }); } - self.rejects.add(block.hash()); + self.rejectsFilter.add(block.hash()); self.scheduleRequests(peer); return callback(err); } @@ -966,8 +941,8 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { (self.chain.getProgress() * 100).toFixed(2) + '%', self.chain.total, self.chain.orphan.count, - self.request.activeBlocks, - peer.queue.block.length, + self.activeBlocks, + peer.queueBlock.length, block.bits, self.peers.all.length, self.chain.locker.pending.length, @@ -1024,21 +999,23 @@ Pool.prototype.sendAlert = function sendAlert(alert) { * @returns {Peer} */ -Pool.prototype._createPeer = function _createPeer(options) { +Pool.prototype.createPeer = function createPeer(options) { var self = this; var peer = new bcoin.peer(this, options); peer.once('close', function() { - self._removePeer(peer); + self.removePeer(peer); if (!self.loaded) return; - if (peer.type !== bcoin.peer.types.LOADER) + if (peer.type !== bcoin.peer.types.LOADER) { + self.fillPeers(); return; + } - self._stopInterval(); - self._stopTimer(); + self.stopInterval(); + self.stopTimer(); if (self.peers.regular.length === 0) { self.logger.warning('%s %s %s', @@ -1046,12 +1023,12 @@ Pool.prototype._createPeer = function _createPeer(options) { 'Do you have a network connection?', 'Retrying in 5 seconds.'); setTimeout(function() { - self._addLoader(); + self.addLoader(); }, 5000); return; } - self._addLoader(); + self.addLoader(); }); peer.on('merkleblock', function(block) { @@ -1068,8 +1045,8 @@ Pool.prototype._createPeer = function _createPeer(options) { return self.emit('error', err); if (peer.type === bcoin.peer.types.LOADER) { - self._startInterval(); - self._startTimer(); + self.startInterval(); + self.startTimer(); } }); }); @@ -1088,8 +1065,8 @@ Pool.prototype._createPeer = function _createPeer(options) { return self.emit('error', err); if (peer.type === bcoin.peer.types.LOADER) { - self._startInterval(); - self._startTimer(); + self.startInterval(); + self.startTimer(); } }); }); @@ -1099,17 +1076,23 @@ Pool.prototype._createPeer = function _createPeer(options) { }); peer.on('reject', function(payload) { - var data = payload.data - ? utils.revHex(payload.data) - : null; + var data, code; + + if (payload.data) + data = utils.revHex(payload.data); + + code = constants.rejectByVal[payload.code]; + + if (code) + code = code.toLowerCase(); self.logger.warning( 'Received reject (%s): msg=%s code=%s reason=%s data=%s.', peer.hostname, payload.message, - constants.rejectByVal[payload.code] || payload.code, + code || payload.code, payload.reason, - data); + data || null); self.emit('reject', payload, peer); }); @@ -1123,7 +1106,7 @@ Pool.prototype._createPeer = function _createPeer(options) { for (i = 0; i < items.length; i++) { item = items[i]; - req = self.request.map[item.hash]; + req = self.requestMap[item.hash]; if (req && req.peer === peer) req.finish(new Error('Not found.')); } @@ -1158,11 +1141,12 @@ Pool.prototype._createPeer = function _createPeer(options) { continue; } - if (self.addHost(host)) + if (self.hosts.add(host)) self.emit('host', host, peer); } self.emit('addr', hosts, peer); + self.fillPeers(); }); peer.on('txs', function(txs) { @@ -1175,14 +1159,11 @@ Pool.prototype._createPeer = function _createPeer(options) { for (i = 0; i < txs.length; i++) { hash = txs[i]; - self.getData(peer, self.tx.type, hash); + self.getData(peer, self.txType, hash); } }); peer.on('version', function(version) { - if (version.height > self.block.versionHeight) - self.block.versionHeight = version.height; - self.logger.info( 'Received version (%s): version=%d height=%d services=%s agent=%s', peer.hostname, @@ -1229,7 +1210,7 @@ Pool.prototype._createPeer = function _createPeer(options) { Pool.prototype._handleAlert = function _handleAlert(alert, peer) { var now = bcoin.now(); - if (!this.rejects.added(alert.hash())) + if (!this.rejectsFilter.added(alert.hash())) return; if (!alert.verify(this.network.alertKey)) { @@ -1275,12 +1256,12 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { peer.invFilter.add(tx.hash()); if (!this.mempool) - this.tx.filter.add(tx.hash()); + this.txFilter.add(tx.hash()); this.logger.warning('Peer sent unrequested tx: %s (%s).', tx.rhash, peer.hostname); - if (this.rejects.test(tx.hash())) { + if (this.rejectsFilter.test(tx.hash())) { return callback(new VerifyError(tx, 'alreadyknown', 'txn-already-known', @@ -1298,7 +1279,7 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { if (err.type === 'VerifyError') { if (err.score !== -1) peer.reject(tx, err.code, err.reason, err.score); - self.rejects.add(tx.hash()); + self.rejectsFilter.add(tx.hash()); return callback(err); } return callback(err); @@ -1316,23 +1297,21 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { * @param {net.Socket} socket */ -Pool.prototype._addLeech = function _addLeech(socket) { +Pool.prototype.addLeech = function addLeech(socket) { var self = this; var peer; if (!this.loaded) return socket.destroy(); - peer = this._createPeer({ + peer = this.createPeer({ socket: socket, type: bcoin.peer.types.LEECH }); this.logger.info('Added leech peer (%s).', peer.hostname); - this.peers.leeches.push(peer); - this.peers.all.push(peer); - this.peers.map[peer.host] = peer; + this.peers.addLeech(peer); utils.nextTick(function() { self.emit('leech', peer); @@ -1345,39 +1324,34 @@ Pool.prototype._addLeech = function _addLeech(socket) { * @private */ -Pool.prototype._addPeer = function _addPeer() { +Pool.prototype.addPeer = function addPeer() { var self = this; var peer, host; if (!this.loaded) return; - if (this.peers.regular.length + this.peers.pending.length >= this.maxPeers - 1) + if (this.peers.isFull()) return; - host = this.getHost(); - - if (!host) { - setTimeout(this._addPeer.bind(this), 5000); + // Hang back if we don't have a loader peer yet. + if (!this.peers.load) return; - } - peer = this._createPeer({ + host = this.hosts.getHost(); + + if (!host) + return; + + peer = this.createPeer({ host: host, type: bcoin.peer.types.REGULAR }); - this.peers.pending.push(peer); - this.peers.all.push(peer); - this.peers.map[peer.host] = peer; + this.peers.addPending(peer); peer.once('ack', function() { - if (utils.binaryRemove(self.peers.pending, peer, compare)) - utils.binaryInsert(self.peers.regular, peer, compare); - }); - - peer.once('close', function() { - self._addPeer(); + self.peers.promote(peer); }); utils.nextTick(function() { @@ -1385,32 +1359,32 @@ Pool.prototype._addPeer = function _addPeer() { }); }; +/** + * Attempt to refill the pool with peers. + * @private + */ + +Pool.prototype.fillPeers = function fillPeers() { + for (var i = 0; i < this.maxPeers - 1; i++) + this.addPeer(); +}; + /** * Remove a peer from any list. Drop all load requests. * @private * @param {Peer} peer */ -Pool.prototype._removePeer = function _removePeer(peer) { +Pool.prototype.removePeer = function removePeer(peer) { var i, hashes, hash, item; - utils.binaryRemove(this.peers.pending, peer, compare); - utils.binaryRemove(this.peers.regular, peer, compare); - utils.binaryRemove(this.peers.leeches, peer, compare); - utils.binaryRemove(this.peers.all, peer, compare); + this.peers.remove(peer); - delete this.peers.map[peer.host]; - - if (this.peers.load === peer) { - this.logger.info('Removed loader peer (%s).', peer.hostname); - this.peers.load = null; - } - - hashes = Object.keys(this.request.map); + hashes = Object.keys(this.requestMap); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - item = this.request.map[hash]; + item = this.requestMap[hash]; if (item.peer === peer) item.finish(new Error('Peer closed.')); } @@ -1448,11 +1422,11 @@ Pool.prototype.updateWatch = function updateWatch() { var self = this; var i; - if (this._pendingWatch != null) + if (this.pendingWatch != null) return; - this._pendingWatch = setTimeout(function() { - self._pendingWatch = null; + this.pendingWatch = setTimeout(function() { + self.pendingWatch = null; if (self.peers.load) self.peers.load.updateWatch(); @@ -1481,27 +1455,16 @@ Pool.prototype.watchAddress = function watchAddress(address) { * @param {Function} callback */ -Pool.prototype.getData = function getData(peer, type, hash, options, callback) { +Pool.prototype.getData = function getData(peer, type, hash, callback) { var self = this; var item; - if (typeof options === 'function') { - callback = options; - options = null; - } - callback = utils.ensure(callback); if (!this.loaded) return callback(); - if (options == null) - options = {}; - - if (typeof options === 'boolean') - options = { force: options }; - - this.has(type, hash, options.force, function(err, exists) { + this.has(type, hash, function(err, exists) { if (err) return callback(err); @@ -1510,29 +1473,26 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { item = new LoadRequest(self, peer, type, hash); - if (options.noQueue) - return callback(null, false); - - if (type === self.tx.type) { - if (peer.queue.tx.length === 0) { + if (type === self.txType) { + if (peer.queueTX.length === 0) { utils.nextTick(function() { self.logger.debug( 'Requesting %d/%d txs from peer with getdata (%s).', - peer.queue.tx.length, - self.request.activeTX, + peer.queueTX.length, + self.activeTX, peer.hostname); - peer.getData(peer.queue.tx); - peer.queue.tx.length = 0; + peer.getData(peer.queueTX); + peer.queueTX.length = 0; }); } - peer.queue.tx.push(item.start()); + peer.queueTX.push(item.start()); return callback(null, false); } - peer.queue.block.push(item); + peer.queueBlock.push(item); callback(null, false); }); @@ -1545,15 +1505,10 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { * @param {Function} callback - Returns [Error, Boolean]. */ -Pool.prototype.has = function has(type, hash, force, callback) { +Pool.prototype.has = function has(type, hash, callback) { var self = this; - if (!callback) { - callback = force; - force = false; - } - - function check(err, exists) { + this.exists(type, hash, function(err, exists) { if (err) return callback(err); @@ -1561,46 +1516,50 @@ Pool.prototype.has = function has(type, hash, force, callback) { return callback(null, true); // Check the pending requests. - if (self.request.map[hash]) + if (self.requestMap[hash]) return callback(null, true); // We need to reset the rejects filter periodically. // There may be a locktime in a TX that is now valid. - if (self.rejects.tip !== self.chain.tip.hash) { - self.rejects.tip = self.chain.tip.hash; - self.rejects.reset(); + if (self.rejectsTip !== self.chain.tip.hash) { + self.rejectsTip = self.chain.tip.hash; + self.rejectsFilter.reset(); } else { // If we recently rejected this item. Ignore. - if (self.rejects.test(hash, 'hex')) { + if (self.rejectsFilter.test(hash, 'hex')) { self.logger.spam('Peer sent a known reject: %s.', utils.revHex(hash)); return callback(null, true); } } return callback(null, false); - } + }); +}; - if (force) { - check = utils.asyncify(check); - return check(null, false); - } +/** + * Test whether the chain or mempool has seen an item. + * @param {InvType} type + * @param {Hash} hash + * @param {Function} callback - Returns [Error, Boolean]. + */ - if (type === this.tx.type) { +Pool.prototype.exists = function exists(type, hash, callback) { + if (type === this.txType) { // Check the TX filter if // we don't have a mempool. if (!this.mempool) { - check = utils.asyncify(check); - if (this.tx.filter.added(hash, 'hex')) - return check(null, false); - return check(null, true); + callback = utils.asyncify(callback); + if (this.txFilter.added(hash, 'hex')) + return callback(null, false); + return callback(null, true); } // Check the mempool. - return check(null, this.mempool.has(hash)); + return callback(null, this.mempool.has(hash)); } // Check the chain. - return this.chain.has(hash, check); + this.chain.has(hash, callback); }; /** @@ -1611,15 +1570,15 @@ Pool.prototype.has = function has(type, hash, force, callback) { Pool.prototype.scheduleRequests = function scheduleRequests(peer) { var self = this; - if (this._scheduled) + if (this.scheduled) return; - this._scheduled = true; + this.scheduled = true; this.chain.onDrain(function() { utils.nextTick(function() { - self._sendRequests(peer); - self._scheduled = false; + self.sendRequests(peer); + self.scheduled = false; }); }); }; @@ -1630,28 +1589,28 @@ Pool.prototype.scheduleRequests = function scheduleRequests(peer) { * @param {Peer} peer */ -Pool.prototype._sendRequests = function _sendRequests(peer) { +Pool.prototype.sendRequests = function sendRequests(peer) { var i, size, items; if (this.chain.isBusy()) return; - if (peer.queue.block.length === 0) + if (peer.queueBlock.length === 0) return; if (this.options.spv) { - if (this.request.activeBlocks >= 500) + if (this.activeBlocks >= 500) return; - items = peer.queue.block.slice(); - peer.queue.block.length = 0; + items = peer.queueBlock.slice(); + peer.queueBlock.length = 0; } else { size = this.network.getBatchSize(this.chain.height); - if (this.request.activeBlocks >= size) + if (this.activeBlocks >= size) return; - items = peer.queue.block.slice(0, size); - peer.queue.block = peer.queue.block.slice(size); + items = peer.queueBlock.slice(0, size); + peer.queueBlock = peer.queueBlock.slice(size); } for (i = 0; i < items.length; i++) @@ -1660,7 +1619,7 @@ Pool.prototype._sendRequests = function _sendRequests(peer) { this.logger.debug( 'Requesting %d/%d blocks from peer with getdata (%s).', items.length, - this.request.activeBlocks, + this.activeBlocks, peer.hostname); peer.getData(items); @@ -1672,13 +1631,10 @@ Pool.prototype._sendRequests = function _sendRequests(peer) { * @returns {LoadRequest|null} */ -Pool.prototype.fulfill = function fulfill(hash) { - var item; +Pool.prototype.fulfill = function fulfill(data) { + var hash = data.hash('hex'); + var item = this.requestMap[hash]; - if (hash.hash) - hash = hash.hash('hex'); - - item = this.request.map[hash]; if (!item) return false; @@ -1702,7 +1658,7 @@ Pool.prototype.broadcast = function broadcast(msg, callback) { if (msg.toInv) hash = msg.toInv().hash; - item = this.inv.map[hash]; + item = this.invMap[hash]; if (item) { item.refresh(); @@ -1753,16 +1709,6 @@ Pool.prototype.setFeeRate = function setFeeRate(rate) { this.peers.regular[i].sendFeeRate(rate); }; -/** - * Get peer by host. - * @param {String} addr - * @returns {Peer?} - */ - -Pool.prototype.getPeer = function getPeer(host) { - return this.peers.map[host]; -}; - /** * Request UTXOs from peer. * @param {Outpoint[]} outpoints @@ -1823,127 +1769,10 @@ Pool.prototype.fillCoins = function fillCoins(tx, callback) { */ Pool.prototype.getLoaderHost = function getLoaderHost() { - var host; - if (!this.connected && this.options.preferredSeed) - return this.seeds[0]; + return this.hosts.seeds[0]; - host = this.getRandom(this.seeds); - - if (host) - return host; - - host = this.getRandom(this.hosts); - - if (host) - return host; - - this.logger.warning('All seeds banned or ignored. Clearing...'); - this.peers.ignored = {}; - this.peers.misbehaving = {}; - - return this.getRandom(this.seeds); -}; - -/** - * Allocate a new host which is not currently being used. - * @returns {NetworkAddress} - */ - -Pool.prototype.getHost = function getHost() { - var host; - - // Hang back if we don't have a loader peer yet. - if (!this.peers.load) - return; - - host = this.getRandom(this.seeds, true); - - if (host) - return host; - - return this.getRandom(this.hosts, true); -}; - -/** - * Get a random host from collection of hosts. - * @param {NetworkAddress[]} hosts - * @param {Boolean} unique - * @returns {NetworkAddress} - */ - -Pool.prototype.getRandom = function getRandom(hosts, unique) { - var index = Math.random() * hosts.length | 0; - var last = -1; - var i, host; - - for (i = 0; i < hosts.length; i++) { - host = hosts[i]; - - if (this.isMisbehaving(host.host)) - continue; - - if (this.isIgnored(host.host)) - continue; - - if (unique && this.getPeer(host.host)) - continue; - - if (i >= index) - return host; - - last = i; - } - - if (last === -1) - return; - - return hosts[last]; -}; - -/** - * Add host to host list. - * @param {String|NetworkAddress} host - * @returns {Boolean} - */ - -Pool.prototype.addHost = function addHost(host) { - if (typeof host === 'string') - host = NetworkAddress.fromHostname(host, this.network); - - if (this.hosts.length > 500) - return; - - if (this.hostMap[host.host]) - return; - - utils.binaryInsert(this.hosts, host, compare); - - this.hostMap[host.host] = host; - - return host; -}; - -/** - * Remove host from host list. - * @param {String|NetworkAddress} host - * @returns {Boolean} - */ - -Pool.prototype.removeHost = function removeHost(host) { - if (host.host) - host = host.host; - - host = this.hostMap[host]; - - if (!host) - return; - - utils.binaryRemove(this.hosts, host, compare); - - delete this.hostMap[host]; - - return host; + return this.hosts.getLoaderHost(); }; /** @@ -1957,8 +1786,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) { peer.banScore += score; if (peer.banScore >= constants.BAN_SCORE) { - this.peers.misbehaving[peer.host] = utils.now(); - this.removeHost(peer.host); + this.hosts.ban(peer.host); this.logger.debug('Ban threshold exceeded (%s).', peer.host); peer.destroy(); return true; @@ -1967,6 +1795,30 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) { return false; }; +/** + * Ban a peer. + * @param {String|NetworkAddress} host + */ + +Pool.prototype.ban = function ban(host) { + var peer = this.peers.get(host); + + this.logger.debug('Banning peer (%s).', host); + this.hosts.ban(host); + + if (peer) + peer.destroy(); +}; + +/** + * Unban a peer. + * @param {String|NetworkAddress} host + */ + +Pool.prototype.unban = function unban(host) { + this.hosts.unban(host); +}; + /** * Test whether the host/peer is banned. * @param {String} host @@ -1974,25 +1826,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) { */ Pool.prototype.isMisbehaving = function isMisbehaving(host) { - var peer, time; - - if (host.host) - host = host.host; - - time = this.peers.misbehaving[host]; - - if (time != null) { - if (utils.now() > time + constants.BAN_TIME) { - delete this.peers.misbehaving[host]; - peer = this.getPeer(host); - if (peer) - peer.banScore = 0; - return false; - } - return true; - } - - return false; + return this.hosts.isMisbehaving(host); }; /** @@ -2002,8 +1836,7 @@ Pool.prototype.isMisbehaving = function isMisbehaving(host) { Pool.prototype.ignore = function ignore(peer) { this.logger.debug('Ignoring peer (%s).', peer.hostname); - if (!this.removeHost(peer.host)) - this.peers.ignored[peer.host] = true; + this.hosts.ignore(peer.host); peer.destroy(); }; @@ -2014,10 +1847,7 @@ Pool.prototype.ignore = function ignore(peer) { */ Pool.prototype.isIgnored = function isIgnored(host) { - if (host.host) - host = host.host; - - return this.peers.ignored[host] === true; + return this.hosts.isIgnored(host); }; /** @@ -2027,12 +1857,13 @@ Pool.prototype.isIgnored = function isIgnored(host) { Pool.prototype.getIP = function getIP(callback) { var self = this; - var request = require('./http/request'); - var ip; + var request, ip; if (utils.isBrowser) return callback(new Error('Could not find IP.')); + request = require('./http/' + 'request'); + request({ method: 'GET', uri: 'http://icanhazip.com', @@ -2057,12 +1888,13 @@ Pool.prototype.getIP = function getIP(callback) { */ Pool.prototype.getIP2 = function getIP2(callback) { - var request = require('./http/request'); - var ip; + var request, ip; if (utils.isBrowser) return callback(new Error('Could not find IP.')); + request = require('./http/' + 'request'); + request({ method: 'GET', uri: 'http://checkip.dyndns.org', @@ -2081,6 +1913,332 @@ Pool.prototype.getIP2 = function getIP2(callback) { }); }; +/** + * Peer List + * @constructor + */ + +function PeerList(pool) { + this.pool = pool; + // Peers that are loading blocks themselves + this.regular = []; + // Peers that are still connecting + this.pending = []; + // Peers that connected to us + this.leeches = []; + // Peers that are loading block ids + this.load = null; + // All peers + this.all = []; + // Map of hosts + this.map = {}; +} + +PeerList.prototype.addPending = function addPending(peer) { + this.pending.push(peer); + this.all.push(peer); + this.map[peer.host] = peer; +}; + +PeerList.prototype.promote = function promote(peer) { + if (utils.binaryRemove(this.pending, peer, compare)) + utils.binaryInsert(this.regular, peer, compare); +}; + +PeerList.prototype.remove = function remove(peer) { + utils.binaryRemove(this.pending, peer, compare); + utils.binaryRemove(this.regular, peer, compare); + utils.binaryRemove(this.leeches, peer, compare); + utils.binaryRemove(this.all, peer, compare); + + delete this.map[peer.host]; + + if (this.load === peer) { + this.pool.logger.info('Removed loader peer (%s).', peer.hostname); + this.load = null; + } +}; + +PeerList.prototype.isFull = function isFull() { + return this.regular.length + this.pending.length >= this.pool.maxPeers - 1; +}; + +PeerList.prototype.addLeech = function addLeech(peer) { + this.peers.leeches.push(peer); + this.peers.all.push(peer); + this.peers.map[peer.host] = peer; +}; + +PeerList.prototype.addLoader = function addLoader(peer) { + this.load = peer; + this.all.push(peer); + this.map[peer.host] = peer; +}; + +PeerList.prototype.get = function get(host) { + return this.map[host]; +}; + +PeerList.prototype.destroy = function destroy() { + var i, peers; + + if (this.load) + this.load.destroy(); + + peers = this.regular.slice(); + + for (i = 0; i < peers.length; i++) + peers[i].destroy(); + + peers = this.pending.slice(); + + for (i = 0; i < peers.length; i++) + peers[i].destroy(); + + peers = this.leeches.slice(); + + for (i = 0; i < peers.length; i++) + peers[i].destroy(); +}; + +/** + * Host List + * @constructor + */ + +function HostList(pool) { + this.pool = pool; + this.seeds = []; + this.items = []; + this.map = {}; + + // Ignored hosts + this.ignored = {}; + + // Misbehaving hosts + this.misbehaving = {}; +} + +/** + * Clear misbehaving and ignored. + */ + +HostList.prototype.clear = function clear() { + this.ignored = {}; + this.misbehaving = {}; +}; + +/** + * Allocate a new loader host. + * @returns {NetworkAddress} + */ + +HostList.prototype.getLoaderHost = function getLoaderHost() { + var host = this.getRandom(this.seeds); + + if (host) + return host; + + host = this.getRandom(this.items); + + if (host) + return host; + + this.pool.logger.warning('All seeds banned or ignored. Clearing...'); + this.clear(); + + return this.getRandom(this.seeds); +}; + +/** + * Allocate a new host which is not currently being used. + * @returns {NetworkAddress} + */ + +HostList.prototype.getHost = function getHost() { + var host = this.getRandom(this.seeds, true); + + if (host) + return host; + + return this.getRandom(this.items, true); +}; + +/** + * Get a random host from collection of hosts. + * @param {NetworkAddress[]} hosts + * @param {Boolean} unique + * @returns {NetworkAddress} + */ + +HostList.prototype.getRandom = function getRandom(hosts, unique) { + var index = Math.random() * hosts.length | 0; + var last = -1; + var i, host; + + for (i = 0; i < hosts.length; i++) { + host = hosts[i]; + + if (this.isMisbehaving(host.host)) + continue; + + if (this.isIgnored(host.host)) + continue; + + if (unique && this.pool.peers.get(host.host)) + continue; + + if (i >= index) + return host; + + last = i; + } + + if (last === -1) + return; + + return hosts[last]; +}; + +/** + * Add host to host list. + * @param {String|NetworkAddress} host + * @returns {Boolean} + */ + +HostList.prototype.add = function add(host) { + if (typeof host === 'string') + host = NetworkAddress.fromHostname(host, this.network); + + if (this.items.length > 500) + return; + + if (this.map[host.host]) + return; + + utils.binaryInsert(this.items, host, compare); + + this.map[host.host] = host; + + return host; +}; + +/** + * Remove host from host list. + * @param {String|NetworkAddress} host + * @returns {Boolean} + */ + +HostList.prototype.remove = function remove(host) { + if (host.host) + host = host.host; + + host = this.map[host]; + + if (!host) + return; + + utils.binaryRemove(this.items, host, compare); + + delete this.map[host]; + + return host; +}; + +/** + * Increase peer's ban score. + * @param {String} host + */ + +HostList.prototype.ban = function ban(host) { + if (host.host) + host = host.host; + + this.misbehaving[host] = utils.now(); + this.remove(host); +}; + +/** + * Unban host. + * @param {String} host + */ + +HostList.prototype.unban = function unban(host) { + if (host.host) + host = host.host; + + delete this.misbehaving[host]; + delete this.ignored[host]; +}; + +/** + * Test whether the host/peer is banned. + * @param {String} host + * @returns {Boolean} + */ + +HostList.prototype.isMisbehaving = function isMisbehaving(host) { + var time; + + if (host.host) + host = host.host; + + time = this.misbehaving[host]; + + if (time != null) { + if (utils.now() > time + constants.BAN_TIME) { + delete this.misbehaving[host]; + return false; + } + return true; + } + + return false; +}; + +/** + * Ignore peer. + * @param {Peer} peer + */ + +HostList.prototype.ignore = function ignore(host) { + if (host.host) + host = host.host; + + if (!this.remove(host)) + this.ignored[host] = true; +}; + +/** + * Test whether the host/peer is ignored. + * @param {String} host + * @returns {Boolean} + */ + +HostList.prototype.isIgnored = function isIgnored(host) { + if (host.host) + host = host.host; + + return this.ignored[host] === true; +}; + +HostList.prototype.setSeeds = function setSeeds(seeds) { + var i, hostname, seed; + + this.seeds.length = 0; + + for (i = 0; i < seeds.length; i++) { + hostname = seeds[i]; + seed = NetworkAddress.fromHostname(hostname, this.pool.network); + this.seeds.push(seed); + } +}; + +HostList.prototype.addSeed = function addSeed(hostname) { + var seed = NetworkAddress.fromHostname(hostname, this.pool.network); + this.seeds.unshift(seed); +}; + /** * Represents an in-flight block or transaction. * @exports LoadRequest @@ -2109,8 +2267,8 @@ function LoadRequest(pool, peer, type, hash, callback) { this.addCallback(callback); - assert(!this.pool.request.map[this.hash]); - this.pool.request.map[this.hash] = this; + assert(!this.pool.requestMap[this.hash]); + this.pool.requestMap[this.hash] = this; } /** @@ -2127,7 +2285,7 @@ LoadRequest.prototype.destroy = function destroy() { */ LoadRequest.prototype._onTimeout = function _onTimeout() { - if (this.type !== this.pool.tx.type + if (this.type !== this.pool.txType && this.peer === this.pool.peers.load) { this.pool.logger.debug( 'Loader took too long serving a block. Finding a new one.'); @@ -2154,12 +2312,12 @@ LoadRequest.prototype.start = function start() { this.timeout = setTimeout(this.onTimeout, this.pool.requestTimeout); this.active = true; - this.pool.request.active++; + this.pool.activeRequest++; - if (this.type === this.pool.tx.type) - this.pool.request.activeTX++; + if (this.type === this.pool.txType) + this.pool.activeTX++; else - this.pool.request.activeBlocks++; + this.pool.activeBlocks++; return this; }; @@ -2173,22 +2331,22 @@ LoadRequest.prototype.start = function start() { LoadRequest.prototype.finish = function finish(err) { var i; - if (this.pool.request.map[this.hash]) { - delete this.pool.request.map[this.hash]; + if (this.pool.requestMap[this.hash]) { + delete this.pool.requestMap[this.hash]; if (this.active) { this.active = false; - this.pool.request.active--; - if (this.type === this.pool.tx.type) - this.pool.request.activeTX--; + this.pool.activeRequest--; + if (this.type === this.pool.txType) + this.pool.activeTX--; else - this.pool.request.activeBlocks--; + this.pool.activeBlocks--; } } - if (this.type === this.pool.tx.type) - utils.binaryRemove(this.peer.queue.tx, this, compare); + if (this.type === this.pool.txType) + utils.binaryRemove(this.peer.queueTX, this, compare); else - utils.binaryRemove(this.peer.queue.block, this, compare); + utils.binaryRemove(this.peer.queueBlock, this, compare); if (this.timeout != null) { clearTimeout(this.timeout); @@ -2207,7 +2365,7 @@ LoadRequest.prototype.finish = function finish(err) { LoadRequest.prototype.inspect = function inspect() { return ''; @@ -2287,10 +2445,10 @@ BroadcastItem.prototype.addCallback = function addCallback(callback) { BroadcastItem.prototype.start = function start() { assert(!this.timeout, 'Already started.'); - assert(!this.pool.inv.map[this.hash], 'Already started.'); + assert(!this.pool.invMap[this.hash], 'Already started.'); - this.pool.inv.map[this.hash] = this; - utils.binaryInsert(this.pool.inv.items, this, compare); + this.pool.invMap[this.hash] = this; + utils.binaryInsert(this.pool.invItems, this, compare); this.refresh(); @@ -2312,7 +2470,7 @@ BroadcastItem.prototype.refresh = function refresh() { this.timeout = setTimeout(function() { self.emit('timeout'); self.finish(new Error('Timed out.')); - }, this.pool.inv.timeout); + }, this.pool.invTimeout); }; /** @@ -2332,13 +2490,13 @@ BroadcastItem.prototype.finish = function finish(err) { var i; assert(this.timeout, 'Already finished.'); - assert(this.pool.inv.map[this.hash], 'Already finished.'); + assert(this.pool.invMap[this.hash], 'Already finished.'); clearTimeout(this.timeout); this.timeout = null; - delete this.pool.inv.map[this.hash]; - utils.binaryRemove(this.pool.inv.items, this, compare); + delete this.pool.invMap[this.hash]; + utils.binaryRemove(this.pool.invItems, this, compare); for (i = 0; i < this.callback.length; i++) this.callback[i](err);