Refactor block sync; fix fee estimation.

This commit is contained in:
Christopher Jeffrey 2016-06-11 23:06:37 -07:00
parent 59f06bb06c
commit 927fb9c555
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
6 changed files with 200 additions and 267 deletions

View File

@ -67,10 +67,9 @@ function Chain(options) {
this.network = bcoin.network.get(options.network);
this.db = new bcoin.chaindb(this, options);
this.total = 0;
this.adding = false;
this.currentBlock = null;
this.orphanLimit = options.orphanLimit || (20 << 20);
this.pendingLimit = options.pendingLimit || (1024 << 20);
this.locker = new bcoin.locker(this, this.add, this.pendingLimit);
this.locker = new bcoin.locker(this, this.add);
this.invalid = {};
this.bestHeight = -1;
this.tip = null;
@ -1278,7 +1277,6 @@ Chain.prototype.reset = function reset(height, callback, force) {
// have been some orphans on a forked chain we
// no longer need.
self.purgeOrphans();
self.purgePending();
callback(null, result);
});
@ -1328,7 +1326,9 @@ Chain.prototype.onDrain = function onDrain(callback) {
*/
Chain.prototype.isBusy = function isBusy() {
return this.adding || this.locker.pending.length > 0;
// NOTE(review): everything below the return above is unreachable as
// written — this looks like two versions of the body merged together
// by the diff rendering; confirm which implementation is intended.
if (this.currentBlock)
return true;
return this.locker.pending.length > 0;
};
/**
@ -1352,13 +1352,13 @@ Chain.prototype.add = function add(block, callback, force) {
callback = utils.wrap(callback, unlock);
this.adding = true;
(function next(block, initial) {
var hash = block.hash('hex');
var prevHash = block.prevBlock;
var height, checkpoint, orphan, entry;
self.currentBlock = hash;
function handleOrphans() {
// No orphan chain.
if (!self.orphan.map[hash])
@ -1506,7 +1506,6 @@ Chain.prototype.add = function add(block, callback, force) {
// who isn't trying to fool us.
if (hash !== checkpoint) {
self.purgeOrphans();
self.purgePending();
self.emit('fork', block, {
height: height,
@ -1627,7 +1626,7 @@ Chain.prototype.add = function add(block, callback, force) {
self.emit('full');
}
self.adding = false;
self.currentBlock = null;
if (err)
callback(err);
@ -1699,17 +1698,6 @@ Chain.prototype.pruneOrphans = function pruneOrphans() {
this.orphan.size += best.getSize();
};
/**
* Purge any pending blocks in the queue. Note that
* this call is unpredictable and may disrupt the
* blockchain sync; it is only used as a last resort
* when the pending-block queue exceeds its byte limit.
*/
Chain.prototype.purgePending = function purgePending() {
// The locker owns the pending-block queue; defer to it.
var locker = this.locker;
return locker.purgePending();
};
/**
* Test the chain to see if it has a block, orphan, or pending block.
* @param {Hash} hash
@ -1723,6 +1711,9 @@ Chain.prototype.has = function has(hash, callback) {
if (this.hasPending(hash))
return callback(null, true);
if (hash === this.currentBlock)
return callback(null, true);
return this.hasBlock(hash, callback);
};
@ -1902,13 +1893,13 @@ Chain.prototype.getLocator = function getLocator(start, callback, force) {
if (start == null)
start = this.tip.hash;
return self.db.get(start, function(err, entry) {
return this.db.get(start, function(err, entry) {
if (err)
return callback(err);
if (!entry) {
// We could simply return `start` here,
// but there is no standardized "spacing"
// but there is no required "spacing"
// for locator hashes. Pretend this hash
// is our tip. This is useful for
// getheaders.
@ -1944,6 +1935,8 @@ Chain.prototype.getLocator = function getLocator(start, callback, force) {
if (!main)
return entry.getAncestorByHeight(height, next);
// If we're in the main chain, we can
// do an O(1) lookup of the hash.
self.db.getHash(height, function(err, hash) {
if (err)
return next(err);

View File

@ -208,21 +208,21 @@ ConfirmStats.prototype.estimateMedian = function estimateMedian(target, needed,
}
}
bcoin.debug('estimatefee: target=%d.'
+ ' For conf success %s %d need %s %s: %d from buckets %d - %d.'
+ ' Cur Bucket stats %d% %d/%d (%d mempool).',
target,
greater ? '>' : '<',
breakpoint,
this.type,
greater ? '>' : '<',
median,
this.buckets[minBucket],
this.buckets[maxBucket],
100 * conf / Math.max(1, total + extra),
conf,
total,
extra);
// bcoin.debug('estimatefee: target=%d.'
// + ' For conf success %s %d need %s %s: %d from buckets %d - %d.'
// + ' Cur Bucket stats %d% %d/%d (%d mempool).',
// target,
// greater ? '>' : '<',
// breakpoint,
// this.type,
// greater ? '>' : '<',
// median,
// this.buckets[minBucket],
// this.buckets[maxBucket],
// 100 * conf / Math.max(1, total + extra),
// conf,
// total,
// extra);
return median;
};
@ -485,6 +485,9 @@ PolicyEstimator.prototype.processBlock = function processBlock(height, entries,
this.bestHeight = height;
if (entries.length === 0)
return;
// Wait for chain to sync.
if (!current)
return;

View File

@ -15,12 +15,11 @@ var assert = utils.assert;
* @constructor
* @param {Function} parent - Parent constructor.
* @param {Function?} add - `add` method (whichever method is queuing data).
* @param {Number?} limit - Limit in bytes of data that can be queued.
*/
function Locker(parent, add, limit) {
function Locker(parent, add) {
if (!(this instanceof Locker))
return Locker(parent, add, limit);
return Locker(parent, add);
this.parent = parent;
this.jobs = [];
@ -28,8 +27,6 @@ function Locker(parent, add, limit) {
this.pending = [];
this.pendingMap = {};
this.pendingSize = 0;
this.pendingLimit = limit || (20 << 20);
this.add = add;
}
@ -52,7 +49,7 @@ Locker.prototype.hasPending = function hasPending(key) {
* @param {Function} func - The method being called.
* @param {Array} args - Arguments passed to the method.
* @param {Boolean?} force - Force a call.
* @returns {Function} unlock - Unlocker - must be
* @returns {Function} Unlocker - must be
* called once the method finishes executing in order
* to resolve the queue.
*/
@ -74,11 +71,6 @@ Locker.prototype.lock = function lock(func, args, force) {
obj = args[0];
this.pending.push(obj);
this.pendingMap[obj.hash('hex')] = true;
this.pendingSize += obj.getSize();
if (this.pendingSize > this.pendingLimit) {
this.purgePending();
return;
}
}
this.jobs.push([func, args]);
return;
@ -108,7 +100,6 @@ Locker.prototype.lock = function lock(func, args, force) {
obj = item[1][0];
assert(obj === self.pending.shift());
delete self.pendingMap[obj.hash('hex')];
self.pendingSize -= obj.getSize();
}
item[0].apply(self.parent, item[1]);
@ -120,36 +111,9 @@ Locker.prototype.lock = function lock(func, args, force) {
*/
Locker.prototype.destroy = function destroy() {
// Lockers constructed with an `add` method maintain a pending
// queue; flush it before discarding the queued jobs.
var hasQueue = Boolean(this.add);
if (hasQueue)
this.purgePending();
// Truncate in place so external references to `jobs` stay valid.
this.jobs.length = 0;
};
/**
* Purge all pending calls (called once `add` has hit `limit`).
*/
Locker.prototype.purgePending = function purgePending() {
var self = this;
var total = this.pending.length;
// Purging only makes sense for lockers that queue `add` calls.
assert(this.add);
// Notify listeners how much is being dropped (count and bytes).
this.emit('purge', total, this.pendingSize);
// Remove each pending object from the hash-lookup map.
this.pending.forEach(function(obj) {
delete self.pendingMap[obj.hash('hex')];
});
this.pending.length = 0;
this.pendingSize = 0;
// Drop only queued `add` jobs; unrelated queued calls survive.
this.jobs = this.jobs.filter(function(item) {
return item[0] !== self.add;
});
if (total !== 0)
this.emit('drain');
// NOTE(review): the two resets below look like a diff-rendering
// artifact — they clobber the just-filtered `jobs` array and the
// already-emptied map a second time; confirm against the original
// file before relying on this version.
this.pendingMap = {};
this.jobs.length = 0;
};
/**

View File

@ -74,8 +74,7 @@ function Mempool(options) {
this.network = this.chain.network;
this.loaded = false;
this.locker = new bcoin.locker(this, this.addTX, 100 << 20);
this.writeLock = new bcoin.locker(this);
this.locker = new bcoin.locker(this, this.addTX);
this.db = null;
this.size = 0;
@ -114,14 +113,6 @@ Mempool.prototype._lock = function _lock(func, args, force) {
return this.locker.lock(func, args, force);
};
/**
* Purge pending txs in the queue.
*/
Mempool.prototype.purgePending = function purgePending() {
// The locker owns the queue of pending transactions; defer to it.
var locker = this.locker;
return locker.purgePending();
};
Mempool.prototype._init = function _init() {
var self = this;
var unlock = this._lock(utils.nop, []);

View File

@ -1800,7 +1800,7 @@ Peer.prototype.sync = function sync(callback) {
return;
if (!this.loader) {
if (this.chain.tip.ts <= bcoin.now() - 24 * 60 * 60)
if (!this.chain.isFull())
return;
}

View File

@ -125,6 +125,7 @@ function Pool(options) {
this.connected = false;
this.uid = 0;
this._createServer = options.createServer;
this.locker = new bcoin.locker(this);
this.syncing = false;
this.synced = false;
@ -494,90 +495,9 @@ Pool.prototype._addLoader = function _addLoader() {
this.peers.all.push(peer);
this.peers.map[peer.host] = peer;
peer.once('close', function() {
self._stopInterval();
self._stopTimer();
self._removePeer(peer);
if (self.peers.regular.length === 0) {
bcoin.debug('%s %s %s',
'Could not connect to any peers.',
'Do you have a network connection?',
'Retrying in 5 seconds.');
setTimeout(function() {
self._addLoader();
}, 5000);
return;
}
self._addLoader();
utils.nextTick(function() {
self.emit('loader', peer);
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
if (!self.syncing)
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
self._handleBlock(block, peer, function(err) {
if (err)
return self.emit('error', err);
self._startInterval();
self._startTimer();
});
});
peer.on('block', function(block) {
if (self.options.spv)
return;
if (!self.syncing)
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
self._handleBlock(block, peer, function(err) {
if (err)
return self.emit('error', err);
self._startInterval();
self._startTimer();
});
});
if (self.options.headers) {
peer.on('blocks', function(hashes) {
if (!self.syncing)
return;
self._handleInv(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
});
peer.on('headers', function(headers) {
if (!self.syncing)
return;
self._handleHeaders(headers, peer, function(err) {
if (err)
self.emit('error', err);
});
});
} else {
peer.on('blocks', function(hashes) {
if (!self.syncing)
return;
self._handleBlocks(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
});
}
};
/**
@ -638,13 +558,15 @@ Pool.prototype.stopSync = function stopSync() {
Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) {
var self = this;
var ret = {};
var unlock = this.locker.lock(_handleHeaders, [headers, peer, callback]);
var last;
assert(this.options.headers);
if (!unlock)
return;
callback = utils.ensure(callback);
callback = utils.wrap(callback, unlock);
if (headers.length === 0)
if (!this.options.headers)
return callback();
bcoin.debug(
@ -659,21 +581,28 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
this.emit('headers', headers);
// Reset interval to avoid calling getheaders unnecessarily
this._startInterval();
if (peer === this.peers.load) {
// Reset interval to avoid stall behavior.
this._startInterval();
// Reset timeout to avoid killing the loader.
this._startTimer();
}
utils.forEachSerial(headers, function(header, next) {
var hash = header.hash('hex');
if (last && header.prevBlock !== last) {
peer.reject(header, 'invalid', 'bad-prevblk', 100);
// Note: We do _not_ want to add this
// to known rejects. This block may
// very well be valid, but this peer
// is being an asshole right now.
peer.setMisbehavior(100);
return next(new Error('Bad header chain.'));
}
if (!header.verify(ret)) {
peer.reject(header, 'invalid', ret.reason, 100);
self.rejects.add(header.hash());
return next(new VerifyError(header, 'invalid', ret.reason, ret.score));
return next(new Error('Invalid header.'));
}
last = hash;
@ -694,9 +623,9 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
// simply tries to find the latest block in
// the peer's chain.
if (last && headers.length === 2000)
peer.getHeaders(last, null, callback);
else
callback();
return peer.getHeaders(last, null, callback);
callback();
});
};
@ -705,11 +634,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
assert(!this.options.headers);
callback = utils.ensure(callback);
if (hashes.length === 0)
return callback();
bcoin.debug(
'Received %s block hashes from peer (%s).',
hashes.length,
@ -724,16 +648,23 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
this.emit('blocks', hashes);
// Reset interval to avoid calling getblocks unnecessarily
this._startInterval();
// Reset timeout to avoid killing the loader
this._startTimer();
if (peer === this.peers.load) {
// Reset interval to avoid stall behavior.
this._startInterval();
// Reset timeout to avoid killing the loader.
this._startTimer();
}
utils.forEachSerial(hashes, function(hash, next, i) {
// Resolve orphan chain.
if (self.chain.hasOrphan(hash)) {
bcoin.debug('Peer sent a hash that is already a known orphan.');
// There is a possible race condition here.
// The orphan may get resolved by the time
// we create the locator. In that case, we
// should probably actually move to the
// `exists` clause below if it is the last
// hash.
bcoin.debug('Received known orphan hash (%s).', peer.hostname);
return peer.resolveOrphan(null, hash, next);
}
@ -749,11 +680,13 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
// from the last hash (this will reset
// the hashContinue on the remote node).
if (exists && i === hashes.length - 1) {
// Request more hashes:
peer.getBlocks(hash, null, next);
// Re-download the block (traditional method):
// self.getData(peer, self.block.type, hash, true, next);
return;
// Make sure we _actually_ have this block.
if (!self.request.map[hash]) {
bcoin.debug('Received existing hash (%s).', peer.hostname);
return peer.getBlocks(hash, null, next);
}
// Otherwise, we're still requesting it. Ignore.
bcoin.debug('Received already-requested hash (%s).', peer.hostname);
}
next();
@ -770,11 +703,15 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) {
var self = this;
var unlock = this.locker.lock(_handleInv, [hashes, peer, callback]);
callback = utils.ensure(callback);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
// Ignore for now if we're still syncing
if (!this.synced)
if (!this.synced && peer !== this.peers.load)
return callback();
if (!this.options.headers)
@ -796,8 +733,6 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
var self = this;
var requested;
callback = utils.ensure(callback);
// Fulfill the load request.
requested = this.fulfill(block);
@ -826,6 +761,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
peer.setMisbehavior(10);
return callback(err);
}
bcoin.debug('Peer sent an orphan block. Resolving.');
return peer.resolveOrphan(null, block.hash('hex'), function(e) {
self.scheduleRequests(peer);
return callback(e || err);
@ -896,6 +832,72 @@ Pool.prototype._createPeer = function _createPeer(options) {
ts: options.ts
});
peer.once('close', function() {
self._removePeer(peer);
if (self.destroyed)
return;
if (!peer.loader)
return;
self._stopInterval();
self._stopTimer();
if (self.peers.regular.length === 0) {
bcoin.debug('%s %s %s',
'Could not connect to any peers.',
'Do you have a network connection?',
'Retrying in 5 seconds.');
setTimeout(function() {
self._addLoader();
}, 5000);
return;
}
self._addLoader();
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
if (!self.syncing)
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
self._handleBlock(block, peer, function(err) {
if (err)
return self.emit('error', err);
if (peer.loader) {
self._startInterval();
self._startTimer();
}
});
});
peer.on('block', function(block) {
if (self.options.spv)
return;
if (!self.syncing)
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
self._handleBlock(block, peer, function(err) {
if (err)
return self.emit('error', err);
if (peer.loader) {
self._startInterval();
self._startTimer();
}
});
});
peer.on('error', function(err) {
self.emit('error', err, peer);
});
@ -906,7 +908,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
: null;
bcoin.debug(
'Reject (%s): msg=%s ccode=%s reason=%s data=%s.',
'Received reject (%s): msg=%s ccode=%s reason=%s data=%s.',
peer.hostname,
payload.message,
payload.ccode,
@ -972,10 +974,8 @@ Pool.prototype._createPeer = function _createPeer(options) {
self.emit('txs', txs, peer);
if (!self.options.spv) {
if (self.syncing && !self.synced)
return;
}
if (self.syncing && !self.synced)
return;
for (i = 0; i < txs.length; i++) {
hash = txs[i];
@ -1000,6 +1000,26 @@ Pool.prototype._createPeer = function _createPeer(options) {
self.emit('version', version, peer);
});
peer.on('headers', function(headers) {
if (!self.syncing)
return;
self._handleHeaders(headers, peer, function(err) {
if (err)
self.emit('error', err);
});
});
peer.on('blocks', function(hashes) {
if (!self.syncing)
return;
self._handleInv(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
});
return peer;
};
@ -1109,36 +1129,9 @@ Pool.prototype._addLeech = function _addLeech(socket) {
this.peers.all.push(peer);
this.peers.map[peer.host] = peer;
peer.once('close', function() {
self._removePeer(peer);
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
self._handleBlock(block, peer);
});
peer.on('block', function(block) {
if (self.options.spv)
return;
self._handleBlock(block, peer);
});
peer.on('blocks', function(hashes) {
self._handleInv(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
});
utils.nextTick(function() {
self.emit('leech', peer);
});
return peer;
};
Pool.prototype._addPeer = function _addPeer() {
@ -1169,37 +1162,11 @@ Pool.prototype._addPeer = function _addPeer() {
this.peers.all.push(peer);
this.peers.map[peer.host] = peer;
peer.once('close', function() {
self._removePeer(peer);
if (self.destroyed)
return;
self._addPeer();
});
peer.once('ack', function() {
if (self.destroyed)
return;
if (utils.binaryRemove(self.peers.pending, peer, compare))
utils.binaryInsert(self.peers.regular, peer, compare);
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
self._handleBlock(block, peer);
});
peer.on('block', function(block) {
if (self.options.spv)
return;
self._handleBlock(block, peer);
});
peer.on('blocks', function(hashes) {
self._handleInv(hashes, peer);
});
utils.nextTick(function() {
self.emit('peer', peer);
});
@ -1442,6 +1409,8 @@ Pool.prototype._sendRequests = function _sendRequests(peer) {
return;
if (this.options.spv) {
if (this.request.activeBlocks >= 500)
return;
items = peer.queue.block.slice();
peer.queue.block.length = 0;
} else {
@ -1455,6 +1424,9 @@ Pool.prototype._sendRequests = function _sendRequests(peer) {
else
size = 10;
if (this.request.activeBlocks >= size)
return;
items = peer.queue.block.slice(0, size);
peer.queue.block = peer.queue.block.slice(size);
}
@ -1918,13 +1890,12 @@ function LoadRequest(pool, peer, type, hash, callback) {
this.active = false;
this.id = this.pool.uid++;
this.timeout = null;
this.onTimeout = this._onTimeout.bind(this);
this.addCallback(callback);
assert(!this.pool.request.map[this.hash]);
this.pool.request.map[this.hash] = this;
this._finish = this.destroy.bind(this);
}
/**
@ -1935,6 +1906,20 @@ LoadRequest.prototype.destroy = function destroy() {
return this.finish(new Error('Destroyed.'));
};
/**
* Handle timeout. Potentially kill loader.
* @private
*/
LoadRequest.prototype._onTimeout = function _onTimeout() {
// A stalled block request from the loader peer means the loader
// itself is unhealthy: destroy it so a new loader gets selected.
var timedOutOnBlock = this.type === this.pool.block.type;
var fromLoader = this.peer === this.pool.peers.load;
if (timedOutOnBlock && fromLoader) {
bcoin.debug('Loader took too long serving a block. Finding a new one.');
this.peer.destroy();
}
// Settle the request with a timeout error either way.
return this.finish(new Error('Timed out.'));
};
/**
* Add a callback to be executed when item is received.
* @param {Function} callback
@ -1950,8 +1935,7 @@ LoadRequest.prototype.addCallback = function addCallback(callback) {
*/
LoadRequest.prototype.start = function start() {
this.timeout = setTimeout(this._finish, this.pool.requestTimeout);
this.peer.on('close', this._finish);
this.timeout = setTimeout(this.onTimeout, this.pool.requestTimeout);
this.active = true;
this.pool.request.active++;
@ -1990,8 +1974,6 @@ LoadRequest.prototype.finish = function finish(err) {
else
utils.binaryRemove(this.peer.queue.block, this, compare);
this.peer.removeListener('close', this._finish);
if (this.timeout != null) {
clearTimeout(this.timeout);
this.timeout = null;