peer: refactor.

This commit is contained in:
Christopher Jeffrey 2016-12-16 15:03:39 -08:00
parent 4269d16fee
commit 8ccefb8e71
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 48 additions and 38 deletions

View File

@ -1147,6 +1147,8 @@ InvPacket.prototype.getSize = function getSize() {
InvPacket.prototype.toWriter = function toWriter(bw) {
var i, item;
assert(this.items.length <= 50000);
bw.writeVarint(this.items.length);
for (i = 0; i < this.items.length; i++) {
@ -1178,6 +1180,8 @@ InvPacket.prototype.fromReader = function fromReader(br) {
count = br.readVarint();
assert(count <= 50000, 'Inv item count too high.');
for (i = 0; i < count; i++)
this.items.push(InvItem.fromReader(br));

View File

@ -189,7 +189,7 @@ Peer.prototype._init = function init() {
this.parser.on('packet', co(function* (packet) {
try {
yield self._onPacket(packet);
yield self.handlePacket(packet);
} catch (e) {
self.destroy();
self.error(e);
@ -330,19 +330,16 @@ Peer.prototype.connect = function connect(port, host) {
*/
Peer.prototype.open = co(function* open() {
yield this._wait();
yield this._stallify();
yield this._bip151();
yield this._bip150();
yield this._handshake();
yield this._finalize();
yield this.initConnect();
yield this.initStall();
yield this.initBIP151();
yield this.initBIP150();
yield this.initVerack();
yield this.finalize();
if (this.destroyed)
throw new Error('Peer was destroyed.');
clearTimeout(this.connectTimeout);
this.connectTimeout = null;
// Finally we can let the pool know
// that this peer is ready to go.
this.emit('open');
@ -367,7 +364,7 @@ Peer.prototype.tryOpen = co(function* tryOpen() {
* @private
*/
Peer.prototype._wait = function _wait() {
Peer.prototype.initConnect = function initConnect() {
var self = this;
if (this.connected) {
@ -397,7 +394,7 @@ Peer.prototype._wait = function _wait() {
* @private
*/
Peer.prototype._stallify = function _stallify() {
Peer.prototype.initStall = function initStall() {
var self = this;
assert(!this.stallTimer);
this.stallTimer = setInterval(function() {
@ -412,7 +409,7 @@ Peer.prototype._stallify = function _stallify() {
* @private
*/
Peer.prototype._bip151 = co(function* _bip151() {
Peer.prototype.initBIP151 = co(function* initBIP151() {
// Send encinit. Wait for handshake to complete.
if (!this.bip151)
return;
@ -442,7 +439,7 @@ Peer.prototype._bip151 = co(function* _bip151() {
* @private
*/
Peer.prototype._bip150 = co(function* _bip150() {
Peer.prototype.initBIP150 = co(function* initBIP150() {
if (!this.bip151 || !this.bip150)
return;
@ -475,7 +472,7 @@ Peer.prototype._bip150 = co(function* _bip150() {
* @private
*/
Peer.prototype._handshake = co(function* _handshake() {
Peer.prototype.initVerack = co(function* initVerack() {
// Say hello.
this.sendVersion();
@ -509,7 +506,7 @@ Peer.prototype._handshake = co(function* _handshake() {
* @private
*/
Peer.prototype._finalize = co(function* _finalize() {
Peer.prototype.finalize = co(function* finalize() {
var self = this;
// Setup the ping interval.
@ -591,7 +588,7 @@ Peer.prototype.announceBlock = function announceBlock(blocks) {
// they're using compact block mode 1.
if (this.compactMode && this.compactMode.mode === 1) {
this.invFilter.add(block.hash());
this._sendCompactBlock(block, this.compactWitness);
this.sendCompactBlock(block, this.compactWitness);
continue;
}
@ -1083,24 +1080,29 @@ Peer.prototype.getData = function getData(items) {
* @param {Packet} packet
*/
Peer.prototype._onPacket = co(function* onPacket(packet) {
Peer.prototype.handlePacket = co(function* handlePacket(packet) {
var unlock;
// We stop reads and lock the peer for any
// packet with significant IO/asynchronocity.
switch (packet.type) {
case packetTypes.VERSION:
case packetTypes.CMPCTBLOCK:
// These can't have locks or stop reads.
return yield this.__onPacket(packet);
default:
case packetTypes.GETDATA:
case packetTypes.GETBLOCKS:
case packetTypes.GETHEADERS:
case packetTypes.GETUTXOS:
unlock = yield this.locker.lock();
this.socket.pause();
try {
return yield this.__onPacket(packet);
return yield this.onPacket(packet);
} finally {
this.socket.resume();
unlock();
}
break;
default:
return yield this.onPacket(packet);
}
});
@ -1110,7 +1112,7 @@ Peer.prototype._onPacket = co(function* onPacket(packet) {
* @param {Packet} packet
*/
Peer.prototype.__onPacket = co(function* onPacket(packet) {
Peer.prototype.onPacket = co(function* onPacket(packet) {
this.lastRecv = util.ms();
if (this.destroyed)
@ -1611,11 +1613,12 @@ Peer.prototype.handleMempool = co(function* handleMempool(packet) {
/**
* Get a block/tx from the broadcast map.
* @private
* @param {InvItem} item
* @returns {Promise}
*/
Peer.prototype._getBroadcasted = function _getBroadcasted(item) {
Peer.prototype.getBroadcasted = function getBroadcasted(item) {
var entry = this.pool.invMap[item.hash];
if (!entry)
@ -1646,12 +1649,13 @@ Peer.prototype._getBroadcasted = function _getBroadcasted(item) {
/**
* Get a block/tx either from the broadcast map, mempool, or blockchain.
* @private
* @param {InvItem} item
* @returns {Promise}
*/
Peer.prototype._getItem = co(function* _getItem(item) {
var entry = this._getBroadcasted(item);
Peer.prototype.getItem = co(function* getItem(item) {
var entry = this.getBroadcasted(item);
if (entry)
return entry;
@ -1676,12 +1680,13 @@ Peer.prototype._getItem = co(function* _getItem(item) {
/**
* Send a block from the broadcast list or chain.
* @private
* @param {InvItem} item
* @returns {Boolean}
*/
Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) {
var block = this._getBroadcasted(item);
Peer.prototype.sendBlock = co(function* sendBlock(item, witness) {
var block = this.getBroadcasted(item);
// Check for a broadcasted item first.
if (block) {
@ -1720,12 +1725,13 @@ Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) {
/**
* Send a compact block.
* @private
* @param {Block} block
* @param {Boolean} witness
* @returns {Boolean}
*/
Peer.prototype._sendCompactBlock = function _sendCompactBlock(block, witness) {
Peer.prototype.sendCompactBlock = function sendCompactBlock(block, witness) {
// Try again with a new nonce
// if we get a siphash collision.
for (;;) {
@ -1761,7 +1767,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) {
item = items[i];
if (item.isTX()) {
tx = yield this._getItem(item);
tx = yield this.getItem(item);
if (!tx) {
notFound.push(item);
@ -1788,7 +1794,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) {
switch (item.type) {
case constants.inv.BLOCK:
case constants.inv.WITNESS_BLOCK:
result = yield this._sendBlock(item, item.hasWitness());
result = yield this.sendBlock(item, item.hasWitness());
if (!result) {
notFound.push(item);
continue;
@ -1802,7 +1808,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) {
continue;
}
block = yield this._getItem(item);
block = yield this.getItem(item);
if (!block) {
notFound.push(item);
@ -1826,7 +1832,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) {
// Fallback to full block.
height = yield this.chain.db.getHeight(item.hash);
if (height < this.chain.tip.height - 10) {
result = yield this._sendBlock(item, this.compactWitness);
result = yield this.sendBlock(item, this.compactWitness);
if (!result) {
notFound.push(item);
continue;
@ -1835,14 +1841,14 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) {
break;
}
block = yield this._getItem(item);
block = yield this.getItem(item);
if (!block) {
notFound.push(item);
continue;
}
yield this._sendCompactBlock(block, this.compactWitness);
yield this.sendCompactBlock(block, this.compactWitness);
blocks++;
@ -2387,7 +2393,7 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) {
item = new InvItem(constants.inv.BLOCK, req.hash);
block = yield this._getItem(item);
block = yield this.getItem(item);
if (!block) {
this.logger.debug(