refactor pool.

This commit is contained in:
Christopher Jeffrey 2016-06-08 03:36:08 -07:00
parent 287d718d51
commit 7791f7b21d
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 199 additions and 185 deletions

View File

@ -96,7 +96,7 @@ function ChainDB(chain, options) {
// Key size: 68b (* 2)
this.coinWindow = ((165 * 1024 + 5000 * 9) + (5000 * 68 * 2)) * 5;
this.coinCache = new bcoin.lru(this.coinWindow);
this.coinCache = new NullCache(this.coinWindow);
this.cacheHash = new bcoin.lru(this.cacheWindow, 1);
this.cacheHeight = new bcoin.lru(this.cacheWindow, 1);

View File

@ -95,6 +95,8 @@ function Peer(pool, options) {
this.invFilter = new bcoin.bloom.rolling(50000, 0.000001);
this.lastBlock = null;
this.waiting = 0;
this.syncSent = false;
this.loader = options.loader || false;
this.challenge = null;
this.lastPong = -1;
@ -178,7 +180,7 @@ Peer.prototype._init = function init() {
});
this.parser.on('error', function(err) {
self.sendReject(null, 'malformed', 'error parsing message', 10);
self.reject(null, 'malformed', 'error parsing message', 10);
self._error(err, true);
});
};
@ -237,6 +239,13 @@ Peer.prototype._onconnect = function _onconnect() {
if (self.pool.feeRate !== -1)
self.setFeeRate(self.pool.feeRate);
// Start syncing the chain.
self.sync();
// Ask for the mempool if we're synced.
if (self.loader && self.pool.synced)
self.sendMempool();
// Finally we can let the pool know
// that this peer is ready to go.
self.emit('ack');
@ -476,8 +485,7 @@ Peer.prototype.updateWatch = function updateWatch() {
if (!this.pool.options.spv)
return;
if (this.ack)
this.write(this.framer.filterLoad(this.pool.spvFilter));
this.write(this.framer.filterLoad(this.pool.spvFilter));
};
/**
@ -1581,7 +1589,7 @@ Peer.prototype.sendAlert = function sendAlert(details) {
* @param {Hash?} stop - Hash to stop at.
*/
Peer.prototype.getHeaders = function getHeaders(locator, stop) {
Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) {
bcoin.debug(
'Requesting headers packet from peer with getheaders (%s).',
this.hostname);
@ -1600,7 +1608,7 @@ Peer.prototype.getHeaders = function getHeaders(locator, stop) {
* @param {Hash?} stop - Hash to stop at.
*/
Peer.prototype.getBlocks = function getBlocks(locator, stop) {
Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) {
bcoin.debug(
'Requesting inv packet from peer with getblocks (%s).',
this.hostname);
@ -1617,7 +1625,7 @@ Peer.prototype.getBlocks = function getBlocks(locator, stop) {
* Send `mempool` to peer.
*/
Peer.prototype.getMempool = function getMempool() {
Peer.prototype.sendMempool = function sendMempool() {
bcoin.debug(
'Requesting inv packet from peer with mempool (%s).',
this.hostname);
@ -1630,7 +1638,7 @@ Peer.prototype.getMempool = function getMempool() {
* @param {Object} details - See {@link Framer.reject}.
*/
Peer.prototype.reject = function reject(details) {
Peer.prototype.sendReject = function sendReject(details) {
bcoin.debug(
'Sending reject packet to peer (%s).',
this.hostname);
@ -1665,8 +1673,144 @@ Peer.prototype.setMisbehavior = function setMisbehavior(score) {
* @param {Number} score
*/
Peer.prototype.sendReject = function sendReject(obj, code, reason, score) {
return this.pool.reject(this, obj, code, reason, score);
Peer.prototype.reject = function reject(obj, code, reason, score) {
var type;
if (obj) {
type = (obj instanceof bcoin.tx) ? 'tx' : 'block';
bcoin.debug('Rejecting %s %s (%s): ccode=%s reason=%s.',
type, obj.rhash, this.hostname, code, reason);
this.sendReject({
message: type,
ccode: code,
reason: reason,
data: obj.hash()
});
this.pool.rejects.add(obj.hash());
} else {
bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.',
this.hostname, code, reason);
this.sendReject({
ccode: code,
reason: reason,
data: null
});
}
if (score != null)
this.setMisbehavior(score);
};
/**
* Send `getblocks` to peer after building
* locator and resolving orphan root.
* @param {Hash} tip - Tip to build chain locator from.
* @param {Hash} orphan - Orphan hash to resolve.
* @param {Function} callback
*/
Peer.prototype.resolveOrphan = function resolveOrphan(tip, orphan, callback) {
var self = this;
callback = utils.ensure(callback);
assert(orphan);
this.chain.getLocator(tip, function(err, locator) {
if (err)
return callback(err);
orphan = self.chain.getOrphanRoot(orphan);
// Was probably resolved.
if (!orphan) {
bcoin.debug('Orphan root was already resolved.');
return callback();
}
self.sendGetBlocks(locator, orphan.root);
callback();
});
};
/**
* Send `getheaders` to peer after building locator.
* @param {Hash} tip - Tip to build chain locator from.
* @param {Hash?} stop
* @param {Function} callback
*/
Peer.prototype.getHeaders = function getHeaders(tip, stop, callback) {
var self = this;
callback = utils.ensure(callback);
this.chain.getLocator(tip, function(err, locator) {
if (err)
return callback(err);
self.sendGetHeaders(locator, stop);
callback();
});
};
/**
* Send `getblocks` to peer after building locator.
* @param {Hash} tip - Tip hash to build chain locator from.
* @param {Hash?} stop
* @param {Function} callback
*/
Peer.prototype.getBlocks = function getBlocks(tip, stop, callback) {
var self = this;
callback = utils.ensure(callback);
this.chain.getLocator(tip, function(err, locator) {
if (err)
return callback(err);
self.sendGetBlocks(locator, stop);
callback();
});
};
/**
* Start syncing from peer.
* @param {Function} callback
*/
Peer.prototype.sync = function sync(callback) {
var tip;
if (!this.pool.syncing)
return;
if (this.syncSent)
return;
if (!this.loader) {
if (this.chain.tip.ts <= bcoin.now() - 24 * 60 * 60)
return;
}
this.syncSent = true;
if (this.options.headers) {
if (!this.chain.tip.isGenesis())
tip = this.chain.tip.prevBlock;
return this.getHeaders(tip, null, callback);
}
return this.getBlocks(null, null, callback);
};
/**

View File

@ -292,10 +292,19 @@ Pool.prototype._init = function _init() {
this.chain.on('full', function() {
self._stopTimer();
self._stopInterval();
if (!self.synced && self.peers.load)
self.peers.load.getMempool();
if (!self.synced) {
// Ask loader for a mempool snapshot.
if (self.peers.load)
self.peers.load.sendMempool();
// Ask all peers for their latest blocks.
self.sync();
}
self.synced = true;
self.emit('full');
bcoin.debug('Chain is fully synced (height=%d).', self.chain.height);
});
@ -323,82 +332,6 @@ Pool.prototype._init = function _init() {
});
};
/**
* Send `getblocks` to peer after building locator.
* @param {Peer} peer
* @param {Hash} top - Top hash to build chain locator from.
* @param {Hash?} stop
* @param {Function} callback
*/
Pool.prototype.getBlocks = function getBlocks(peer, top, stop, callback) {
callback = utils.ensure(callback);
this.chain.getLocator(top, function(err, locator) {
if (err)
return callback(err);
peer.getBlocks(locator, stop);
callback();
});
};
/**
* Send `getblocks` to peer after building
* locator and resolving orphan root.
* @param {Peer} peer
* @param {Hash} top - Top hash to build chain locator from.
* @param {Hash} orphan - Orphan hash to resolve.
* @param {Function} callback
*/
Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan, callback) {
var self = this;
callback = utils.ensure(callback);
assert(orphan);
this.chain.getLocator(top, function(err, locator) {
if (err)
return callback(err);
orphan = self.chain.getOrphanRoot(orphan);
// Was probably resolved.
if (!orphan) {
bcoin.debug('Orphan root was already resolved.');
return callback();
}
peer.getBlocks(locator, orphan.root);
callback();
});
};
/**
* Send `getheaders` to peer after building locator.
* @param {Peer} peer
* @param {Hash} top - Top hash to build chain locator from.
* @param {Hash?} stop
* @param {Function} callback
*/
Pool.prototype.getHeaders = function getHeaders(peer, top, stop, callback) {
callback = utils.ensure(callback);
this.chain.getLocator(top, function(err, locator) {
if (err)
return callback(err);
peer.getHeaders(locator, stop);
callback();
});
};
/**
* Start listening on a server socket.
* @param {Function} callback
@ -549,6 +482,7 @@ Pool.prototype._addLoader = function _addLoader() {
peer = this._createPeer({
host: this.getLoaderHost(),
loader: true,
network: true,
spv: this.options.spv,
witness: this.options.witness
@ -577,16 +511,6 @@ Pool.prototype._addLoader = function _addLoader() {
self._addLoader();
});
peer.once('ack', function() {
if (self.synced)
peer.getMempool();
if (!self.syncing)
return;
self._load();
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
@ -673,8 +597,15 @@ Pool.prototype.startSync = function startSync() {
return;
}
if (this.peers.load.ack)
this._load();
this.sync();
};
Pool.prototype.sync = function sync() {
if (this.peers.load)
this.peers.load.sync();
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].sync();
};
/**
@ -692,6 +623,12 @@ Pool.prototype.stopSync = function stopSync() {
this._stopInterval();
this._stopTimer();
if (this.peers.load)
this.peers.load.syncSent = false;
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].syncSent = false;
};
Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) {
@ -725,12 +662,12 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
var hash = header.hash('hex');
if (last && header.prevBlock !== last) {
peer.sendReject(header, 'invalid', 'bad-prevblk', 100);
peer.reject(header, 'invalid', 'bad-prevblk', 100);
return next(new Error('Bad header chain.'));
}
if (!header.verify(ret)) {
peer.sendReject(header, 'invalid', ret.reason, 100);
peer.reject(header, 'invalid', ret.reason, 100);
self.rejects.add(header.hash());
return next(new VerifyError(header, 'invalid', ret.reason, ret.score));
}
@ -753,7 +690,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
// simply tries to find the latest block in
// the peer's chain.
if (last && headers.length === 2000)
self.getHeaders(peer, last, null, callback);
peer.getHeaders(last, null, callback);
else
callback();
});
@ -793,7 +730,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
// Resolve orphan chain.
if (self.chain.hasOrphan(hash)) {
bcoin.debug('Peer sent a hash that is already a known orphan.');
return self.resolveOrphan(peer, null, hash, next);
return peer.resolveOrphan(null, hash, next);
}
self.getData(peer, self.block.type, hash, function(err, exists) {
@ -809,7 +746,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
// the hashContinue on the remote node).
if (exists && i === hashes.length - 1) {
// Request more hashes:
self.getBlocks(peer, hash, null, next);
peer.getBlocks(hash, null, next);
// Re-download the block (traditional method):
// self.getData(peer, self.block.type, hash, true, next);
return;
@ -840,7 +777,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) {
return this._handleBlocks(hashes, peer, callback);
utils.forEachSerial(hashes, function(hash, next) {
self.getHeaders(peer, null, hash, next);
peer.getHeaders(null, hash, next);
}, function(err) {
if (err)
return callback(err);
@ -878,14 +815,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
}
if (err.score !== -1)
peer.sendReject(block, err.code, err.reason, err.score);
peer.reject(block, err.code, err.reason, err.score);
if (err.reason === 'bad-prevblk') {
if (self.options.headers) {
peer.setMisbehavior(10);
return callback(err);
}
return self.resolveOrphan(peer, null, block.hash('hex'), function(e) {
return peer.resolveOrphan(null, block.hash('hex'), function(e) {
self.scheduleRequests(peer);
return callback(e || err);
});
@ -925,21 +862,6 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
});
};
Pool.prototype._load = function _load() {
if (!this.syncing)
return;
if (!this.peers.load) {
this._addLoader();
return;
}
if (this.options.headers)
this.getHeaders(this.peers.load, null, null);
else
this.getBlocks(this.peers.load, null, null);
};
/**
* Send `mempool` to all peers.
*/
@ -948,10 +870,10 @@ Pool.prototype.getMempool = function getMempool() {
var i;
if (this.peers.load)
this.peers.load.getMempool();
this.peers.load.sendMempool();
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].getMempool();
this.peers.regular[i].sendMempool();
};
Pool.prototype._createPeer = function _createPeer(options) {
@ -960,6 +882,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
var peer = new bcoin.peer(this, {
host: options.host,
createSocket: this.options.createSocket,
loader: options.loader,
relay: this.options.relay,
socket: options.socket,
network: options.network,
@ -1153,7 +1076,7 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
if (err) {
if (err.type === 'VerifyError') {
if (err.score !== -1)
peer.sendReject(tx, err.code, err.reason, err.score);
peer.reject(tx, err.code, err.reason, err.score);
self.rejects.add(tx.hash());
return callback(err);
}
@ -1282,18 +1205,6 @@ Pool.prototype._addPeer = function _addPeer() {
});
};
Pool.prototype.bestPeer = function bestPeer() {
return this.peers.regular.reduce(function(best, peer) {
if (!peer.version || !peer.socket)
return;
if (!best || peer.version.height > best.version.height)
return peer;
return best;
}, null);
};
Pool.prototype._removePeer = function _removePeer(peer) {
utils.binaryRemove(this.peers.pending, peer, compare);
utils.binaryRemove(this.peers.regular, peer, compare);
@ -1302,14 +1213,15 @@ Pool.prototype._removePeer = function _removePeer(peer) {
delete this.peers.map[peer.host];
if (this.peers.load === peer) {
Object.keys(this.request.map).forEach(function(hash) {
var item = this.request.map[hash];
if (item.peer === peer)
item.finish(new Error('Peer closed.'));
}, this);
bcoin.debug('Removed loader peer (%s).', peer.hostname);
this.peers.load = null;
}
Object.keys(this.request.map).forEach(function(hash) {
var item = this.request.map[hash];
if (item.peer === peer)
item.finish(new Error('Peer closed.'));
}, this);
};
/**
@ -1926,48 +1838,6 @@ Pool.prototype.isMisbehaving = function isMisbehaving(host) {
return false;
};
/**
* Send a `reject` packet to peer.
* @see Framer.reject
* @param {Peer} peer
* @param {(TX|Block)?} obj
* @param {String} code - cccode.
* @param {String} reason
* @param {Number} score
*/
Pool.prototype.reject = function reject(peer, obj, code, reason, score) {
var type;
if (obj) {
type = (obj instanceof bcoin.tx) ? 'tx' : 'block';
bcoin.debug('Rejecting %s %s (%s): ccode=%s reason=%s.',
type, obj.rhash, peer.hostname, code, reason);
peer.reject({
message: type,
ccode: code,
reason: reason,
data: obj.hash()
});
this.rejects.add(obj.hash());
} else {
bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.',
peer.hostname, code, reason);
peer.reject({
ccode: code,
reason: reason,
data: null
});
}
if (score != null)
peer.setMisbehavior(score);
};
/**
* Attempt to retrieve external IP from icanhazip.com.
* @param {Function} callback