diff --git a/lib/net/peer.js b/lib/net/peer.js index 2d17dac8..c47572c8 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -390,7 +390,7 @@ Peer.prototype.bind = function bind(socket) { }); this.socket.on('drain', function() { - self.drainSize = 0; + self.handleDrain(); }); this.socket.on('data', function(chunk) { @@ -1047,13 +1047,11 @@ Peer.prototype.sendFeeRate = function sendFeeRate(rate) { Peer.prototype.destroy = function destroy() { var connected = this.connected; - var i, keys, cmd, entry; + var i, keys, cmd, entry, jobs, job; if (this.destroyed) return; - this.drainSize = 0; - this.destroyed = true; this.connected = false; @@ -1086,6 +1084,16 @@ Peer.prototype.destroy = function destroy() { this.connectTimeout = null; } + jobs = this.drainQueue; + + this.drainSize = 0; + this.drainQueue = []; + + for (i = 0; i < jobs.length; i++) { + job = jobs[i]; + job.reject(new Error('Peer was destroyed.')); + } + keys = this.responseMap.keys(); for (i = 0; i < keys.length; i++) { @@ -1103,7 +1111,6 @@ Peer.prototype.destroy = function destroy() { /** * Write data to the peer's socket. * @param {Buffer} data - * @returns {Promise} */ Peer.prototype.write = function write(data) { @@ -1116,6 +1123,84 @@ Peer.prototype.write = function write(data) { this.needsDrain(data.length); }; +/** + * Send a packet. + * @param {Packet} packet + */ + +Peer.prototype.send = function send(packet) { + var tx, checksum; + + if (this.destroyed) + throw new Error('Peer is destroyed (send).'); + + // Used cached hashes as the + // packet checksum for speed. + if (packet.type === packetTypes.TX) { + tx = packet.tx; + if (packet.witness) { + if (!tx.isCoinbase()) + checksum = tx.witnessHash(); + } else { + checksum = tx.hash(); + } + } + + this.sendRaw(packet.cmd, packet.toRaw(), checksum); + + this.addTimeout(packet); +}; + +/** + * Send a packet. + * @param {Packet} packet + */ + +Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) { + var payload = this.framePacket(cmd, body, checksum); + this.write(payload); +}; + +/** + * Wait for a drain event. + * @returns {Promise} + */ + +Peer.prototype.drain = function drain() { + var self = this; + + if (this.destroyed) + return Promise.reject(new Error('Peer is destroyed.')); + + if (this.drainSize === 0) + return Promise.resolve(); + + return new Promise(function(resolve, reject) { + self.drainQueue.push(co.job(resolve, reject)); + }); +}; + +/** + * Handle drain event. + * @private + */ + +Peer.prototype.handleDrain = function handleDrain() { + var jobs = this.drainQueue; + var i, job; + + if (jobs.length === 0) + return; + + this.drainSize = 0; + this.drainQueue = []; + + for (i = 0; i < jobs.length; i++) { + job = jobs[i]; + job.resolve(); + } +}; + /** * Add to drain counter. * @private @@ -1323,46 +1408,6 @@ Peer.prototype.wait = function wait(type, timeout) { }); }; -/** - * Send a packet. - * @param {Packet} packet - * @returns {Promise} - */ - -Peer.prototype.send = function send(packet) { - var tx, checksum; - - if (this.destroyed) - throw new Error('Peer is destroyed (send).'); - - // Used cached hashes as the - // packet checksum for speed. - if (packet.type === packetTypes.TX) { - tx = packet.tx; - if (packet.witness) { - if (!tx.isCoinbase()) - checksum = tx.witnessHash(); - } else { - checksum = tx.hash(); - } - } - - this.addTimeout(packet); - - this.sendRaw(packet.cmd, packet.toRaw(), checksum); -}; - -/** - * Send a packet. - * @param {Packet} packet - * @returns {Promise} - */ - -Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) { - var payload = this.framePacket(cmd, body, checksum); - this.write(payload); -}; - /** * Emit an error and destroy the peer. * @private diff --git a/lib/net/pool.js b/lib/net/pool.js index 40896e52..d93f8c71 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1595,6 +1595,11 @@ Pool.prototype.handleGetData = co(function* handleGetData(peer, packet) { peer.sendInv([new InvItem(invTypes.BLOCK, this.chain.tip.hash)]); peer.hashContinue = null; } + + // Wait for the peer to read + // before we pull more data + // out of the database. + yield peer.drain(); } if (notFound.length > 0)