This commit is contained in:
Christopher Jeffrey 2016-09-29 16:58:20 -07:00
parent 661a8f2f20
commit 9aaf5ea2a0
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 135 additions and 214 deletions

View File

@ -1639,10 +1639,14 @@ function FilterLoadPacket(filter, n, tweak, update) {
Packet.call(this);
this.filter = filter || DUMMY;
this.n = n || 0;
this.tweak = tweak || 0;
this.update = update || 0;
if (filter instanceof bcoin.bloom) {
this.fromFilter(filter);
} else {
this.filter = filter || DUMMY;
this.n = n || 0;
this.tweak = tweak || 0;
this.update = update || 0;
}
}
utils.inherits(FilterLoadPacket, Packet);

View File

@ -207,9 +207,13 @@ Peer.prototype._init = function init() {
self.parser.feed(chunk);
});
this.parser.on('packet', function(packet) {
self._onPacket(packet);
});
this.parser.on('packet', co(function *(packet) {
try {
yield self._onPacket(packet);
} catch (e) {
self.error(e);
}
}));
this.parser.on('error', function(err) {
self.error(err, true);
@ -486,7 +490,7 @@ Peer.prototype.isLoader = function isLoader() {
* @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items
*/
Peer.prototype.announce = function announce(items) {
Peer.prototype.announce = co(function* announce(items) {
var inv = [];
var headers = [];
var i, item, entry;
@ -541,18 +545,18 @@ Peer.prototype.announce = function announce(items) {
inv.push(item);
}
this.sendInv(inv);
yield this.sendInv(inv);
if (headers.length > 0)
this.sendHeaders(headers);
};
yield this.sendHeaders(headers);
});
/**
* Send inv to a peer.
* @param {InvItem[]} items
*/
Peer.prototype.sendInv = function sendInv(items) {
Peer.prototype.sendInv = co(function* sendInv(items) {
var i, chunk;
if (this.destroyed)
@ -572,16 +576,16 @@ Peer.prototype.sendInv = function sendInv(items) {
for (i = 0; i < items.length; i += 50000) {
chunk = items.slice(i, i + 50000);
this.send(new packets.InvPacket(chunk));
yield this.send(new packets.InvPacket(chunk));
}
};
});
/**
* Send headers to a peer.
* @param {Headers[]} items
*/
Peer.prototype.sendHeaders = function sendHeaders(items) {
Peer.prototype.sendHeaders = co(function* sendHeaders(items) {
var i, chunk;
if (this.destroyed)
@ -601,9 +605,9 @@ Peer.prototype.sendHeaders = function sendHeaders(items) {
for (i = 0; i < items.length; i += 2000) {
chunk = items.slice(i, i + 2000);
this.send(new packets.HeadersPacket(chunk));
yield this.send(new packets.HeadersPacket(chunk));
}
};
});
/**
* Send a `version` packet.
@ -622,7 +626,7 @@ Peer.prototype.sendVersion = function sendVersion() {
relay: this.options.relay
});
this.send(packet);
return this.send(packet);
};
/**
@ -633,20 +637,18 @@ Peer.prototype.sendPing = function sendPing() {
if (!this.version)
return;
if (this.version.version <= 60000) {
this.send(new packets.PingPacket());
return;
}
if (this.version.version <= 60000)
return this.send(new packets.PingPacket());
if (this.challenge) {
this.logger.debug('Peer has not responded to ping (%s).', this.hostname);
return;
return Promise.resolve(null);
}
this.lastPing = utils.ms();
this.challenge = utils.nonce();
this.send(new packets.PingPacket(this.challenge));
return this.send(new packets.PingPacket(this.challenge));
};
/**
@ -677,9 +679,9 @@ Peer.prototype.isWatched = function isWatched(item) {
Peer.prototype.updateWatch = function updateWatch() {
if (!this.options.spv)
return;
return Promise.resolve(null);
this.send(packets.FilterLoadPacket.fromFilter(this.pool.spvFilter));
return this.send(new packets.FilterLoadPacket(this.pool.spvFilter));
};
/**
@ -688,7 +690,7 @@ Peer.prototype.updateWatch = function updateWatch() {
*/
Peer.prototype.sendFeeRate = function sendFeeRate(rate) {
this.send(new packets.FeeFilterPacket(rate));
return this.send(new packets.FeeFilterPacket(rate));
};
/**
@ -757,9 +759,13 @@ Peer.prototype.write = function write(data) {
this.lastSend = utils.ms();
return new Promise(function(resolve, reject) {
self.socket.write(data, wrap(resolve, reject));
});
if (this.socket.write(data) === false) {
return new Promise(function(resolve, reject) {
self.socket.once('drain', resolve);
});
}
return Promise.resolve(null);
};
/**
@ -788,24 +794,6 @@ Peer.prototype.send = function send(packet) {
return this.write(payload);
};
/**
* Send a packet. Throttle reads and wait for flush.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.flush = co(function* flush(packet) {
assert(this.outgoing >= 0);
if (this.outgoing++ === 0)
this.socket.pause();
yield this.send(packet);
if (--this.outgoing === 0)
this.socket.resume();
});
/**
* Emit an error and destroy the peer.
* @private
@ -923,7 +911,7 @@ Peer.prototype.getData = function getData(items) {
data[i] = item;
}
this.send(new packets.GetDataPacket(data));
return this.send(new packets.GetDataPacket(data));
};
/**
@ -932,7 +920,26 @@ Peer.prototype.getData = function getData(items) {
* @param {Packet} packet
*/
Peer.prototype._onPacket = function onPacket(packet) {
Peer.prototype._onPacket = co(function* onPacket(packet) {
var unlock = yield this.locker.lock();
this.socket.pause();
try {
return yield this.__onPacket(packet);
} finally {
this.socket.resume();
unlock();
}
});
/**
* Handle a packet payload without a lock.
* @private
* @param {Packet} packet
*/
Peer.prototype.__onPacket = co(function* onPacket(packet) {
this.lastRecv = utils.ms();
if (this.bip151
@ -957,29 +964,29 @@ Peer.prototype._onPacket = function onPacket(packet) {
switch (packet.type) {
case packetTypes.VERSION:
return this._handleVersion(packet);
return yield this._handleVersion(packet);
case packetTypes.VERACK:
return this._handleVerack(packet);
case packetTypes.PING:
return this._handlePing(packet);
return yield this._handlePing(packet);
case packetTypes.PONG:
return this._handlePong(packet);
case packetTypes.ALERT:
return this._handleAlert(packet);
case packetTypes.GETADDR:
return this._handleGetAddr(packet);
return yield this._handleGetAddr(packet);
case packetTypes.ADDR:
return this._handleAddr(packet);
case packetTypes.INV:
return this._handleInv(packet);
case packetTypes.GETDATA:
return this._handleGetData(packet);
return yield this._handleGetData(packet);
case packetTypes.NOTFOUND:
return this._handleNotFound(packet);
case packetTypes.GETBLOCKS:
return this._handleGetBlocks(packet);
return yield this._handleGetBlocks(packet);
case packetTypes.GETHEADERS:
return this._handleGetHeaders(packet);
return yield this._handleGetHeaders(packet);
case packetTypes.HEADERS:
return this._handleHeaders(packet);
case packetTypes.SENDHEADERS:
@ -991,7 +998,7 @@ Peer.prototype._onPacket = function onPacket(packet) {
case packetTypes.REJECT:
return this._handleReject(packet);
case packetTypes.MEMPOOL:
return this._handleMempool(packet);
return yield this._handleMempool(packet);
case packetTypes.FILTERLOAD:
return this._handleFilterLoad(packet);
case packetTypes.FILTERADD:
@ -1001,7 +1008,7 @@ Peer.prototype._onPacket = function onPacket(packet) {
case packetTypes.MERKLEBLOCK:
return this._handleMerkleBlock(packet);
case packetTypes.GETUTXOS:
return this._handleGetUTXOs(packet);
return yield this._handleGetUTXOs(packet);
case packetTypes.UTXOS:
return this._handleUTXOs(packet);
case packetTypes.HAVEWITNESS:
@ -1011,28 +1018,28 @@ Peer.prototype._onPacket = function onPacket(packet) {
case packetTypes.SENDCMPCT:
return this._handleSendCmpct(packet);
case packetTypes.CMPCTBLOCK:
return this._handleCmpctBlock(packet);
return yield this._handleCmpctBlock(packet);
case packetTypes.GETBLOCKTXN:
return this._handleGetBlockTxn(packet);
return yield this._handleGetBlockTxn(packet);
case packetTypes.BLOCKTXN:
return this._handleBlockTxn(packet);
case packetTypes.ENCINIT:
return this._handleEncinit(packet);
return yield this._handleEncinit(packet);
case packetTypes.ENCACK:
return this._handleEncack(packet);
case packetTypes.AUTHCHALLENGE:
return this._handleAuthChallenge(packet);
return yield this._handleAuthChallenge(packet);
case packetTypes.AUTHREPLY:
return this._handleAuthReply(packet);
return yield this._handleAuthReply(packet);
case packetTypes.AUTHPROPOSE:
return this._handleAuthPropose(packet);
return yield this._handleAuthPropose(packet);
case packetTypes.UNKNOWN:
return this._handleUnknown(packet);
default:
assert(false, 'Bad packet type.');
break;
}
};
});
/**
* Flush merkle block once all matched
@ -1162,23 +1169,6 @@ Peer.prototype._handleUTXOs = function _handleUTXOs(utxos) {
*/
Peer.prototype._handleGetUTXOs = co(function* _handleGetUTXOs(packet) {
var unlock = yield this.locker.lock();
try {
return yield this.__handleGetUTXOs(packet);
} catch (e) {
this.emit('error', e);
} finally {
unlock();
}
});
/**
* Handle `getheaders` packet without lock.
* @private
* @param {GetHeadersPacket}
*/
Peer.prototype.__handleGetUTXOs = co(function* _handleGetUTXOs(packet) {
var i, utxos, prevout, hash, index, coin;
if (!this.chain.synced)
@ -1229,7 +1219,7 @@ Peer.prototype.__handleGetUTXOs = co(function* _handleGetUTXOs(packet) {
utxos.height = this.chain.height;
utxos.tip = this.chain.tip.hash;
this.send(utxos);
yield this.send(utxos);
});
/**
@ -1250,23 +1240,6 @@ Peer.prototype._handleHaveWitness = function _handleHaveWitness(packet) {
*/
Peer.prototype._handleGetHeaders = co(function* _handleGetHeaders(packet) {
var unlock = yield this.locker.lock();
try {
return yield this.__handleGetHeaders(packet);
} catch (e) {
this.emit('error', e);
} finally {
unlock();
}
});
/**
* Handle `getheaders` packet without lock.
* @private
* @param {GetHeadersPacket}
*/
Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) {
var headers = [];
var hash, entry;
@ -1305,7 +1278,7 @@ Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) {
entry = yield entry.getNext();
}
this.sendHeaders(headers);
yield this.sendHeaders(headers);
});
/**
@ -1315,23 +1288,6 @@ Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) {
*/
Peer.prototype._handleGetBlocks = co(function* _handleGetBlocks(packet) {
var unlock = yield this.locker.lock();
try {
return yield this.__handleGetBlocks(packet);
} catch (e) {
this.emit('error', e);
} finally {
unlock();
}
});
/**
* Handle `getblocks` packet without lock.
* @private
* @param {GetBlocksPacket}
*/
Peer.prototype.__handleGetBlocks = co(function* _handleGetBlocks(packet) {
var blocks = [];
var hash;
@ -1366,7 +1322,7 @@ Peer.prototype.__handleGetBlocks = co(function* _handleGetBlocks(packet) {
hash = yield this.chain.db.getNextHash(hash);
}
this.sendInv(blocks);
yield this.sendInv(blocks);
});
/**
@ -1439,8 +1395,9 @@ Peer.prototype._handleVersion = co(function* _handleVersion(version) {
this.relay = version.relay;
this.version = version;
this.send(new packets.VerackPacket());
this.fire('version', version);
yield this.send(new packets.VerackPacket());
});
/**
@ -1479,7 +1436,7 @@ Peer.prototype._handleMempool = function _handleMempool(packet) {
this.logger.debug('Sending mempool snapshot (%s).', this.hostname);
this.sendInv(items);
return this.sendInv(items);
};
/**
@ -1539,23 +1496,6 @@ Peer.prototype._getItem = co(function* _getItem(item) {
*/
Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
var unlock = yield this.locker.lock();
try {
return yield this.__handleGetData(packet);
} catch (e) {
this.emit('error', e);
} finally {
unlock();
}
});
/**
* Handle `getdata` packet without lock.
* @private
* @param {GetDataPacket}
*/
Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
var notFound = [];
var items = packet.items;
var i, j, item, entry, tx, block;
@ -1587,7 +1527,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
continue;
}
yield this.flush(new packets.TXPacket(tx, item.hasWitness()));
yield this.send(new packets.TXPacket(tx, item.hasWitness()));
continue;
}
@ -1597,7 +1537,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
switch (item.type) {
case constants.inv.BLOCK:
case constants.inv.WITNESS_BLOCK:
yield this.flush(new packets.BlockPacket(block, item.hasWitness()));
yield this.send(new packets.BlockPacket(block, item.hasWitness()));
break;
case constants.inv.FILTERED_BLOCK:
case constants.inv.WITNESS_FILTERED_BLOCK:
@ -1608,18 +1548,18 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
block = block.toMerkle(this.spvFilter);
yield this.flush(new packets.MerkleBlockPacket(block));
yield this.send(new packets.MerkleBlockPacket(block));
for (j = 0; j < block.txs.length; j++) {
tx = block.txs[j];
yield this.flush(new packets.TXPacket(tx, item.hasWitness()));
yield this.send(new packets.TXPacket(tx, item.hasWitness()));
}
break;
case constants.inv.CMPCT_BLOCK:
// Fallback to full block.
if (block.height < this.chain.tip.height - 10) {
yield this.flush(new packets.BlockPacket(block, false));
yield this.send(new packets.BlockPacket(block, false));
break;
}
@ -1634,7 +1574,8 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
break;
}
yield this.flush(new packets.CmpctBlockPacket(block, false));
yield this.send(new packets.CmpctBlockPacket(block, false));
break;
default:
this.logger.warning(
@ -1646,7 +1587,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
}
if (item.hash === this.hashContinue) {
this.sendInv(new InvItem(constants.inv.BLOCK, this.chain.tip.hash));
yield this.sendInv(new InvItem(constants.inv.BLOCK, this.chain.tip.hash));
this.hashContinue = null;
}
}
@ -1658,7 +1599,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) {
this.hostname);
if (notFound.length > 0)
this.send(new packets.NotFoundPacket(notFound));
yield this.send(new packets.NotFoundPacket(notFound));
});
/**
@ -1700,11 +1641,11 @@ Peer.prototype._handleAddr = function _handleAddr(packet) {
* @param {PingPacket}
*/
Peer.prototype._handlePing = function _handlePing(packet) {
if (packet.nonce)
this.send(new packets.PongPacket(packet.nonce));
Peer.prototype._handlePing = co(function* _handlePing(packet) {
this.fire('ping', this.minPing);
};
if (packet.nonce)
yield this.send(new packets.PongPacket(packet.nonce));
});
/**
* Handle `pong` packet.
@ -1751,7 +1692,7 @@ Peer.prototype._handlePong = function _handlePong(packet) {
* @param {GetAddrPacket}
*/
Peer.prototype._handleGetAddr = function _handleGetAddr(packet) {
Peer.prototype._handleGetAddr = co(function* _handleGetAddr(packet) {
var items = [];
var i, addr;
@ -1788,8 +1729,8 @@ Peer.prototype._handleGetAddr = function _handleGetAddr(packet) {
items.length,
this.hostname);
this.send(new packets.AddrPacket(items));
};
yield this.send(new packets.AddrPacket(items));
});
/**
* Handle `inv` packet.
@ -1939,21 +1880,16 @@ Peer.prototype._handleAlert = function _handleAlert(alert) {
* @param {EncinitPacket}
*/
Peer.prototype._handleEncinit = function _handleEncinit(packet) {
Peer.prototype._handleEncinit = co(function* _handleEncinit(packet) {
if (!this.bip151)
return;
try {
this.bip151.encinit(packet.publicKey, packet.cipher);
} catch (e) {
this.error(e);
return;
}
this.send(this.bip151.toEncack());
this.bip151.encinit(packet.publicKey, packet.cipher);
this.fire('encinit', packet);
};
yield this.send(this.bip151.toEncack());
});
/**
* Handle `encack` packet.
@ -1961,19 +1897,14 @@ Peer.prototype._handleEncinit = function _handleEncinit(packet) {
* @param {EncackPacket}
*/
Peer.prototype._handleEncack = function _handleEncack(packet) {
Peer.prototype._handleEncack = co(function* _handleEncack(packet) {
if (!this.bip151)
return;
try {
this.bip151.encack(packet.publicKey);
} catch (e) {
this.error(e);
return;
}
this.bip151.encack(packet.publicKey);
this.fire('encack', packet);
};
});
/**
* Handle `authchallenge` packet.
@ -1981,23 +1912,18 @@ Peer.prototype._handleEncack = function _handleEncack(packet) {
* @param {AuthChallengePacket}
*/
Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(packet) {
Peer.prototype._handleAuthChallenge = co(function* _handleAuthChallenge(packet) {
var sig;
if (!this.bip150)
return;
try {
sig = this.bip150.challenge(packet.hash);
} catch (e) {
this.error(e);
return;
}
this.send(new packets.AuthReplyPacket(sig));
sig = this.bip150.challenge(packet.hash);
this.fire('authchallenge', packet.hash);
};
yield this.send(new packets.AuthReplyPacket(sig));
});
/**
* Handle `authreply` packet.
@ -2005,24 +1931,19 @@ Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(packet) {
* @param {AuthReplyPacket}
*/
Peer.prototype._handleAuthReply = function _handleAuthReply(packet) {
Peer.prototype._handleAuthReply = co(function* _handleAuthReply(packet) {
var hash;
if (!this.bip150)
return;
try {
hash = this.bip150.reply(packet.signature);
} catch (e) {
this.error(e);
return;
}
hash = this.bip150.reply(packet.signature);
if (hash)
this.send(new packets.AuthProposePacket(hash));
yield this.send(new packets.AuthProposePacket(hash));
this.fire('authreply', packet.signature);
};
});
/**
* Handle `authpropose` packet.
@ -2030,23 +1951,18 @@ Peer.prototype._handleAuthReply = function _handleAuthReply(packet) {
* @param {AuthProposePacket}
*/
Peer.prototype._handleAuthPropose = function _handleAuthPropose(packet) {
Peer.prototype._handleAuthPropose = co(function* _handleAuthPropose(packet) {
var hash;
if (!this.bip150)
return;
try {
hash = this.bip150.propose(packet.hash);
} catch (e) {
this.error(e);
return;
}
hash = this.bip150.propose(packet.hash);
this.send(new packets.AuthChallengePacket(hash));
yield this.send(new packets.AuthChallengePacket(hash));
this.fire('authpropose', packet.hash);
};
});
/**
* Handle an unknown packet.
@ -2127,7 +2043,7 @@ Peer.prototype._handleCmpctBlock = co(function* _handleCmpctBlock(packet) {
return;
}
this.send(new packets.GetBlockTxnPacket(block.toRequest()));
yield this.send(new packets.GetBlockTxnPacket(block.toRequest()));
this.logger.debug(
'Received semi-full compact block %s (%s).',
@ -2184,7 +2100,7 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) {
res = bcoin.bip152.TXResponse.fromBlock(block, req);
yield this.flush(new packets.BlockTxnPacket(res, false));
yield this.send(new packets.BlockTxnPacket(res, false));
this.fire('blocktxn', req);
});
@ -2229,9 +2145,9 @@ Peer.prototype._handleBlockTxn = function _handleBlockTxn(packet) {
Peer.prototype.sendAlert = function sendAlert(alert) {
if (!this.invFilter.added(alert.hash()))
return;
return Promise.resolve(null);
this.send(alert);
return this.send(alert);
};
/**
@ -2260,7 +2176,7 @@ Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) {
this.logger.debug('Height: %d, Hash: %s, Stop: %s', height, hash, stop);
this.send(packet);
return this.send(packet);
};
/**
@ -2288,7 +2204,7 @@ Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) {
this.logger.debug('Height: %d, Hash: %s, Stop: %s', height, hash, stop);
this.send(packet);
return this.send(packet);
};
/**
@ -2303,14 +2219,14 @@ Peer.prototype.sendMempool = function sendMempool() {
this.logger.debug(
'Cannot request mempool for non-bloom peer (%s).',
this.hostname);
return;
return Promise.resolve(null);
}
this.logger.debug(
'Requesting inv packet from peer with mempool (%s).',
this.hostname);
this.send(new packets.MempoolPacket());
return this.send(new packets.MempoolPacket());
};
/**
@ -2335,7 +2251,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) {
'Sending reject packet to peer (%s).',
this.hostname);
this.send(reject);
return this.send(reject);
};
/**
@ -2344,7 +2260,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) {
Peer.prototype.sendCompact = function sendCompact() {
this.logger.info('Initializing compact blocks (%s).', this.hostname);
this.send(new packets.SendCmpctPacket(0, 1));
return this.send(new packets.SendCmpctPacket(0, 1));
};
/**
@ -2392,9 +2308,10 @@ Peer.prototype.ignore = function ignore() {
*/
Peer.prototype.reject = function reject(obj, code, reason, score) {
this.sendReject(code, reason, obj);
var promise = this.sendReject(code, reason, obj);
if (score > 0)
this.setMisbehavior(score);
return promise;
};
/**
@ -2419,7 +2336,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) {
return;
}
this.sendGetBlocks(locator, root);
yield this.sendGetBlocks(locator, root);
});
/**
@ -2431,7 +2348,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) {
Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) {
var locator = yield this.chain.getLocator(tip);
this.sendGetHeaders(locator, stop);
return this.sendGetHeaders(locator, stop);
});
/**
@ -2443,7 +2360,7 @@ Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) {
Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) {
var locator = yield this.chain.getLocator(tip);
this.sendGetBlocks(locator, stop);
return this.sendGetBlocks(locator, stop);
});
/**
@ -2486,7 +2403,7 @@ Peer.prototype.sync = function sync() {
return this.getHeaders(tip);
}
this.getBlocks();
return this.getBlocks();
};
/**