net: wait for peer drain on block serving.

This commit is contained in:
Christopher Jeffrey 2017-01-23 20:30:02 -08:00
parent 658120de6d
commit cfeafb6273
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 95 additions and 45 deletions

View File

@ -390,7 +390,7 @@ Peer.prototype.bind = function bind(socket) {
});
this.socket.on('drain', function() {
self.drainSize = 0;
self.handleDrain();
});
this.socket.on('data', function(chunk) {
@ -1047,13 +1047,11 @@ Peer.prototype.sendFeeRate = function sendFeeRate(rate) {
Peer.prototype.destroy = function destroy() {
var connected = this.connected;
var i, keys, cmd, entry;
var i, keys, cmd, entry, jobs, job;
if (this.destroyed)
return;
this.drainSize = 0;
this.destroyed = true;
this.connected = false;
@ -1086,6 +1084,16 @@ Peer.prototype.destroy = function destroy() {
this.connectTimeout = null;
}
jobs = this.drainQueue;
this.drainSize = 0;
this.drainQueue = [];
for (i = 0; i < jobs.length; i++) {
job = jobs[i];
job.reject(new Error('Peer was destroyed.'));
}
keys = this.responseMap.keys();
for (i = 0; i < keys.length; i++) {
@ -1103,7 +1111,6 @@ Peer.prototype.destroy = function destroy() {
/**
* Write data to the peer's socket.
* @param {Buffer} data
* @returns {Promise}
*/
Peer.prototype.write = function write(data) {
@ -1116,6 +1123,84 @@ Peer.prototype.write = function write(data) {
this.needsDrain(data.length);
};
/**
* Send a packet.
* @param {Packet} packet
*/
Peer.prototype.send = function send(packet) {
var tx, checksum;
if (this.destroyed)
throw new Error('Peer is destroyed (send).');
// Used cached hashes as the
// packet checksum for speed.
if (packet.type === packetTypes.TX) {
tx = packet.tx;
if (packet.witness) {
if (!tx.isCoinbase())
checksum = tx.witnessHash();
} else {
checksum = tx.hash();
}
}
this.sendRaw(packet.cmd, packet.toRaw(), checksum);
this.addTimeout(packet);
};
/**
* Send a packet.
* @param {Packet} packet
*/
Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) {
var payload = this.framePacket(cmd, body, checksum);
this.write(payload);
};
/**
* Wait for a drain event.
* @returns {Promise}
*/
Peer.prototype.drain = function drain() {
var self = this;
if (this.destroyed)
return Promise.reject(new Error('Peer is destroyed.'));
if (this.drainSize === 0)
return Promise.resolve();
return new Promise(function(resolve, reject) {
self.drainQueue.push(co.job(resolve, reject));
});
};
/**
* Handle drain event.
* @private
*/
Peer.prototype.handleDrain = function handleDrain() {
var jobs = this.drainQueue;
var i, job;
if (jobs.length === 0)
return;
this.drainSize = 0;
this.drainQueue = [];
for (i = 0; i < jobs.length; i++) {
job = jobs[i];
job.resolve();
}
};
/**
* Add to drain counter.
* @private
@ -1323,46 +1408,6 @@ Peer.prototype.wait = function wait(type, timeout) {
});
};
/**
* Send a packet.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.send = function send(packet) {
var tx, checksum;
if (this.destroyed)
throw new Error('Peer is destroyed (send).');
// Used cached hashes as the
// packet checksum for speed.
if (packet.type === packetTypes.TX) {
tx = packet.tx;
if (packet.witness) {
if (!tx.isCoinbase())
checksum = tx.witnessHash();
} else {
checksum = tx.hash();
}
}
this.addTimeout(packet);
this.sendRaw(packet.cmd, packet.toRaw(), checksum);
};
/**
* Send a packet.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) {
var payload = this.framePacket(cmd, body, checksum);
this.write(payload);
};
/**
* Emit an error and destroy the peer.
* @private

View File

@ -1595,6 +1595,11 @@ Pool.prototype.handleGetData = co(function* handleGetData(peer, packet) {
peer.sendInv([new InvItem(invTypes.BLOCK, this.chain.tip.hash)]);
peer.hashContinue = null;
}
// Wait for the peer to read
// before we pull more data
// out of the database.
yield peer.drain();
}
if (notFound.length > 0)