peer: flush data to socket.

This commit is contained in:
Christopher Jeffrey 2016-09-29 15:40:03 -07:00
parent a467b4e475
commit 661a8f2f20
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 49 additions and 14 deletions

View File

@ -12,6 +12,7 @@ var EventEmitter = require('events').EventEmitter;
var utils = require('../utils/utils');
var spawn = require('../utils/spawn');
var co = spawn.co;
var wrap = spawn.wrap;
var Parser = require('./parser');
var Framer = require('./framer');
var packets = require('./packets');
@ -106,6 +107,7 @@ function Peer(pool, addr, socket) {
this.bip150 = null;
this.lastSend = 0;
this.lastRecv = 0;
this.outgoing = 0;
this.challenge = null;
this.lastPong = -1;
@ -744,25 +746,30 @@ Peer.prototype.destroy = function destroy() {
/**
* Write data to the peer's socket.
* @param {Buffer} data
* @returns {Boolean}
* @returns {Promise}
*/
Peer.prototype.write = function write(data) {
var self = this;
if (this.destroyed)
return false;
return Promise.resolve(null);
this.lastSend = utils.ms();
return this.socket.write(data);
return new Promise(function(resolve, reject) {
self.socket.write(data, wrap(resolve, reject));
});
};
/**
* Send a packet.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.send = function send(packet) {
var tx, checksum;
var tx, checksum, payload;
// Used cached hashes as the
// packet checksum for speed.
@ -776,9 +783,29 @@ Peer.prototype.send = function send(packet) {
}
}
this.write(this.framer.packet(packet.cmd, packet.toRaw(), checksum));
payload = this.framer.packet(packet.cmd, packet.toRaw(), checksum);
return this.write(payload);
};
/**
* Send a packet. Throttle reads and wait for flush.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.flush = co(function* flush(packet) {
assert(this.outgoing >= 0);
if (this.outgoing++ === 0)
this.socket.pause();
yield this.send(packet);
if (--this.outgoing === 0)
this.socket.resume();
});
/**
* Emit an error and destroy the peer.
* @private
@ -1528,7 +1555,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
* @param {GetDataPacket}
*/
Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
var notFound = [];
var items = packet.items;
var i, j, item, entry, tx, block;
@ -1560,7 +1587,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
continue;
}
this.send(new packets.TXPacket(tx, item.hasWitness()));
yield this.flush(new packets.TXPacket(tx, item.hasWitness()));
continue;
}
@ -1570,7 +1597,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
switch (item.type) {
case constants.inv.BLOCK:
case constants.inv.WITNESS_BLOCK:
this.send(new packets.BlockPacket(block, item.hasWitness()));
yield this.flush(new packets.BlockPacket(block, item.hasWitness()));
break;
case constants.inv.FILTERED_BLOCK:
case constants.inv.WITNESS_FILTERED_BLOCK:
@ -1581,18 +1608,18 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
block = block.toMerkle(this.spvFilter);
this.send(new packets.MerkleBlockPacket(block));
yield this.flush(new packets.MerkleBlockPacket(block));
for (j = 0; j < block.txs.length; j++) {
tx = block.txs[j];
this.send(new packets.TXPacket(tx, item.hasWitness()));
yield this.flush(new packets.TXPacket(tx, item.hasWitness()));
}
break;
case constants.inv.CMPCT_BLOCK:
// Fallback to full block.
if (block.height < this.chain.tip.height - 10) {
this.send(new packets.BlockPacket(block, false));
yield this.flush(new packets.BlockPacket(block, false));
break;
}
@ -1607,7 +1634,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
break;
}
this.send(new packets.CmpctBlockPacket(block, false));
yield this.flush(new packets.CmpctBlockPacket(block, false));
break;
default:
this.logger.warning(
@ -2157,7 +2184,8 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) {
res = bcoin.bip152.TXResponse.fromBlock(block, req);
this.send(new packets.BlockTxnPacket(res, false));
yield this.flush(new packets.BlockTxnPacket(res, false));
this.fire('blocktxn', req);
});

View File

@ -129,9 +129,13 @@ ProxySocket.prototype.connect = function connect(port, host) {
this.sendBuffer.length = 0;
};
ProxySocket.prototype.write = function write(data) {
ProxySocket.prototype.write = function write(data, callback) {
if (!this.info) {
this.sendBuffer.push(data);
if (callback)
callback();
return true;
}
@ -139,6 +143,9 @@ ProxySocket.prototype.write = function write(data) {
this.socket.emit('tcp data', data.toString('hex'));
if (callback)
callback();
return true;
};