diff --git a/bin/node b/bin/node index b8050722..8a96f26e 100755 --- a/bin/node +++ b/bin/node @@ -5,7 +5,6 @@ process.title = 'bcoin'; var bcoin = require('../'); -var utils = bcoin.utils; var assert = require('assert'); var options = bcoin.config({ diff --git a/bin/spvnode b/bin/spvnode index a10abc42..e68e3802 100755 --- a/bin/spvnode +++ b/bin/spvnode @@ -5,7 +5,7 @@ process.title = 'bcoin'; var bcoin = require('../'); -var utils = bcoin.utils; +var util = bcoin.util; var assert = require('assert'); var options = bcoin.config({ @@ -38,7 +38,7 @@ node.open().then(function() { node.on('block', function(block) { assert(block.txs.length >= 1); if (block.txs.length > 1) - utils.log(block.txs[1]); + util.log(block.txs[1]); }); } diff --git a/lib/net/bip151.js b/lib/net/bip151.js index a43c4c3c..3a3fd938 100644 --- a/lib/net/bip151.js +++ b/lib/net/bip151.js @@ -330,7 +330,7 @@ util.inherits(BIP151, EventEmitter); */ BIP151.prototype.error = function error() { - var msg = util.fmt.apply(utils, arguments); + var msg = util.fmt.apply(util, arguments); this.emit('error', new Error(msg)); }; diff --git a/lib/net/parser.js b/lib/net/parser.js index 2b98dea6..5c8473a0 100644 --- a/lib/net/parser.js +++ b/lib/net/parser.js @@ -76,7 +76,7 @@ Parser.prototype._init = function _init(str) { */ Parser.prototype.error = function error() { - var msg = util.fmt.apply(utils, arguments); + var msg = util.fmt.apply(util, arguments); this.emit('error', new Error(msg)); }; diff --git a/lib/net/peer.js b/lib/net/peer.js index 48f8cd18..7198acdd 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -114,6 +114,9 @@ function Peer(pool, addr, socket) { this.bip150 = null; this.lastSend = 0; this.lastRecv = 0; + this.drainStart = 0; + this.drainSize = 0; + this.drainQueue = []; this.challenge = null; this.lastPong = -1; @@ -209,7 +212,14 @@ Peer.prototype._init = function init() { self.error('socket hangup'); }); + this.socket.on('drain', function() { + self.finishDrain(); + }); + this.socket.on('data', function(chunk) { + if (self.maybeStall()) + return; + self.parser.feed(chunk); }); @@ -718,6 +728,8 @@ Peer.prototype.destroy = function destroy() { if (this.destroyed) return; + this.finishDrain(new Error('Destroyed.')); + this.destroyed = true; this.connected = false; @@ -767,22 +779,81 @@ Peer.prototype.destroy = function destroy() { */ Peer.prototype.write = function write(data) { - var self = this; - if (this.destroyed) return Promise.resolve(); this.lastSend = util.ms(); - if (this.socket.write(data) === false) { - return new Promise(function(resolve, reject) { - self.socket.once('drain', resolve); - }); - } + if (this.socket.write(data) === false) + return this.onDrain(data.length); return Promise.resolve(); }; +/** + * Wait for drain. + * @private + * @param {Function} resolve + * @param {Function} reject + */ + +Peer.prototype.onDrain = function onDrain(size) { + var self = this; + + if (this.maybeStall()) + return Promise.resolve(); + + this.drainStart = util.now(); + this.drainSize += size; + + if (this.drainSize >= (10 << 20)) { + this.logger.warning( + 'Peer is not reading: %s buffered (%s).', + util.mb(this.drainSize), + this.hostname); + this.error('Peer stalled.'); + return Promise.reject(new Error('Peer stalled.')); + } + + return new Promise(function(resolve, reject) { + self.drainQueue.push(co.wrap(resolve, reject)); + }); +}; + +/** + * Potentially timeout peer if it hasn't read. + * @private + */ + +Peer.prototype.maybeStall = function maybeStall() { + if (this.drainQueue.length === 0) + return false; + + if (util.now() < this.drainStart + 10) + return false; + + this.finishDrain(new Error('Peer stalled.')); + this.error('Peer stalled.'); + + return true; +}; + +/** + * Notify drainers of the latest drain. + * @private + */ + +Peer.prototype.finishDrain = function finishDrain(err) { + var jobs = this.drainQueue.slice(); + var i; + + this.drainQueue.length = 0; + this.drainSize = 0; + + for (i = 0; i < jobs.length; i++) + jobs[i](err); +}; + /** * Send a packet. * @param {Packet} packet @@ -839,7 +910,7 @@ Peer.prototype.error = function error(err, keep) { if (typeof args[args.length - 1] === 'boolean') keep = args.pop(); - msg = util.fmt.apply(utils, args); + msg = util.fmt.apply(util, args); err = new Error(msg); } @@ -2619,6 +2690,11 @@ function compare(a, b) { return a.id - b.id; } +function Drainer(resolve, reject) { + this.resolve = resolve; + this.reject = reject; +} + /* * Expose */ diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index a79ad4db..29557f8f 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -506,7 +506,7 @@ Worker.prototype._bind = function _bind() { this.on('log', function(items) { util.log('Worker %d:', self.id); - util.log.apply(utils, items); + util.log.apply(util, items); }); this.on('packet', function(packet) { diff --git a/scripts/gen.js b/scripts/gen.js index 2ec4d033..2db7c30b 100644 --- a/scripts/gen.js +++ b/scripts/gen.js @@ -1,10 +1,10 @@ 'use strict'; +var BN = require('bn.js'); var bcoin = require('bcoin'); var constants = bcoin.constants; var opcodes = constants.opcodes; -var utils = bcoin.utils; -var BN = require('bn.js'); +var util = bcoin.util; function createGenesisBlock(options) { var flags = options.flags; @@ -109,31 +109,31 @@ var btcd = createGenesisBlock({ nonce: 2 }); -utils.log(main); -utils.log(''); -utils.log(testnet); -utils.log(''); -utils.log(regtest); -utils.log(''); -utils.log(segnet3); -utils.log(''); -utils.log(segnet4); -utils.log(''); -utils.log(''); -utils.log('main hash: %s', main.rhash); -utils.log('main raw: %s', main.toRaw().toString('hex')); -utils.log(''); -utils.log('testnet hash: %s', testnet.rhash); -utils.log('testnet raw: %s', testnet.toRaw().toString('hex')); -utils.log(''); -utils.log('regtest hash: %s', regtest.rhash); -utils.log('regtest raw: %s', regtest.toRaw().toString('hex')); -utils.log(''); -utils.log('segnet3 hash: %s', segnet3.rhash); -utils.log('segnet3 raw: %s', segnet3.toRaw().toString('hex')); -utils.log(''); -utils.log('segnet4 hash: %s', segnet4.rhash); -utils.log('segnet4 raw: %s', segnet4.toRaw().toString('hex')); -utils.log(''); -utils.log('btcd simnet hash: %s', btcd.rhash); -utils.log('btcd simnet raw: %s', btcd.toRaw().toString('hex')); +util.log(main); +util.log(''); +util.log(testnet); +util.log(''); +util.log(regtest); +util.log(''); +util.log(segnet3); +util.log(''); +util.log(segnet4); +util.log(''); +util.log(''); +util.log('main hash: %s', main.rhash); +util.log('main raw: %s', main.toRaw().toString('hex')); +util.log(''); +util.log('testnet hash: %s', testnet.rhash); +util.log('testnet raw: %s', testnet.toRaw().toString('hex')); +util.log(''); +util.log('regtest hash: %s', regtest.rhash); +util.log('regtest raw: %s', regtest.toRaw().toString('hex')); +util.log(''); +util.log('segnet3 hash: %s', segnet3.rhash); +util.log('segnet3 raw: %s', segnet3.toRaw().toString('hex')); +util.log(''); +util.log('segnet4 hash: %s', segnet4.rhash); +util.log('segnet4 raw: %s', segnet4.toRaw().toString('hex')); +util.log(''); +util.log('btcd simnet hash: %s', btcd.rhash); +util.log('btcd simnet raw: %s', btcd.toRaw().toString('hex'));