peer: handle drains better.

This commit is contained in:
Christopher Jeffrey 2016-11-19 14:33:18 -08:00
parent 9353a86e14
commit 8f743b6e7d
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
7 changed files with 119 additions and 44 deletions

View File

@ -5,7 +5,6 @@
process.title = 'bcoin';
var bcoin = require('../');
var utils = bcoin.utils;
var assert = require('assert');
var options = bcoin.config({

View File

@ -5,7 +5,7 @@
process.title = 'bcoin';
var bcoin = require('../');
var utils = bcoin.utils;
var util = bcoin.util;
var assert = require('assert');
var options = bcoin.config({
@ -38,7 +38,7 @@ node.open().then(function() {
node.on('block', function(block) {
assert(block.txs.length >= 1);
if (block.txs.length > 1)
utils.log(block.txs[1]);
util.log(block.txs[1]);
});
}

View File

@ -330,7 +330,7 @@ util.inherits(BIP151, EventEmitter);
*/
BIP151.prototype.error = function error() {
var msg = util.fmt.apply(utils, arguments);
var msg = util.fmt.apply(util, arguments);
this.emit('error', new Error(msg));
};

View File

@ -76,7 +76,7 @@ Parser.prototype._init = function _init(str) {
*/
Parser.prototype.error = function error() {
var msg = util.fmt.apply(utils, arguments);
var msg = util.fmt.apply(util, arguments);
this.emit('error', new Error(msg));
};

View File

@ -114,6 +114,9 @@ function Peer(pool, addr, socket) {
this.bip150 = null;
this.lastSend = 0;
this.lastRecv = 0;
this.drainStart = 0;
this.drainSize = 0;
this.drainQueue = [];
this.challenge = null;
this.lastPong = -1;
@ -209,7 +212,14 @@ Peer.prototype._init = function init() {
self.error('socket hangup');
});
this.socket.on('drain', function() {
self.finishDrain();
});
this.socket.on('data', function(chunk) {
if (self.maybeStall())
return;
self.parser.feed(chunk);
});
@ -718,6 +728,8 @@ Peer.prototype.destroy = function destroy() {
if (this.destroyed)
return;
this.finishDrain(new Error('Destroyed.'));
this.destroyed = true;
this.connected = false;
@ -767,22 +779,81 @@ Peer.prototype.destroy = function destroy() {
*/
Peer.prototype.write = function write(data) {
var self = this;
if (this.destroyed)
return Promise.resolve();
this.lastSend = util.ms();
if (this.socket.write(data) === false) {
return new Promise(function(resolve, reject) {
self.socket.once('drain', resolve);
});
}
if (this.socket.write(data) === false)
return this.onDrain(data.length);
return Promise.resolve();
};
/**
* Wait for drain.
* @private
* @param {Function} resolve
* @param {Function} reject
*/
Peer.prototype.onDrain = function onDrain(size) {
var self = this;
if (this.maybeStall())
return Promise.resolve();
this.drainStart = util.now();
this.drainSize += size;
if (this.drainSize >= (10 << 20)) {
this.logger.warning(
'Peer is not reading: %s buffered (%s).',
util.mb(this.drainSize),
this.hostname);
this.error('Peer stalled.');
return Promise.reject(new Error('Peer stalled.'));
}
return new Promise(function(resolve, reject) {
self.drainQueue.push(co.wrap(resolve, reject));
});
};
/**
* Potentially timeout peer if it hasn't read.
* @private
*/
Peer.prototype.maybeStall = function maybeStall() {
if (this.drainQueue.length === 0)
return false;
if (util.now() < this.drainStart + 10)
return false;
this.finishDrain(new Error('Peer stalled.'));
this.error('Peer stalled.');
return true;
};
/**
* Notify drainers of the latest drain.
* @private
*/
Peer.prototype.finishDrain = function finishDrain(err) {
var jobs = this.drainQueue.slice();
var i;
this.drainQueue.length = 0;
this.drainSize = 0;
for (i = 0; i < jobs.length; i++)
jobs[i](err);
};
/**
* Send a packet.
* @param {Packet} packet
@ -839,7 +910,7 @@ Peer.prototype.error = function error(err, keep) {
if (typeof args[args.length - 1] === 'boolean')
keep = args.pop();
msg = util.fmt.apply(utils, args);
msg = util.fmt.apply(util, args);
err = new Error(msg);
}
@ -2619,6 +2690,11 @@ function compare(a, b) {
return a.id - b.id;
}
function Drainer(resolve, reject) {
this.resolve = resolve;
this.reject = reject;
}
/*
* Expose
*/

View File

@ -506,7 +506,7 @@ Worker.prototype._bind = function _bind() {
this.on('log', function(items) {
util.log('Worker %d:', self.id);
util.log.apply(utils, items);
util.log.apply(util, items);
});
this.on('packet', function(packet) {

View File

@ -1,10 +1,10 @@
'use strict';
var BN = require('bn.js');
var bcoin = require('bcoin');
var constants = bcoin.constants;
var opcodes = constants.opcodes;
var utils = bcoin.utils;
var BN = require('bn.js');
var util = bcoin.util;
function createGenesisBlock(options) {
var flags = options.flags;
@ -109,31 +109,31 @@ var btcd = createGenesisBlock({
nonce: 2
});
utils.log(main);
utils.log('');
utils.log(testnet);
utils.log('');
utils.log(regtest);
utils.log('');
utils.log(segnet3);
utils.log('');
utils.log(segnet4);
utils.log('');
utils.log('');
utils.log('main hash: %s', main.rhash);
utils.log('main raw: %s', main.toRaw().toString('hex'));
utils.log('');
utils.log('testnet hash: %s', testnet.rhash);
utils.log('testnet raw: %s', testnet.toRaw().toString('hex'));
utils.log('');
utils.log('regtest hash: %s', regtest.rhash);
utils.log('regtest raw: %s', regtest.toRaw().toString('hex'));
utils.log('');
utils.log('segnet3 hash: %s', segnet3.rhash);
utils.log('segnet3 raw: %s', segnet3.toRaw().toString('hex'));
utils.log('');
utils.log('segnet4 hash: %s', segnet4.rhash);
utils.log('segnet4 raw: %s', segnet4.toRaw().toString('hex'));
utils.log('');
utils.log('btcd simnet hash: %s', btcd.rhash);
utils.log('btcd simnet raw: %s', btcd.toRaw().toString('hex'));
util.log(main);
util.log('');
util.log(testnet);
util.log('');
util.log(regtest);
util.log('');
util.log(segnet3);
util.log('');
util.log(segnet4);
util.log('');
util.log('');
util.log('main hash: %s', main.rhash);
util.log('main raw: %s', main.toRaw().toString('hex'));
util.log('');
util.log('testnet hash: %s', testnet.rhash);
util.log('testnet raw: %s', testnet.toRaw().toString('hex'));
util.log('');
util.log('regtest hash: %s', regtest.rhash);
util.log('regtest raw: %s', regtest.toRaw().toString('hex'));
util.log('');
util.log('segnet3 hash: %s', segnet3.rhash);
util.log('segnet3 raw: %s', segnet3.toRaw().toString('hex'));
util.log('');
util.log('segnet4 hash: %s', segnet4.rhash);
util.log('segnet4 raw: %s', segnet4.toRaw().toString('hex'));
util.log('');
util.log('btcd simnet hash: %s', btcd.rhash);
util.log('btcd simnet raw: %s', btcd.toRaw().toString('hex'));