net: better write handling.

This commit is contained in:
Christopher Jeffrey 2016-11-24 14:43:35 -08:00
parent 01bb0adc17
commit 6c25efca89
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 177 additions and 125 deletions

View File

@ -125,8 +125,10 @@ function Peer(pool, addr, socket) {
this.banScore = 0;
this.pingTimeout = null;
this.pingTimer = null;
this.pingInterval = 30000;
this.stallTimer = null;
this.stallInterval = 5000;
this.requestTimeout = 10000;
this.requestMap = {};
@ -243,7 +245,7 @@ Peer.prototype._init = function init() {
});
this.bip151.on('rekey', function() {
self.logger.debug('Rekeying with peer (%s).', self.hostname);
self.send(self.bip151.toRekey());
self.trySend(self.bip151.toRekey());
});
}
@ -286,6 +288,7 @@ Peer.prototype.connect = function connect(port, host) {
Peer.prototype.open = co(function* open() {
try {
yield this._connect();
yield this._stallify();
yield this._bip151();
yield this._bip150();
yield this._handshake();
@ -295,11 +298,7 @@ Peer.prototype.open = co(function* open() {
return;
}
// The peer may have been killed before
// the handshake finished. In this case,
// do not emit `open`.
if (this.destroyed)
return;
assert(!this.destroyed);
// Finally we can let the pool know
// that this peer is ready to go.
@ -339,6 +338,20 @@ Peer.prototype._connect = function _connect() {
});
};
/**
* Setup stall timer.
* @private
*/
Peer.prototype._stallify = function _stallify() {
var self = this;
assert(!this.stallTimer);
this.stallTimer = setInterval(function() {
self.maybeStall();
}, this.stallInterval);
return Promise.resolve();
};
/**
* Handle `connect` event (called immediately
* if a socket was passed into peer).
@ -354,7 +367,7 @@ Peer.prototype._bip151 = co(function* _bip151() {
this.logger.info('Attempting BIP151 handshake (%s).', this.hostname);
this.send(this.bip151.toEncinit());
yield this.send(this.bip151.toEncinit());
try {
yield this.bip151.wait(3000);
@ -389,7 +402,7 @@ Peer.prototype._bip150 = co(function* _bip150() {
if (this.bip150.outbound) {
if (!this.bip150.peerIdentity)
throw new Error('No known identity for peer.');
this.send(this.bip150.toChallenge());
yield this.send(this.bip150.toChallenge());
}
yield this.bip150.wait(3000);
@ -410,13 +423,13 @@ Peer.prototype._bip150 = co(function* _bip150() {
Peer.prototype._handshake = co(function* _handshake() {
// Say hello.
this.sendVersion();
yield this.sendVersion();
// Advertise our address.
if (this.pool.address.host !== '0.0.0.0'
&& !this.options.selfish
&& this.pool.server) {
this.send(new packets.AddrPacket([this.pool.address]));
yield this.send(new packets.AddrPacket([this.pool.address]));
}
yield this.request('verack');
@ -446,14 +459,14 @@ Peer.prototype._finalize = co(function* _finalize() {
var self = this;
// Setup the ping interval.
this.pingTimeout = setInterval(function() {
this.pingTimer = setInterval(function() {
self.sendPing();
}, this.pingInterval);
// Ask for headers-only.
if (this.options.headers) {
if (this.version.version >= 70012)
this.send(new packets.SendHeadersPacket());
yield this.send(new packets.SendHeadersPacket());
}
// Let them know we support segwit (old
@ -461,32 +474,30 @@ Peer.prototype._finalize = co(function* _finalize() {
// of service bits).
if (this.options.witness && this.network.oldWitness) {
if (this.version.version >= 70012)
this.send(new packets.HaveWitnessPacket());
yield this.send(new packets.HaveWitnessPacket());
}
// We want compact blocks!
if (this.options.compact) {
if (this.version.version >= 70014)
this.sendCompact();
yield this.sendCompact();
}
// Find some more peers.
this.send(new packets.GetAddrPacket());
yield this.send(new packets.GetAddrPacket());
// Relay our spv filter if we have one.
this.updateWatch();
yield this.updateWatch();
// Announce our currently broadcasted items.
this.announce(this.pool.invItems);
yield this.announce(this.pool.invItems);
// Set a fee rate filter.
if (this.pool.feeRate !== -1)
this.sendFeeRate(this.pool.feeRate);
yield this.sendFeeRate(this.pool.feeRate);
// Start syncing the chain.
this.sync();
yield co.wait();
yield this.sync();
});
/**
@ -498,6 +509,15 @@ Peer.prototype.isLoader = function isLoader() {
return this === this.pool.peers.load;
};
/**
* Broadcast items to peer (transactions or blocks).
* @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items
*/
Peer.prototype.tryAnnounce = function tryAnnounce(items) {
return this.announce().catch(util.nop);
};
/**
* Broadcast items to peer (transactions or blocks).
* @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items
@ -658,14 +678,11 @@ Peer.prototype.sendVersion = function sendVersion() {
*/
Peer.prototype.sendPing = function sendPing() {
if (this.maybeStall())
return Promise.resolve();
if (!this.version)
return Promise.resolve();
if (this.version.version <= 60000)
return this.send(new packets.PingPacket());
return this.trySend(new packets.PingPacket());
if (this.challenge) {
this.logger.debug('Peer has not responded to ping (%s).', this.hostname);
@ -676,7 +693,7 @@ Peer.prototype.sendPing = function sendPing() {
this.lastPing = util.ms();
this.challenge = util.nonce();
return this.send(new packets.PingPacket(this.challenge));
return this.trySend(new packets.PingPacket(this.challenge));
};
/**
@ -709,7 +726,7 @@ Peer.prototype.updateWatch = function updateWatch() {
if (!this.options.spv)
return Promise.resolve();
return this.send(new packets.FilterLoadPacket(this.pool.spvFilter));
return this.trySend(new packets.FilterLoadPacket(this.pool.spvFilter));
};
/**
@ -718,7 +735,7 @@ Peer.prototype.updateWatch = function updateWatch() {
*/
Peer.prototype.sendFeeRate = function sendFeeRate(rate) {
return this.send(new packets.FeeFilterPacket(rate));
return this.trySend(new packets.FeeFilterPacket(rate));
};
/**
@ -731,7 +748,7 @@ Peer.prototype.destroy = function destroy() {
if (this.destroyed)
return;
this.finishDrain();
this.finishDrain(new Error('Peer destroyed.'));
this.destroyed = true;
this.connected = false;
@ -745,9 +762,14 @@ Peer.prototype.destroy = function destroy() {
if (this.bip150)
this.bip150.destroy();
if (this.pingTimeout != null) {
clearInterval(this.pingTimeout);
this.pingTimeout = null;
if (this.pingTimer != null) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
if (this.stallTimer != null) {
clearInterval(this.stallTimer);
this.stallTimer = null;
}
if (this.connectTimeout != null) {
@ -783,7 +805,7 @@ Peer.prototype.destroy = function destroy() {
Peer.prototype.write = function write(data) {
if (this.destroyed)
return Promise.resolve();
return Promise.reject(new Error('Peer destroyed (write).'));
this.lastSend = util.ms();
@ -804,7 +826,7 @@ Peer.prototype.onDrain = function onDrain(size) {
var self = this;
if (this.maybeStall())
return Promise.resolve();
return Promise.reject(new Error('Peer stalled (drain).'));
this.drainStart = util.now();
this.drainSize += size;
@ -815,7 +837,7 @@ Peer.prototype.onDrain = function onDrain(size) {
util.mb(this.drainSize),
this.hostname);
this.error('Peer stalled.');
return Promise.resolve();
return Promise.reject(new Error('Peer stalled (drain).'));
}
return new Promise(function(resolve, reject) {
@ -857,6 +879,16 @@ Peer.prototype.finishDrain = function finishDrain(err) {
jobs[i](err);
};
/**
* Send a packet (no error handling).
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.trySend = function send(packet) {
return this.send(packet).catch(util.nop);
};
/**
* Send a packet.
* @param {Packet} packet
@ -1009,7 +1041,7 @@ Peer.prototype.getData = function getData(items) {
data[i] = item;
}
return this.send(new packets.GetDataPacket(data));
return this.trySend(new packets.GetDataPacket(data));
};
/**
@ -2368,7 +2400,7 @@ Peer.prototype.sendAlert = function sendAlert(alert) {
if (!this.invFilter.added(alert.hash()))
return Promise.resolve();
return this.send(alert);
return this.trySend(alert);
};
/**
@ -2451,7 +2483,7 @@ Peer.prototype.sendMempool = function sendMempool() {
'Requesting inv packet from peer with mempool (%s).',
this.hostname);
return this.send(new packets.MempoolPacket());
return this.trySend(new packets.MempoolPacket());
};
/**
@ -2476,7 +2508,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) {
'Sending reject packet to peer (%s).',
this.hostname);
return this.send(reject);
return this.trySend(reject);
};
/**
@ -2486,7 +2518,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) {
Peer.prototype.sendCompact = function sendCompact() {
var version = this.options.witness ? 2 : 1;
this.logger.info('Initializing compact blocks (%s).', this.hostname);
return this.send(new packets.SendCmpctPacket(0, version));
return this.trySend(new packets.SendCmpctPacket(0, version));
};
/**
@ -2562,8 +2594,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) {
return;
}
// Note: call and forget
this.sendGetBlocks(locator, root);
yield this.sendGetBlocks(locator, root);
});
/**
@ -2575,8 +2606,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) {
Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) {
var locator = yield this.chain.getLocator(tip);
// Note: call and forget
this.sendGetHeaders(locator, stop);
yield this.sendGetHeaders(locator, stop);
});
/**
@ -2588,8 +2618,7 @@ Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) {
Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) {
var locator = yield this.chain.getLocator(tip);
// Note: call and forget
this.sendGetBlocks(locator, stop);
yield this.sendGetBlocks(locator, stop);
});
/**
@ -2597,33 +2626,42 @@ Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) {
* @returns {Promise}
*/
Peer.prototype.sync = function sync() {
Peer.prototype.trySync = function trySync() {
return this.sync().catch(util.nop);
};
/**
* Start syncing from peer.
* @returns {Promise}
*/
Peer.prototype.sync = co(function* sync() {
var tip;
if (!this.pool.syncing)
return Promise.resolve();
return;
if (!this.ack)
return Promise.resolve();
return;
if (this.syncSent)
return Promise.resolve();
return;
if (!this.version.hasNetwork())
return Promise.resolve();
return;
if (this.options.witness && !this.version.hasWitness())
return Promise.resolve();
return;
if (!this.isLoader()) {
if (!this.chain.synced)
return Promise.resolve();
return;
}
// Ask for the mempool if we're synced.
if (this.network.requestMempool) {
if (this.isLoader() && this.chain.synced)
this.sendMempool();
yield this.sendMempool();
}
this.syncSent = true;
@ -2632,11 +2670,11 @@ Peer.prototype.sync = function sync() {
if (!this.chain.tip.isGenesis())
tip = this.chain.tip.prevBlock;
return this.getHeaders(tip);
return yield this.getHeaders(tip);
}
return this.getBlocks();
};
return yield this.getBlocks();
});
/**
* Inspect the peer.

View File

@ -639,7 +639,7 @@ Pool.prototype.setLoader = function setLoader(peer) {
this.peers.repurpose(peer);
this.fillPeers();
peer.sync();
peer.trySync();
util.nextTick(function() {
self.emit('loader', peer);
@ -675,10 +675,10 @@ Pool.prototype.sync = function sync() {
var i;
if (this.peers.load)
this.peers.load.sync();
this.peers.load.trySync();
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].sync();
this.peers.regular[i].trySync();
};
/**
@ -770,7 +770,7 @@ Pool.prototype.__handleHeaders = co(function* _handleHeaders(headers, peer) {
last = hash;
yield this.getData(peer, this.blockType, hash);
yield this.getBlock(peer, hash);
}
// Schedule the getdata's we just added.
@ -830,7 +830,7 @@ Pool.prototype._handleBlocks = co(function* _handleBlocks(hashes, peer) {
continue;
}
exists = yield this.getData(peer, this.blockType, hash);
exists = yield this.getBlock(peer, hash);
// Normally we request the hashContinue.
// In the odd case where we already have
@ -1200,7 +1200,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
for (i = 0; i < txs.length; i++) {
hash = txs[i];
try {
yield self.getData(peer, self.txType, hash);
yield self.getTX(peer, hash);
} catch (e) {
self.emit('error', e);
}
@ -1367,7 +1367,7 @@ Pool.prototype._handleTX = co(function* _handleTX(tx, peer) {
if (this.options.requestMissing && missing) {
for (i = 0; i < missing.length; i++)
yield this.getData(peer, this.txType, missing[i]);
yield this.getTX(peer, missing[i]);
}
this.emit('tx', tx, peer);
@ -1545,46 +1545,23 @@ Pool.prototype.watchAddress = function watchAddress(address) {
* Queue a `getdata` request to be sent. Checks existence
* in the chain before requesting.
* @param {Peer} peer
* @param {Number} type - `getdata` type (see {@link constants.inv}).
* @param {Hash} hash - {@link Block} or {@link TX} hash.
* @param {Hash} hash - Block hash.
* @returns {Promise}
*/
Pool.prototype.getData = co(function* getData(peer, type, hash) {
var self = this;
var item, exists;
Pool.prototype.getBlock = co(function* getBlock(peer, hash) {
var item;
if (!this.loaded)
return;
exists = yield this.has(peer, type, hash);
if (exists)
return true;
if (peer.destroyed)
throw new Error('Peer is already destroyed (getdata).');
item = new LoadRequest(this, peer, type, hash);
if (yield this.hasBlock(hash))
return true;
if (type === this.txType) {
if (peer.queueTX.length === 0) {
util.nextTick(function() {
self.logger.debug(
'Requesting %d/%d txs from peer with getdata (%s).',
peer.queueTX.length,
self.activeTX,
peer.hostname);
peer.getData(peer.queueTX);
peer.queueTX.length = 0;
});
}
peer.queueTX.push(item.start());
return false;
}
item = new LoadRequest(this, peer, this.blockType, hash);
peer.queueBlock.push(item);
@ -1592,60 +1569,97 @@ Pool.prototype.getData = co(function* getData(peer, type, hash) {
});
/**
* Test whether the pool has or has seen an item.
* Test whether the chain has or has seen an item.
* @param {Peer} peer
* @param {InvType} type
* @param {Hash} hash
* @returns {Promise} - Returns Boolean.
*/
Pool.prototype.has = co(function* has(peer, type, hash) {
var exists = yield this.exists(type, hash);
if (exists)
Pool.prototype.hasBlock = co(function* hasBlock(hash) {
// Check the chain.
if (yield this.chain.has(hash))
return true;
// Check the pending requests.
if (this.requestMap[hash])
return true;
if (type !== this.txType)
return false;
// If we recently rejected this item. Ignore.
if (this.hasReject(hash)) {
this.logger.spam(
'Peer sent a known reject of %s (%s).',
util.revHex(hash), peer.hostname);
return true;
}
return false;
});
/**
* Test whether the chain or mempool has seen an item.
* Queue a `getdata` request to be sent. Checks existence
* in the mempool before requesting.
* @param {Peer} peer
* @param {Hash} hash - TX hash.
* @returns {Promise}
*/
Pool.prototype.getTX = function getTX(peer, hash) {
var self = this;
var item;
if (!this.loaded)
return;
if (peer.destroyed)
throw new Error('Peer is already destroyed (getdata).');
if (this.hasTX(hash))
return true;
item = new LoadRequest(this, peer, this.txType, hash);
if (peer.queueTX.length === 0) {
util.nextTick(function() {
self.logger.debug(
'Requesting %d/%d txs from peer with getdata (%s).',
peer.queueTX.length,
self.activeTX,
peer.hostname);
peer.getData(peer.queueTX);
peer.queueTX.length = 0;
});
}
peer.queueTX.push(item.start());
return false;
};
/**
* Test whether the mempool has or has seen an item.
* @param {Peer} peer
* @param {InvType} type
* @param {Hash} hash
* @returns {Promise} - Returns Boolean.
*/
Pool.prototype.exists = function exists(type, hash) {
if (type === this.txType) {
Pool.prototype.hasTX = function hasTX(hash) {
if (!this.mempool) {
// Check the TX filter if
// we don't have a mempool.
if (!this.mempool) {
if (this.txFilter.added(hash, 'hex'))
return Promise.resolve(false);
return Promise.resolve(true);
}
if (!this.txFilter.added(hash, 'hex'))
return true;
} else {
// Check the mempool.
return Promise.resolve(this.mempool.has(hash));
if (this.mempool.has(hash))
return true;
}
// Check the chain.
return this.chain.has(hash);
// If we recently rejected this item. Ignore.
if (this.hasReject(hash)) {
this.logger.spam('Saw known reject of %s.', util.revHex(hash));
return true;
}
// Check the pending requests.
if (this.requestMap[hash])
return true;
return false;
};
/**
@ -1769,10 +1783,10 @@ Pool.prototype.announce = function announce(msg) {
var i;
if (this.peers.load)
this.peers.load.announce(msg);
this.peers.load.tryAnnounce(msg);
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].announce(msg);
this.peers.regular[i].tryAnnounce(msg);
};
/**
@ -2499,7 +2513,7 @@ BroadcastItem.prototype.refresh = function refresh() {
*/
BroadcastItem.prototype.announce = function announce() {
this.pool.announce(this);
return this.pool.announce(this);
};
/**