diff --git a/lib/net/peer.js b/lib/net/peer.js index ef03fc9c..5b83a4be 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -12,6 +12,7 @@ var EventEmitter = require('events').EventEmitter; var utils = require('../utils/utils'); var spawn = require('../utils/spawn'); var co = spawn.co; +var wrap = spawn.wrap; var Parser = require('./parser'); var Framer = require('./framer'); var packets = require('./packets'); @@ -106,6 +107,7 @@ function Peer(pool, addr, socket) { this.bip150 = null; this.lastSend = 0; this.lastRecv = 0; + this.outgoing = 0; this.challenge = null; this.lastPong = -1; @@ -744,25 +746,30 @@ Peer.prototype.destroy = function destroy() { /** * Write data to the peer's socket. * @param {Buffer} data - * @returns {Boolean} + * @returns {Promise} */ Peer.prototype.write = function write(data) { + var self = this; + if (this.destroyed) - return false; + return Promise.resolve(null); this.lastSend = utils.ms(); - return this.socket.write(data); + return new Promise(function(resolve, reject) { + self.socket.write(data, wrap(resolve, reject)); + }); }; /** * Send a packet. * @param {Packet} packet + * @returns {Promise} */ Peer.prototype.send = function send(packet) { - var tx, checksum; + var tx, checksum, payload; // Used cached hashes as the // packet checksum for speed. @@ -776,9 +783,29 @@ Peer.prototype.send = function send(packet) { } } - this.write(this.framer.packet(packet.cmd, packet.toRaw(), checksum)); + payload = this.framer.packet(packet.cmd, packet.toRaw(), checksum); + + return this.write(payload); }; +/** + * Send a packet. Throttle reads and wait for flush. + * @param {Packet} packet + * @returns {Promise} + */ + +Peer.prototype.flush = co(function* flush(packet) { + assert(this.outgoing >= 0); + + if (this.outgoing++ === 0) + this.socket.pause(); + + yield this.send(packet); + + if (--this.outgoing === 0) + this.socket.resume(); +}); + /** * Emit an error and destroy the peer. * @private @@ -1528,7 +1555,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { * @param {GetDataPacket} */ -Peer.prototype._handleGetData = co(function* _handleGetData(packet) { +Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { var notFound = []; var items = packet.items; var i, j, item, entry, tx, block; @@ -1560,7 +1587,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { continue; } - this.send(new packets.TXPacket(tx, item.hasWitness())); + yield this.flush(new packets.TXPacket(tx, item.hasWitness())); continue; } @@ -1570,7 +1597,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { switch (item.type) { case constants.inv.BLOCK: case constants.inv.WITNESS_BLOCK: - this.send(new packets.BlockPacket(block, item.hasWitness())); + yield this.flush(new packets.BlockPacket(block, item.hasWitness())); break; case constants.inv.FILTERED_BLOCK: case constants.inv.WITNESS_FILTERED_BLOCK: @@ -1581,18 +1608,18 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { block = block.toMerkle(this.spvFilter); - this.send(new packets.MerkleBlockPacket(block)); + yield this.flush(new packets.MerkleBlockPacket(block)); for (j = 0; j < block.txs.length; j++) { tx = block.txs[j]; - this.send(new packets.TXPacket(tx, item.hasWitness())); + yield this.flush(new packets.TXPacket(tx, item.hasWitness())); } break; case constants.inv.CMPCT_BLOCK: // Fallback to full block. if (block.height < this.chain.tip.height - 10) { - this.send(new packets.BlockPacket(block, false)); + yield this.flush(new packets.BlockPacket(block, false)); break; } @@ -1607,7 +1634,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { break; } - this.send(new packets.CmpctBlockPacket(block, false)); + yield this.flush(new packets.CmpctBlockPacket(block, false)); break; default: this.logger.warning( @@ -2157,7 +2184,8 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) { res = bcoin.bip152.TXResponse.fromBlock(block, req); - this.send(new packets.BlockTxnPacket(res, false)); + yield this.flush(new packets.BlockTxnPacket(res, false)); + this.fire('blocktxn', req); }); diff --git a/lib/net/proxysocket.js b/lib/net/proxysocket.js index 2c988591..14543dff 100644 --- a/lib/net/proxysocket.js +++ b/lib/net/proxysocket.js @@ -129,9 +129,13 @@ ProxySocket.prototype.connect = function connect(port, host) { this.sendBuffer.length = 0; }; -ProxySocket.prototype.write = function write(data) { +ProxySocket.prototype.write = function write(data, callback) { if (!this.info) { this.sendBuffer.push(data); + + if (callback) + callback(); + return true; } @@ -139,6 +143,9 @@ ProxySocket.prototype.write = function write(data) { this.socket.emit('tcp data', data.toString('hex')); + if (callback) + callback(); + return true; };