diff --git a/lib/net/packets.js b/lib/net/packets.js index 2f3c5028..c07422bb 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -1147,6 +1147,8 @@ InvPacket.prototype.getSize = function getSize() { InvPacket.prototype.toWriter = function toWriter(bw) { var i, item; + assert(this.items.length <= 50000); + bw.writeVarint(this.items.length); for (i = 0; i < this.items.length; i++) { @@ -1178,6 +1180,8 @@ InvPacket.prototype.fromReader = function fromReader(br) { count = br.readVarint(); + assert(count <= 50000, 'Inv item count too high.'); + for (i = 0; i < count; i++) this.items.push(InvItem.fromReader(br)); diff --git a/lib/net/peer.js b/lib/net/peer.js index 19c65d20..70d66e1c 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -189,7 +189,7 @@ Peer.prototype._init = function init() { this.parser.on('packet', co(function* (packet) { try { - yield self._onPacket(packet); + yield self.handlePacket(packet); } catch (e) { self.destroy(); self.error(e); @@ -330,19 +330,16 @@ Peer.prototype.connect = function connect(port, host) { */ Peer.prototype.open = co(function* open() { - yield this._wait(); - yield this._stallify(); - yield this._bip151(); - yield this._bip150(); - yield this._handshake(); - yield this._finalize(); + yield this.initConnect(); + yield this.initStall(); + yield this.initBIP151(); + yield this.initBIP150(); + yield this.initVerack(); + yield this.finalize(); if (this.destroyed) throw new Error('Peer was destroyed.'); - clearTimeout(this.connectTimeout); - this.connectTimeout = null; - // Finally we can let the pool know // that this peer is ready to go. this.emit('open'); @@ -367,7 +364,7 @@ Peer.prototype.tryOpen = co(function* tryOpen() { * @private */ -Peer.prototype._wait = function _wait() { +Peer.prototype.initConnect = function initConnect() { var self = this; if (this.connected) { @@ -397,7 +394,7 @@ Peer.prototype._wait = function _wait() { * @private */ -Peer.prototype._stallify = function _stallify() { +Peer.prototype.initStall = function initStall() { var self = this; assert(!this.stallTimer); this.stallTimer = setInterval(function() { @@ -412,7 +409,7 @@ Peer.prototype._stallify = function _stallify() { * @private */ -Peer.prototype._bip151 = co(function* _bip151() { +Peer.prototype.initBIP151 = co(function* initBIP151() { // Send encinit. Wait for handshake to complete. if (!this.bip151) return; @@ -442,7 +439,7 @@ Peer.prototype._bip151 = co(function* _bip151() { * @private */ -Peer.prototype._bip150 = co(function* _bip150() { +Peer.prototype.initBIP150 = co(function* initBIP150() { if (!this.bip151 || !this.bip150) return; @@ -475,7 +472,7 @@ Peer.prototype._bip150 = co(function* _bip150() { * @private */ -Peer.prototype._handshake = co(function* _handshake() { +Peer.prototype.initVerack = co(function* initVerack() { // Say hello. this.sendVersion(); @@ -509,7 +506,7 @@ Peer.prototype._handshake = co(function* _handshake() { * @private */ -Peer.prototype._finalize = co(function* _finalize() { +Peer.prototype.finalize = co(function* finalize() { var self = this; // Setup the ping interval. @@ -591,7 +588,7 @@ Peer.prototype.announceBlock = function announceBlock(blocks) { // they're using compact block mode 1. if (this.compactMode && this.compactMode.mode === 1) { this.invFilter.add(block.hash()); - this._sendCompactBlock(block, this.compactWitness); + this.sendCompactBlock(block, this.compactWitness); continue; } @@ -1083,24 +1080,29 @@ Peer.prototype.getData = function getData(items) { * @param {Packet} packet */ -Peer.prototype._onPacket = co(function* onPacket(packet) { +Peer.prototype.handlePacket = co(function* handlePacket(packet) { var unlock; + // We stop reads and lock the peer for any + // packet with significant IO/asynchronocity. switch (packet.type) { - case packetTypes.VERSION: - case packetTypes.CMPCTBLOCK: - // These can't have locks or stop reads. - return yield this.__onPacket(packet); - default: + case packetTypes.GETDATA: + case packetTypes.GETBLOCKS: + case packetTypes.GETHEADERS: + case packetTypes.GETUTXOS: unlock = yield this.locker.lock(); + this.socket.pause(); + try { - return yield this.__onPacket(packet); + return yield this.onPacket(packet); } finally { this.socket.resume(); unlock(); } break; + default: + return yield this.onPacket(packet); } }); @@ -1110,7 +1112,7 @@ Peer.prototype._onPacket = co(function* onPacket(packet) { * @param {Packet} packet */ -Peer.prototype.__onPacket = co(function* onPacket(packet) { +Peer.prototype.onPacket = co(function* onPacket(packet) { this.lastRecv = util.ms(); if (this.destroyed) @@ -1611,11 +1613,12 @@ Peer.prototype.handleMempool = co(function* handleMempool(packet) { /** * Get a block/tx from the broadcast map. + * @private * @param {InvItem} item * @returns {Promise} */ -Peer.prototype._getBroadcasted = function _getBroadcasted(item) { +Peer.prototype.getBroadcasted = function getBroadcasted(item) { var entry = this.pool.invMap[item.hash]; if (!entry) @@ -1646,12 +1649,13 @@ Peer.prototype._getBroadcasted = function _getBroadcasted(item) { /** * Get a block/tx either from the broadcast map, mempool, or blockchain. + * @private * @param {InvItem} item * @returns {Promise} */ -Peer.prototype._getItem = co(function* _getItem(item) { - var entry = this._getBroadcasted(item); +Peer.prototype.getItem = co(function* getItem(item) { + var entry = this.getBroadcasted(item); if (entry) return entry; @@ -1676,12 +1680,13 @@ Peer.prototype._getItem = co(function* _getItem(item) { /** * Send a block from the broadcast list or chain. + * @private * @param {InvItem} item * @returns {Boolean} */ -Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) { - var block = this._getBroadcasted(item); +Peer.prototype.sendBlock = co(function* sendBlock(item, witness) { + var block = this.getBroadcasted(item); // Check for a broadcasted item first. if (block) { @@ -1720,12 +1725,13 @@ Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) { /** * Send a compact block. + * @private * @param {Block} block * @param {Boolean} witness * @returns {Boolean} */ -Peer.prototype._sendCompactBlock = function _sendCompactBlock(block, witness) { +Peer.prototype.sendCompactBlock = function sendCompactBlock(block, witness) { // Try again with a new nonce // if we get a siphash collision. for (;;) { @@ -1761,7 +1767,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { item = items[i]; if (item.isTX()) { - tx = yield this._getItem(item); + tx = yield this.getItem(item); if (!tx) { notFound.push(item); @@ -1788,7 +1794,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { switch (item.type) { case constants.inv.BLOCK: case constants.inv.WITNESS_BLOCK: - result = yield this._sendBlock(item, item.hasWitness()); + result = yield this.sendBlock(item, item.hasWitness()); if (!result) { notFound.push(item); continue; @@ -1802,7 +1808,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { continue; } - block = yield this._getItem(item); + block = yield this.getItem(item); if (!block) { notFound.push(item); @@ -1826,7 +1832,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { // Fallback to full block. height = yield this.chain.db.getHeight(item.hash); if (height < this.chain.tip.height - 10) { - result = yield this._sendBlock(item, this.compactWitness); + result = yield this.sendBlock(item, this.compactWitness); if (!result) { notFound.push(item); continue; @@ -1835,14 +1841,14 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { break; } - block = yield this._getItem(item); + block = yield this.getItem(item); if (!block) { notFound.push(item); continue; } - yield this._sendCompactBlock(block, this.compactWitness); + yield this.sendCompactBlock(block, this.compactWitness); blocks++; @@ -2387,7 +2393,7 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { item = new InvItem(constants.inv.BLOCK, req.hash); - block = yield this._getItem(item); + block = yield this.getItem(item); if (!block) { this.logger.debug(