net: always pause socket when handling a packet.

This commit is contained in:
Christopher Jeffrey 2016-09-29 21:51:07 -07:00
parent 9aaf5ea2a0
commit 37de8bf2a7
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
6 changed files with 183 additions and 69 deletions

View File

@ -1480,6 +1480,25 @@ ChainDB.prototype.getBlock = co(function* getBlock(hash) {
return block;
});
/**
* Retrieve a block from the database (not filled with coins).
* @param {Hash} hash
* @returns {Promise} - Returns {@link Block}.
*/
ChainDB.prototype.getRawBlock = co(function* getRawBlock(hash) {
var items = yield this.getBoth(hash);
var height;
if (!items)
return;
hash = items[0];
height = items[1];
return yield this.db.get(layout.b(hash));
});
/**
* Check whether coins are still unspent. Necessary for bip30.
* @see https://bitcointalk.org/index.php?topic=67738.0

View File

@ -775,7 +775,7 @@ Peer.prototype.write = function write(data) {
*/
Peer.prototype.send = function send(packet) {
var tx, checksum, payload;
var tx, checksum;
// Used cached hashes as the
// packet checksum for speed.
@ -789,8 +789,17 @@ Peer.prototype.send = function send(packet) {
}
}
payload = this.framer.packet(packet.cmd, packet.toRaw(), checksum);
return this.sendRaw(packet.cmd, packet.toRaw(), checksum);
};
/**
* Send a packet.
* @param {Packet} packet
* @returns {Promise}
*/
Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) {
var payload = this.framer.packet(cmd, body, checksum);
return this.write(payload);
};
@ -1334,39 +1343,34 @@ Peer.prototype._handleGetBlocks = co(function* _handleGetBlocks(packet) {
Peer.prototype._handleVersion = co(function* _handleVersion(version) {
if (!this.network.selfConnect) {
if (version.nonce.cmp(this.pool.localNonce) === 0) {
this.error('We connected to ourself. Oops.');
this.ignore();
return;
throw new Error('We connected to ourself. Oops.');
}
}
if (version.version < constants.MIN_VERSION) {
this.error('Peer does not support required protocol version.');
this.ignore();
return;
throw new Error('Peer does not support required protocol version.');
}
if (this.outbound) {
if (!version.hasNetwork()) {
this.error('Peer does not support network services.');
this.ignore();
return;
throw new Error('Peer does not support network services.');
}
}
if (this.options.headers) {
if (!version.hasHeaders()) {
this.error('Peer does not support getheaders.');
this.ignore();
return;
throw new Error('Peer does not support getheaders.');
}
}
if (this.options.spv) {
if (!version.hasBloom()) {
this.error('Peer does not support BIP37.');
this.ignore();
return;
throw new Error('Peer does not support BIP37.');
}
}
@ -1375,17 +1379,15 @@ Peer.prototype._handleVersion = co(function* _handleVersion(version) {
if (!this.haveWitness) {
if (!this.network.oldWitness) {
this.error('Peer does not support segregated witness.');
this.ignore();
return;
throw new Error('Peer does not support segregated witness.');
}
try {
yield this.request('havewitness');
} catch (err) {
this.error('Peer does not support segregated witness.');
this.ignore();
return;
throw new Error('Peer does not support segregated witness.');
}
this.haveWitness = true;
@ -1446,31 +1448,48 @@ Peer.prototype._handleMempool = function _handleMempool(packet) {
* [Error, {@link Block}|{@link MempoolEntry}].
*/
Peer.prototype._getItem = co(function* _getItem(item) {
Peer.prototype._getBroadcasted = function _getBroadcasted(item) {
var entry = this.pool.invMap[item.hash];
if (entry) {
this.logger.debug(
'Peer requested %s %s as a %s packet (%s).',
entry.type === constants.inv.TX ? 'tx' : 'block',
utils.revHex(entry.hash),
item.hasWitness() ? 'witness' : 'normal',
this.hostname);
if (!entry)
return;
entry.ack(this);
this.logger.debug(
'Peer requested %s %s as a %s packet (%s).',
entry.type === constants.inv.TX ? 'tx' : 'block',
utils.revHex(entry.hash),
item.hasWitness() ? 'witness' : 'normal',
this.hostname);
if (entry.msg) {
if (item.isTX()) {
if (entry.type === constants.inv.TX)
return entry.msg;
} else {
if (entry.type === constants.inv.BLOCK)
return entry.msg;
}
entry.ack(this);
if (!entry.msg)
return;
if (item.isTX()) {
if (entry.type !== constants.inv.TX)
return;
} else {
if (entry.type !== constants.inv.BLOCK)
return;
}
}
return entry.msg;
};
/**
* Get a block/tx either from the broadcast map, mempool, or blockchain.
* @param {InvItem} item
* @returns {Promise}
* [Error, {@link Block}|{@link MempoolEntry}].
*/
Peer.prototype._getItem = co(function* _getItem(item) {
var entry = this._getBroadcasted(item);
if (entry)
return entry;
if (this.options.selfish)
return;
@ -1486,7 +1505,47 @@ Peer.prototype._getItem = co(function* _getItem(item) {
if (this.chain.db.options.prune)
return;
return yield this.chain.db.getBlock(item.hash);
return yield this.chain.db.getRawBlock(item.hash);
});
/**
* Get a block/tx either from the broadcast map, mempool, or blockchain.
* @param {InvItem} item
* @returns {Promise}
* [Error, {@link Block}|{@link MempoolEntry}].
*/
Peer.prototype._sendBlock = co(function* _sendBlock(item) {
var block = this._getBroadcasted(item);
if (block) {
yield this.send(new packets.BlockPacket(block, item.hasWitness()));
return true;
}
if (this.options.selfish
|| this.chain.db.options.spv
|| this.chain.db.options.prune) {
return false;
}
if (item.hasWitness() === !!this.options.witness) {
block = yield this.chain.db.getRawBlock(item.hash);
if (!block)
return false;
yield this.sendRaw('block', block);
} else {
block = yield this.chain.db.getBlock(item.hash);
if (!block)
return false;
yield this.send(new packets.BlockPacket(block, item.hasWitness()));
}
return true;
});
/**
@ -1498,24 +1557,21 @@ Peer.prototype._getItem = co(function* _getItem(item) {
Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
var notFound = [];
var items = packet.items;
var i, j, item, entry, tx, block;
var i, j, item, tx, block, result;
if (items.length > 50000) {
this.error('getdata size too large (%s).', items.length);
return;
}
if (items.length > 50000)
throw new Error('getdata size too large (' + items.length + ').');
for (i = 0; i < items.length; i++) {
item = items[i];
entry = yield this._getItem(item);
if (!entry) {
notFound.push(item);
continue;
}
if (item.isTX()) {
tx = entry;
tx = yield this._getItem(item);
if (!tx) {
notFound.push(item);
continue;
}
// Coinbases are an insta-ban from any node.
// This should technically never happen, but
@ -1532,12 +1588,14 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
continue;
}
block = entry;
switch (item.type) {
case constants.inv.BLOCK:
case constants.inv.WITNESS_BLOCK:
yield this.send(new packets.BlockPacket(block, item.hasWitness()));
result = yield this._sendBlock(item);
if (!result) {
notFound.push(item);
continue;
}
break;
case constants.inv.FILTERED_BLOCK:
case constants.inv.WITNESS_FILTERED_BLOCK:
@ -1546,6 +1604,13 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
continue;
}
block = yield this._getItem(item);
if (!block) {
notFound.push(item);
continue;
}
block = block.toMerkle(this.spvFilter);
yield this.send(new packets.MerkleBlockPacket(block));
@ -1559,10 +1624,21 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) {
case constants.inv.CMPCT_BLOCK:
// Fallback to full block.
if (block.height < this.chain.tip.height - 10) {
yield this.send(new packets.BlockPacket(block, false));
result = yield this._sendBlock(item);
if (!result) {
notFound.push(item);
continue;
}
break;
}
block = yield this._getItem(item);
if (!block) {
notFound.push(item);
continue;
}
// Try again with a new nonce
// if we get a siphash collision.
for (;;) {
@ -2213,7 +2289,7 @@ Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) {
Peer.prototype.sendMempool = function sendMempool() {
if (!this.version)
return;
return Promise.resolve(null);
if (!this.version.hasBloom()) {
this.logger.debug(
@ -2372,20 +2448,20 @@ Peer.prototype.sync = function sync() {
var tip;
if (!this.pool.syncing)
return;
return Promise.resolve(null);
if (!this.ack)
return;
return Promise.resolve(null);
if (this.syncSent)
return;
return Promise.resolve(null);
if (!this.version.hasNetwork())
return;
return Promise.resolve(null);
if (!this.isLoader()) {
if (!this.chain.synced)
return;
return Promise.resolve(null);
}
// Ask for the mempool if we're synced.

View File

@ -18,6 +18,8 @@ function ProxySocket(uri) {
this.socket = new IOClient(uri, { reconnection: false });
this.sendBuffer = [];
this.recvBuffer = [];
this.paused = false;
this.snonce = null;
this.bytesWritten = 0;
this.bytesRead = 0;
@ -60,6 +62,10 @@ ProxySocket.prototype._init = function _init() {
this.socket.on('tcp data', function(data) {
data = new Buffer(data, 'hex');
if (self.paused) {
self.recvBuffer.push(data);
return;
}
self.bytesRead += data.length;
self.emit('data', data);
});
@ -149,6 +155,27 @@ ProxySocket.prototype.write = function write(data, callback) {
return true;
};
ProxySocket.prototype.pause = function pause() {
this.paused = true;
this.socket.emit('tcp pause');
};
ProxySocket.prototype.resume = function resume() {
var recv = this.recvBuffer;
var i, data;
this.paused = false;
this.recvBuffer = [];
for (i = 0; i < recv.length; i++) {
data = recv[i];
this.bytesRead += data.length;
this.emit('data', data);
}
this.socket.emit('tcp resume');
};
ProxySocket.prototype.destroy = function destroy() {
if (this.closed)
return;

View File

@ -86,7 +86,7 @@ Locker.prototype.lock = function lock(arg1, arg2) {
if (force) {
assert(this.busy);
return new Promise(nop);
return Promise.resolve(null);
}
if (this.busy) {
@ -220,7 +220,7 @@ MappedLock.prototype.lock = function lock(key, force) {
if (force || key == null) {
assert(key == null || this.busy[key]);
return new Promise(nop);
return Promise.resolve(null);
}
if (this.busy[key]) {
@ -268,14 +268,6 @@ MappedLock.prototype.unlock = function unlock(key) {
};
};
/*
* Helpers
*/
function nop(resolve, reject) {
resolve(utils.nop);
}
/*
* Expose
*/

View File

@ -324,4 +324,4 @@ exports.call = call;
exports.promisify = promisify;
exports.every = every;
module.exports = spawn;
module.exports = exports;

View File

@ -1042,7 +1042,7 @@ Wallet.prototype.send = co(function* send(options) {
Wallet.prototype._send = co(function* send(options) {
var tx = yield this.createTX(options, true);
yield this.sign(tx);
yield this.sign(tx, options);
if (!tx.isSigned())
throw new Error('TX could not be fully signed.');