net: refactor sending and announcing.

This commit is contained in:
Christopher Jeffrey 2016-12-16 04:10:02 -08:00
parent 69a9b5873f
commit dc58c99ba2
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 406 additions and 414 deletions

File diff suppressed because it is too large Load Diff

View File

@ -154,6 +154,9 @@ function Pool(options) {
this.timeout = null;
this.interval = null;
this._onTimeout = this.onTimeout.bind(this);
this._onInterval = this.onInterval.bind(this);
this._initOptions();
this._init();
};
@ -372,7 +375,7 @@ Pool.prototype.connect = function connect() {
if (!this.options.selfish && !this.options.spv) {
if (this.mempool) {
this.mempool.on('tx', function(tx) {
self.announce(tx);
self.announceTX(tx);
});
}
@ -385,7 +388,7 @@ Pool.prototype.connect = function connect() {
this.chain.on('block', function(block) {
if (!self.chain.synced)
return;
self.announce(block);
self.announceBlock(block);
});
}
@ -503,24 +506,8 @@ Pool.prototype._handleLeech = function _handleLeech(socket) {
*/
Pool.prototype.startTimeout = function startTimeout() {
var self = this;
function destroy() {
if (!self.syncing)
return;
if (self.chain.synced)
return;
if (self.peers.load) {
self.peers.load.destroy();
self.logger.debug('Timer ran out. Finding new loader peer.');
}
}
this.stopTimeout();
this.timeout = setTimeout(destroy, this.loadTimeout);
this.timeout = setTimeout(this._onTimeout, this.loadTimeout);
};
/**
@ -535,6 +522,24 @@ Pool.prototype.stopTimeout = function stopTimeout() {
}
};
/**
* Potentially kill a stalling peer.
* @private
*/
Pool.prototype.onTimeout = function onTimeout() {
if (!this.syncing)
return;
if (this.chain.synced)
return;
if (this.peers.load) {
this.peers.load.destroy();
this.logger.debug('Timer ran out. Finding new loader peer.');
}
};
/**
* Start the stall interval (shorter than the
* stall timeout, inteded to give warnings and
@ -544,27 +549,8 @@ Pool.prototype.stopTimeout = function stopTimeout() {
*/
Pool.prototype.startInterval = function startInterval() {
var self = this;
function load() {
var peer = self.peers.load;
var hostname = peer ? peer.hostname : null;
if (!self.syncing)
return;
if (self.chain.synced)
return self.stopInterval();
if (self.chain.isBusy())
return self.startTimeout();
self.logger.warning('Loader peer is stalling (%s).', hostname);
}
this.stopInterval();
this.interval = setInterval(load, this.loadTimeout / 6 | 0);
this.interval = setInterval(this._onInterval, this.loadTimeout / 6 | 0);
};
/**
@ -579,6 +565,27 @@ Pool.prototype.stopInterval = function stopInterval() {
}
};
/**
* Warn that the loader peer is stalling.
* @private
*/
Pool.prototype.onInterval = function onInterval() {
var peer = this.peers.load;
var hostname = peer ? peer.hostname : null;
if (!this.syncing)
return;
if (this.chain.synced)
return this.stopInterval();
if (this.chain.isBusy())
return this.startTimeout();
this.logger.warning('Loader peer is stalling (%s).', hostname);
};
/**
* Add a loader peer. Necessary for
* a sync to even begin.
@ -645,7 +652,7 @@ Pool.prototype.setLoader = function setLoader(peer) {
this.peers.repurpose(peer);
this.fillPeers();
peer.trySync();
peer.sync();
util.nextTick(function() {
self.emit('loader', peer);
@ -683,7 +690,7 @@ Pool.prototype.sync = function sync() {
for (peer = this.peers.head(); peer; peer = peer.next) {
if (!peer.outbound || peer.pending)
continue;
peer.trySync();
peer.sync();
}
};
@ -699,7 +706,7 @@ Pool.prototype.forceSync = function forceSync() {
if (!peer.outbound || peer.pending)
continue;
peer.syncSent = false;
peer.trySync();
peer.sync();
}
};
@ -1753,20 +1760,13 @@ Pool.prototype.fulfill = function fulfill(data) {
/**
* Broadcast a transaction or block.
* @param {TX|Block|InvItem} msg
* @param {TX|Block} msg
* @returns {Promise}
* or timeout.
* @returns {BroadcastItem}
*/
Pool.prototype.broadcast = function broadcast(msg) {
var hash = msg.hash;
var item;
if (msg.toInv)
hash = msg.toInv().hash;
item = this.invMap[hash];
var hash = msg.hash('hex');
var item = this.invMap[hash];
if (item) {
item.refresh();
@ -1783,19 +1783,32 @@ Pool.prototype.broadcast = function broadcast(msg) {
};
/**
* Announce an item by sending an inv to all
* peers. This does not add it to the broadcast
* queue.
* @param {TX|Block} tx
* Announce a block to all peers.
* @param {Block} tx
*/
Pool.prototype.announce = function announce(msg) {
Pool.prototype.announceBlock = function announceBlock(msg) {
var peer;
for (peer = this.peers.head(); peer; peer = peer.next) {
if (!peer.outbound || peer.pending)
continue;
peer.tryAnnounce(msg);
peer.announceBlock(msg);
}
};
/**
* Announce a transaction to all peers.
* @param {TX} tx
*/
Pool.prototype.announceTX = function announceTX(msg) {
var peer;
for (peer = this.peers.head(); peer; peer = peer.next) {
if (!peer.outbound || peer.pending)
continue;
peer.announceTX(msg);
}
};
@ -2180,16 +2193,16 @@ HostList.prototype.add = function add(addr) {
*/
HostList.prototype.remove = function remove(addr) {
addr = this.map[addr.hostname];
var item = this.map[addr.hostname];
if (!addr)
if (!item)
return;
util.binaryRemove(this.items, addr, compare);
util.binaryRemove(this.items, item, compare);
delete this.map[addr.hostname];
delete this.map[item.hostname];
return addr;
return item;
};
/**
@ -2199,7 +2212,7 @@ HostList.prototype.remove = function remove(addr) {
HostList.prototype.ban = function ban(addr) {
this.misbehaving[addr.host] = util.now();
this.remove(addr);
return this.remove(addr);
};
/**
@ -2303,8 +2316,8 @@ function LoadRequest(pool, peer, type, hash) {
this.timeout = null;
this.onTimeout = this._onTimeout.bind(this);
this.next = null;
this.prev = null;
this.next = null;
assert(!this.pool.requestMap[this.hash]);
this.pool.requestMap[this.hash] = this;
@ -2408,39 +2421,31 @@ LoadRequest.prototype.toInv = function toInv() {
* @constructor
* @private
* @param {Pool} pool
* @param {TX|Block|InvItem} item
* @param {TX|Block} msg
* @emits BroadcastItem#ack
* @emits BroadcastItem#reject
* @emits BroadcastItem#timeout
*/
function BroadcastItem(pool, item) {
function BroadcastItem(pool, msg) {
var item;
if (!(this instanceof BroadcastItem))
return new BroadcastItem(pool, item);
return new BroadcastItem(pool, msg);
assert(!msg.mutable, 'Cannot broadcast mutable item.');
item = msg.toInv();
this.pool = pool;
this.callback = [];
this.id = this.pool.uid++;
this.msg = null;
if (item instanceof TX)
assert(!item.mutable, 'Cannot broadcast mutable TX.');
if (item.toInv) {
this.msg = item;
item = item.toInv();
}
this.msg = msg;
this.hash = item.hash;
this.type = item.type;
this.callback = [];
assert(this.type != null);
assert(typeof this.hash === 'string');
// INV does not set the witness
// mask (only GETDATA does this).
assert((this.type & constants.WITNESS_MASK) === 0);
this.prev = null;
this.next = null;
}
util.inherits(BroadcastItem, EventEmitter);
@ -2493,7 +2498,17 @@ BroadcastItem.prototype.refresh = function refresh() {
*/
BroadcastItem.prototype.announce = function announce() {
return this.pool.announce(this);
switch (this.type) {
case constants.inv.TX:
this.pool.announceTX(this.msg);
break;
case constants.inv.BLOCK:
this.pool.announceBlock(this.msg);
break;
default:
assert(false, 'Bad type.');
break;
}
};
/**
@ -2567,15 +2582,6 @@ BroadcastItem.prototype.inspect = function inspect() {
+ '>';
};
/**
* Convert broadcast item to an inv item.
* @returns {InvItem}
*/
BroadcastItem.prototype.toInv = function toInv() {
return new InvItem(this.type, this.hash);
};
/*
* Helpers
*/