net: better locks.

This commit is contained in:
Christopher Jeffrey 2017-01-19 01:18:57 -08:00
parent 05353e0e1a
commit 9c352c5d2b
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 224 additions and 301 deletions

View File

@ -1117,25 +1117,6 @@ Chain.prototype._resetTime = co(function* resetTime(ts) {
yield this._reset(entry.height, false);
});
/**
* Wait for the chain to drain (finish processing
* all of the blocks in its queue).
* @returns {Promise}
*/
Chain.prototype.onDrain = function onDrain() {
return this.locker.wait();
};
/**
* Test whether the chain is in the process of adding blocks.
* @returns {Boolean}
*/
Chain.prototype.isBusy = function isBusy() {
return this.locker.isBusy();
};
/**
* Add a block to the chain, perform all necessary verification.
* @param {Block} block

View File

@ -136,9 +136,8 @@ function Peer(pool) {
this.invFilter = new Bloom.Rolling(50000, 0.000001);
this.requestMap = new Map();
this.blockQueue = [];
this.compactBlocks = new Map();
this.responseMap = new Map();
this.compactBlocks = new Map();
if (this.options.bip151) {
this.bip151 = new BIP151();
@ -263,7 +262,7 @@ Peer.prototype._init = function init() {
this.parser.on('packet', co(function* (packet) {
try {
yield self.handlePacket(packet);
yield self.readPacket(packet);
} catch (e) {
self.error(e);
self.destroy();
@ -1402,28 +1401,14 @@ Peer.prototype.getTX = function getTX(hashes) {
* @param {Packet} packet
*/
Peer.prototype.handlePacket = co(function* handlePacket(packet) {
var unlock;
// We stop reads and lock the peer for any
// packet with significant IO/asynchronocity.
switch (packet.type) {
case packetTypes.GETDATA:
case packetTypes.GETBLOCKS:
case packetTypes.GETHEADERS:
case packetTypes.GETUTXOS:
case packetTypes.GETBLOCKTXN:
unlock = yield this.locker.lock();
try {
this.socket.pause();
return yield this.onPacket(packet);
} finally {
this.socket.resume();
unlock();
}
break;
default:
return yield this.onPacket(packet);
Peer.prototype.readPacket = co(function* readPacket(packet) {
var unlock = yield this.locker.lock();
try {
this.socket.pause();
return yield this.handlePacket(packet);
} finally {
this.socket.resume();
unlock();
}
});
@ -1433,7 +1418,7 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) {
* @param {Packet} packet
*/
Peer.prototype.onPacket = co(function* onPacket(packet) {
Peer.prototype.handlePacket = co(function* handlePacket(packet) {
var entry;
if (this.destroyed)
@ -1573,19 +1558,78 @@ Peer.prototype.onPacket = co(function* onPacket(packet) {
entry.resolve(packet);
});
/**
* Emit a block.
* @param {Block} block
* @returns {Promise}
*/
Peer.prototype.emitBlock = co(function* emitBlock(block) {
this.emit('block', block);
yield this.pool.handleBlock(this, block);
});
/**
* Emit a block inv.
* @param {Hash[]} blocks
* @returns {Promise}
*/
Peer.prototype.emitBlockInv = co(function* emitBlockInv(blocks) {
this.emit('blocks', blocks);
yield this.pool.handleBlockInv(this, blocks);
});
/**
* Emit headers.
* @param {Header[]} headers
* @returns {Promise}
*/
Peer.prototype.emitHeaders = co(function* emitHeaders(headers) {
this.emit('headers', headers);
yield this.pool.handleHeaders(this, headers);
});
/**
* Flush merkle block once all matched
* txs have been received.
* @private
* @returns {Promise}
*/
Peer.prototype.flushMerkle = function flushMerkle() {
Peer.prototype.flushMerkle = co(function* flushMerkle() {
assert(this.lastMerkle);
this.lastBlock = util.ms();
this.emit('merkleblock', this.lastMerkle);
yield this.emitBlock(this.lastMerkle);
this.lastMerkle = null;
this.waitingTX = 0;
};
});
/**
* Emit a transaction.
* @param {TX} tx
* @returns {Promise}
*/
Peer.prototype.emitTX = co(function* emitTX(tx) {
this.emit('tx', tx);
yield this.pool.handleTX(this, tx);
});
/**
* Emit transaction inv.
* @param {Hash[]} txs
* @returns {Promise}
*/
Peer.prototype.emitTXInv = co(function* emitTXInv(txs) {
this.emit('txs', txs);
yield this.pool.handleTXInv(this, txs);
});
/**
* Handle `filterload` packet.
@ -1675,7 +1719,7 @@ Peer.prototype.handleMerkleBlock = co(function* handleMerkleBlock(packet) {
this.waitingTX = block.matches.length;
if (this.waitingTX === 0)
this.flushMerkle();
yield this.flushMerkle();
});
/**
@ -2400,10 +2444,10 @@ Peer.prototype.handleInv = co(function* handleInv(packet) {
this.emit('inv', items);
if (blocks.length > 0)
this.emit('blocks', blocks);
yield this.emitBlockInv(blocks);
if (txs.length > 0)
this.emit('txs', txs);
yield this.emitTXInv(txs);
this.logger.debug(
'Received inv packet with %d items: blocks=%d txs=%d (%s).',
@ -2434,7 +2478,7 @@ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) {
return;
}
this.emit('headers', headers);
yield this.emitHeaders(headers);
});
/**
@ -2464,7 +2508,7 @@ Peer.prototype.handleBlock = co(function* handleBlock(packet) {
this.lastBlock = util.ms();
this.emit('block', packet.block);
yield this.emitBlock(packet.block);
});
/**
@ -2481,12 +2525,12 @@ Peer.prototype.handleTX = co(function* handleTX(packet) {
if (this.lastMerkle.hasTX(tx)) {
this.lastMerkle.addTX(tx);
if (--this.waitingTX === 0)
this.flushMerkle();
yield this.flushMerkle();
return;
}
}
this.emit('tx', tx);
yield this.emitTX(tx);
});
/**
@ -2700,10 +2744,10 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) {
if (result) {
this.lastBlock = util.ms();
this.emit('block', block.toBlock());
this.logger.debug(
'Received full compact block %s (%s).',
block.rhash(), this.hostname);
yield this.emitBlock(block.toBlock());
return;
}
@ -2799,8 +2843,9 @@ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) {
this.lastBlock = util.ms();
this.emit('block', block.toBlock());
this.emit('getblocktxn', res);
yield this.emitBlock(block.toBlock());
});
/**

View File

@ -99,7 +99,7 @@ function Pool(options) {
this.chain = this.options.chain;
this.mempool = this.options.mempool;
this.server = this.options.createServer();
this.locker = new Lock();
this.locker = new Lock(true);
this.connected = false;
this.syncing = false;
@ -108,7 +108,6 @@ function Pool(options) {
this.spvFilter = null;
this.txFilter = null;
this.requestMap = new Map();
this.queueMap = new Map();
this.invMap = new Map();
this.scheduled = false;
this.pendingWatch = null;
@ -445,19 +444,11 @@ Pool.prototype.addLoader = function addLoader() {
return;
}
addr = this.getHost(false);
addr = this.getHost();
if (!addr)
return;
peer = this.peers.get(addr.hostname);
if (peer) {
this.logger.info('Repurposing peer for loader (%s).', peer.hostname);
this.setLoader(peer);
return;
}
peer = this.createPeer(addr);
this.logger.info('Setting loader peer (%s).', peer.hostname);
@ -661,56 +652,6 @@ Pool.prototype.bindPeer = function bindPeer(peer) {
self.handleAddr(peer, addrs);
});
peer.on('merkleblock', co(function* (block) {
if (!self.options.spv)
return;
try {
yield self.handleBlock(peer, block);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('block', co(function* (block) {
if (self.options.spv)
return;
try {
yield self.handleBlock(peer, block);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('tx', co(function* (tx) {
try {
yield self.handleTX(peer, tx);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('headers', co(function* (headers) {
try {
yield self.handleHeaders(peer, headers);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('blocks', co(function* (hashes) {
try {
yield self.handleBlockInv(peer, hashes);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('txs', function(hashes) {
self.handleTXInv(peer, hashes);
});
peer.on('reject', function(reject) {
self.handleReject(peer, reject);
});
@ -840,6 +781,24 @@ Pool.prototype.handleAddr = function handleAddr(peer, addrs) {
*/
Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
var hash = block.hash('hex');
var unlock = yield this.locker.lock(hash);
try {
return yield this._handleBlock(peer, block);
} finally {
unlock();
}
});
/**
* Handle `block` packet. Attempt to add to chain (without a lock).
* @private
* @param {Peer} peer
* @param {MemBlock|MerkleBlock} block
* @returns {Promise}
*/
Pool.prototype._handleBlock = co(function* handleBlock(peer, block) {
var hash = block.hash('hex');
var requested;
@ -861,10 +820,8 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
try {
yield this.chain.add(block);
} catch (err) {
if (err.type !== 'VerifyError') {
this.scheduleRequests(peer);
if (err.type !== 'VerifyError')
throw err;
}
if (err.reason === 'bad-prevblk') {
if (this.options.headers) {
@ -873,24 +830,19 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
}
this.logger.debug('Peer sent an orphan block. Resolving.');
yield peer.resolveOrphan(null, hash);
this.scheduleRequests(peer);
throw err;
}
peer.reject(block, err.code, err.reason, err.score);
this.scheduleRequests(peer);
throw err;
}
this.scheduleRequests(peer);
if (this.logger.level >= 4 && this.chain.total % 20 === 0) {
this.logger.debug('Status:'
+ ' ts=%s height=%d progress=%s'
+ ' blocks=%d orphans=%d active=%d'
+ ' queue=%d target=%s peers=%d'
+ ' target=%s peers=%d'
+ ' pending=%d jobs=%d',
util.date(block.ts),
this.chain.height,
@ -898,11 +850,10 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
this.chain.total,
this.chain.orphanCount,
this.requestMap.size,
this.queueMap.size,
block.bits,
this.peers.size(),
this.chain.locker.pending,
this.chain.locker.jobs.length);
this.locker.pending,
this.locker.jobs.length);
}
if (this.chain.total % 2000 === 0) {
@ -922,6 +873,24 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) {
*/
Pool.prototype.handleTX = co(function* handleTX(peer, tx) {
var hash = tx.hash('hex');
var unlock = yield this.locker.lock(hash);
try {
return yield this._handleTX(peer, tx);
} finally {
unlock();
}
});
/**
* Handle a transaction. Attempt to add to mempool (without a lock).
* @private
* @param {Peer} peer
* @param {TX} tx
* @returns {Promise}
*/
Pool.prototype._handleTX = co(function* handleTX(peer, tx) {
var hash = tx.hash('hex');
var requested = this.fulfill(peer, hash);
var missing;
@ -937,7 +906,6 @@ Pool.prototype.handleTX = co(function* handleTX(peer, tx) {
if (!requested)
this.txFilter.add(tx.hash());
this.emit('tx', tx, peer);
this.scheduleRequests(peer);
return;
}
@ -963,15 +931,9 @@ Pool.prototype.handleTX = co(function* handleTX(peer, tx) {
'Requesting %d missing transactions (%s).',
missing.length, peer.hostname);
try {
this.getTX(peer, missing);
} catch (e) {
this.emit('error', e);
}
this.getTX(peer, missing);
}
this.scheduleRequests(peer);
this.emit('tx', tx, peer);
});
@ -1002,7 +964,9 @@ Pool.prototype.handleHeaders = co(function* handleHeaders(peer, headers) {
*/
Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) {
var i, ret, header, hash, last;
var items = [];
var ret = new VerifyResult();
var i, header, hash, last;
if (!this.options.headers)
return;
@ -1010,8 +974,6 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) {
if (!this.syncing)
return;
ret = new VerifyResult();
this.logger.debug(
'Received %s headers from peer (%s).',
headers.length,
@ -1019,6 +981,9 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) {
this.emit('headers', headers);
if (headers.length === 0)
return;
for (i = 0; i < headers.length; i++) {
header = headers[i];
hash = header.hash('hex');
@ -1035,14 +1000,14 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) {
last = hash;
if (yield this.chain.has(hash))
if (yield this.hasBlock(hash))
continue;
this.getBlock(peer, hash);
items.push(hash);
}
// Schedule the getdata's we just added.
this.scheduleRequests(peer);
// Request the hashes we just added.
this.getBlock(peer, items);
// Restart the getheaders process
// Technically `last` is not indexed yet so
@ -1051,7 +1016,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) {
// that much since FindForkInGlobalIndex
// simply tries to find the latest block in
// the peer's chain.
if (last && headers.length === 2000)
if (headers.length === 2000)
yield peer.getHeaders(last);
});
@ -1082,6 +1047,7 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) {
*/
Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) {
var items = [];
var i, hash;
if (!this.syncing)
@ -1101,8 +1067,6 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) {
yield peer.getHeaders(null, hash);
}
this.scheduleRequests(peer);
return;
}
@ -1118,20 +1082,14 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) {
// Resolve orphan chain.
if (this.chain.hasOrphan(hash)) {
// There is a possible race condition here.
// The orphan may get resolved by the time
// we create the locator. In that case, we
// should probably actually move to the
// `exists` clause below if it is the last
// hash.
this.logger.debug('Received known orphan hash (%s).', peer.hostname);
yield peer.resolveOrphan(null, hash);
continue;
}
// Request the block if we don't have it.
if (!(yield this.chain.has(hash))) {
this.getBlock(peer, hash);
if (!(yield this.hasBlock(hash))) {
items.push(hash);
continue;
}
@ -1148,7 +1106,7 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) {
}
}
this.scheduleRequests(peer);
this.getBlock(peer, items);
});
/**
@ -1158,14 +1116,42 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) {
* @param {Hash[]} hashes
*/
Pool.prototype.handleTXInv = function handleTXInv(peer, hashes) {
Pool.prototype.handleTXInv = co(function* handleTXInv(peer, hashes) {
var unlock = yield this.locker.lock();
try {
return yield this._handleTXInv(peer, hashes);
} finally {
unlock();
}
});
/**
* Handle peer inv packet (txs).
* @private
* @param {Peer} peer
* @param {Hash[]} hashes
*/
Pool.prototype._handleTXInv = co(function* handleTXInv(peer, hashes) {
var items = [];
var i, hash;
this.emit('txs', hashes, peer);
if (this.syncing && !this.chain.synced)
return;
this.getTX(peer, hashes);
};
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.hasTX(hash))
continue;
items.push(hash);
}
this.getTX(peer, items);
});
/**
* Handle peer reject event.
@ -1290,7 +1276,7 @@ Pool.prototype.addInbound = function addInbound(socket) {
* @returns {NetAddress}
*/
Pool.prototype.getHost = function getHost(unique) {
Pool.prototype.getHost = function getHost() {
var services = this.options.requiredServices;
var now = this.network.now();
var i, entry, addr;
@ -1312,10 +1298,8 @@ Pool.prototype.getHost = function getHost(unique) {
addr = entry.addr;
if (unique) {
if (this.peers.has(addr.hostname))
continue;
}
if (this.peers.has(addr.hostname))
continue;
if (!addr.isValid())
continue;
@ -1355,7 +1339,7 @@ Pool.prototype.addOutbound = function addOutbound() {
if (!this.peers.load)
return;
addr = this.getHost(true);
addr = this.getHost();
if (!addr)
return;
@ -1424,16 +1408,6 @@ Pool.prototype.removePeer = function removePeer(peer) {
hash = hashes[i];
this.fulfill(peer, hash);
}
hashes = peer.blockQueue;
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
assert(this.queueMap.has(hash));
this.queueMap.remove(hash);
}
peer.blockQueue.length = 0;
};
/**
@ -1541,14 +1515,16 @@ Pool.prototype.watchOutpoint = function watchOutpoint(outpoint) {
};
/**
* Queue a `getdata` request to be sent. Checks existence
* in the chain before requesting.
* Queue a `getdata` request to be sent.
* @param {Peer} peer
* @param {Hash} hash - Block hash.
* @param {Hash[]} hashes
* @returns {Promise}
*/
Pool.prototype.getBlock = function getBlock(peer, hash) {
Pool.prototype.getBlock = function getBlock(peer, hashes) {
var items = [];
var i, hash;
if (!this.loaded)
return;
@ -1558,38 +1534,30 @@ Pool.prototype.getBlock = function getBlock(peer, hash) {
if (peer.destroyed)
throw new Error('Peer is destroyed (getdata).');
if (this.requestMap.has(hash))
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.requestMap.has(hash))
continue;
this.requestMap.insert(hash);
peer.requestMap.insert(hash);
items.push(hash);
}
if (items.length === 0)
return;
if (this.queueMap.has(hash))
return;
this.logger.debug(
'Requesting %d/%d blocks from peer with getdata (%s).',
items.length,
this.requestMap.size,
peer.hostname);
this.queueMap.insert(hash);
peer.blockQueue.push(hash);
peer.getBlock(items);
};
/**
* Test whether the chain has or has seen an item.
* @param {Hash} hash
* @returns {Promise} - Returns Boolean.
*/
Pool.prototype.hasBlock = co(function* hasBlock(hash) {
// Check the chain.
if (yield this.chain.has(hash))
return true;
// Check the pending requests.
if (this.requestMap.has(hash))
return true;
// Check the queued requests.
if (this.queueMap.has(hash))
return true;
return false;
});
/**
* Queue a `getdata` request to be sent. Checks existence
* in the mempool before requesting.
@ -1614,11 +1582,9 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.hasTX(hash))
if (this.requestMap.has(hash))
continue;
assert(!this.requestMap.has(hash));
this.requestMap.insert(hash);
peer.requestMap.insert(hash);
@ -1637,6 +1603,24 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
peer.getTX(items);
};
/**
* Test whether the chain has or has seen an item.
* @param {Hash} hash
* @returns {Promise} - Returns Boolean.
*/
Pool.prototype.hasBlock = co(function* hasBlock(hash) {
// Check the lock.
if (this.locker.has(hash))
return true;
// Check the chain.
if (yield this.chain.has(hash))
return true;
return false;
});
/**
* Test whether the mempool has or has seen an item.
* @param {Hash} hash
@ -1644,6 +1628,10 @@ Pool.prototype.getTX = function getTX(peer, hashes) {
*/
Pool.prototype.hasTX = function hasTX(hash) {
// Check the lock queue.
if (this.locker.has(hash))
return true;
if (!this.mempool) {
// Check the TX filter if
// we don't have a mempool.
@ -1661,100 +1649,9 @@ Pool.prototype.hasTX = function hasTX(hash) {
}
}
// Check the pending requests.
if (this.requestMap.has(hash))
return true;
return false;
};
/**
* Schedule next batch of `getdata` requests for peer.
* @param {Peer} peer
* @returns {Promise}
*/
Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) {
if (this.scheduled)
return;
this.scheduled = true;
yield this.chain.onDrain();
this.sendBlockRequests(peer);
this.scheduled = false;
});
/**
* Send scheduled requests in the request queues.
* @private
* @param {Peer} peer
*/
Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) {
var queue = peer.blockQueue;
var hashes = [];
var i, size, hash;
if (peer.destroyed)
return;
if (queue.length === 0)
return;
if (this.options.spv) {
if (this.requestMap.size >= 2000)
return;
size = Math.min(queue.length, 50000);
} else {
size = this.network.getBatchSize(this.chain.height);
if (this.requestMap.size >= size)
return;
}
for (i = 0; i < queue.length; i++) {
hash = queue[i];
if (hashes.length === size)
break;
assert(this.queueMap.has(hash));
this.queueMap.remove(hash);
assert(!this.requestMap.has(hash));
// Do a second check to make sure
// we don't have this in the chain.
// Avoids a potential race condition
// with the `chain.has()` call.
if (this.chain.seen(hash))
continue;
this.requestMap.insert(hash);
peer.requestMap.insert(hash);
hashes.push(hash);
}
peer.blockQueue = queue.slice(i);
if (hashes.length === 0)
return;
this.logger.debug(
'Requesting %d/%d blocks from peer with getdata (%s).',
hashes.length,
this.requestMap.size,
peer.hostname);
peer.getBlock(hashes);
};
/**
* Fulfill a requested item.
* @param {Peer} peer