diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index de9464a0..f67f864f 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -67,10 +67,9 @@ function Chain(options) { this.network = bcoin.network.get(options.network); this.db = new bcoin.chaindb(this, options); this.total = 0; - this.adding = false; + this.currentBlock = null; this.orphanLimit = options.orphanLimit || (20 << 20); - this.pendingLimit = options.pendingLimit || (1024 << 20); - this.locker = new bcoin.locker(this, this.add, this.pendingLimit); + this.locker = new bcoin.locker(this, this.add); this.invalid = {}; this.bestHeight = -1; this.tip = null; @@ -1278,7 +1277,6 @@ Chain.prototype.reset = function reset(height, callback, force) { // have been some orphans on a forked chain we // no longer need. self.purgeOrphans(); - self.purgePending(); callback(null, result); }); @@ -1328,7 +1326,9 @@ Chain.prototype.onDrain = function onDrain(callback) { */ Chain.prototype.isBusy = function isBusy() { - return this.adding || this.locker.pending.length > 0; + if (this.currentBlock) + return true; + return this.locker.pending.length > 0; }; /** @@ -1352,13 +1352,13 @@ Chain.prototype.add = function add(block, callback, force) { callback = utils.wrap(callback, unlock); - this.adding = true; - (function next(block, initial) { var hash = block.hash('hex'); var prevHash = block.prevBlock; var height, checkpoint, orphan, entry; + self.currentBlock = hash; + function handleOrphans() { // No orphan chain. if (!self.orphan.map[hash]) @@ -1506,7 +1506,6 @@ Chain.prototype.add = function add(block, callback, force) { // who isn't trying to fool us. if (hash !== checkpoint) { self.purgeOrphans(); - self.purgePending(); self.emit('fork', block, { height: height, @@ -1627,7 +1626,7 @@ Chain.prototype.add = function add(block, callback, force) { self.emit('full'); } - self.adding = false; + self.currentBlock = null; if (err) callback(err); @@ -1699,17 +1698,6 @@ Chain.prototype.pruneOrphans = function pruneOrphans() { this.orphan.size += best.getSize(); }; -/** - * Purge any pending blocks in the queue: note that - * this call is unpredictable and may screw up the - * blockchain sync. It is only used as a last resort - * if more than 500mb of pending blocks are in the queue. - */ - -Chain.prototype.purgePending = function purgePending() { - return this.locker.purgePending(); -}; - /** * Test the chain to see if it has a block, orphan, or pending block. * @param {Hash} hash @@ -1723,6 +1711,9 @@ Chain.prototype.has = function has(hash, callback) { if (this.hasPending(hash)) return callback(null, true); + if (hash === this.currentBlock) + return callback(null, true); + return this.hasBlock(hash, callback); }; @@ -1902,13 +1893,13 @@ Chain.prototype.getLocator = function getLocator(start, callback, force) { if (start == null) start = this.tip.hash; - return self.db.get(start, function(err, entry) { + return this.db.get(start, function(err, entry) { if (err) return callback(err); if (!entry) { // We could simply return `start` here, - // but there is no standardized "spacing" + // but there is no required "spacing" // for locator hashes. Pretend this hash // is our tip. This is useful for // getheaders. @@ -1944,6 +1935,8 @@ Chain.prototype.getLocator = function getLocator(start, callback, force) { if (!main) return entry.getAncestorByHeight(height, next); + // If we're in the main chain, we can + // do an O(1) lookup of the hash. self.db.getHash(height, function(err, hash) { if (err) return next(err); diff --git a/lib/bcoin/fees.js b/lib/bcoin/fees.js index ed1acfc4..ec9d38a9 100644 --- a/lib/bcoin/fees.js +++ b/lib/bcoin/fees.js @@ -208,21 +208,21 @@ ConfirmStats.prototype.estimateMedian = function estimateMedian(target, needed, } } - bcoin.debug('estimatefee: target=%d.' - + ' For conf success %s %d need %s %s: %d from buckets %d - %d.' - + ' Cur Bucket stats %d% %d/%d (%d mempool).', - target, - greater ? '>' : '<', - breakpoint, - this.type, - greater ? '>' : '<', - median, - this.buckets[minBucket], - this.buckets[maxBucket], - 100 * conf / Math.max(1, total + extra), - conf, - total, - extra); + // bcoin.debug('estimatefee: target=%d.' + // + ' For conf success %s %d need %s %s: %d from buckets %d - %d.' + // + ' Cur Bucket stats %d% %d/%d (%d mempool).', + // target, + // greater ? '>' : '<', + // breakpoint, + // this.type, + // greater ? '>' : '<', + // median, + // this.buckets[minBucket], + // this.buckets[maxBucket], + // 100 * conf / Math.max(1, total + extra), + // conf, + // total, + // extra); return median; }; @@ -485,6 +485,9 @@ PolicyEstimator.prototype.processBlock = function processBlock(height, entries, this.bestHeight = height; + if (entries.length === 0) + return; + // Wait for chain to sync. if (!current) return; diff --git a/lib/bcoin/locker.js b/lib/bcoin/locker.js index 8f78043d..ce36e2fd 100644 --- a/lib/bcoin/locker.js +++ b/lib/bcoin/locker.js @@ -15,12 +15,11 @@ var assert = utils.assert; * @constructor * @param {Function} parent - Parent constructor. * @param {Function?} add - `add` method (whichever method is queuing data). - * @param {Number?} limit - Limit in bytes of data that can be queued. */ -function Locker(parent, add, limit) { +function Locker(parent, add) { if (!(this instanceof Locker)) - return Locker(parent, add, limit); + return Locker(parent, add); this.parent = parent; this.jobs = []; @@ -28,8 +27,6 @@ function Locker(parent, add, limit) { this.pending = []; this.pendingMap = {}; - this.pendingSize = 0; - this.pendingLimit = limit || (20 << 20); this.add = add; } @@ -52,7 +49,7 @@ Locker.prototype.hasPending = function hasPending(key) { * @param {Function} func - The method being called. * @param {Array} args - Arguments passed to the method. * @param {Boolean?} force - Force a call. - * @returns {Function} unlock - Unlocker - must be + * @returns {Function} Unlocker - must be * called once the method finishes executing in order * to resolve the queue. */ @@ -74,11 +71,6 @@ Locker.prototype.lock = function lock(func, args, force) { obj = args[0]; this.pending.push(obj); this.pendingMap[obj.hash('hex')] = true; - this.pendingSize += obj.getSize(); - if (this.pendingSize > this.pendingLimit) { - this.purgePending(); - return; - } } this.jobs.push([func, args]); return; @@ -108,7 +100,6 @@ Locker.prototype.lock = function lock(func, args, force) { obj = item[1][0]; assert(obj === self.pending.shift()); delete self.pendingMap[obj.hash('hex')]; - self.pendingSize -= obj.getSize(); } item[0].apply(self.parent, item[1]); @@ -120,36 +111,9 @@ Locker.prototype.lock = function lock(func, args, force) { */ Locker.prototype.destroy = function destroy() { - if (this.add) - this.purgePending(); - this.jobs.length = 0; -}; - -/** - * Purge all pending calls (called once `add` has hit `limit`). - */ - -Locker.prototype.purgePending = function purgePending() { - var self = this; - var total = this.pending.length; - - assert(this.add); - - this.emit('purge', total, this.pendingSize); - - this.pending.forEach(function(obj) { - delete self.pendingMap[obj.hash('hex')]; - }); - this.pending.length = 0; - this.pendingSize = 0; - - this.jobs = this.jobs.filter(function(item) { - return item[0] !== self.add; - }); - - if (total !== 0) - this.emit('drain'); + this.pendingMap = {}; + this.jobs.length = 0; }; /** diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index c016d862..f5787329 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -74,8 +74,7 @@ function Mempool(options) { this.network = this.chain.network; this.loaded = false; - this.locker = new bcoin.locker(this, this.addTX, 100 << 20); - this.writeLock = new bcoin.locker(this); + this.locker = new bcoin.locker(this, this.addTX); this.db = null; this.size = 0; @@ -114,14 +113,6 @@ Mempool.prototype._lock = function _lock(func, args, force) { return this.locker.lock(func, args, force); }; -/** - * Purge pending txs in the queue. - */ - -Mempool.prototype.purgePending = function purgePending() { - return this.locker.purgePending(); -}; - Mempool.prototype._init = function _init() { var self = this; var unlock = this._lock(utils.nop, []); diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 56a8dd4b..e4f001b5 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -1800,7 +1800,7 @@ Peer.prototype.sync = function sync(callback) { return; if (!this.loader) { - if (this.chain.tip.ts <= bcoin.now() - 24 * 60 * 60) + if (!this.chain.isFull()) return; } diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index e826cff1..e5cd3910 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -125,6 +125,7 @@ function Pool(options) { this.connected = false; this.uid = 0; this._createServer = options.createServer; + this.locker = new bcoin.locker(this); this.syncing = false; this.synced = false; @@ -494,90 +495,9 @@ Pool.prototype._addLoader = function _addLoader() { this.peers.all.push(peer); this.peers.map[peer.host] = peer; - peer.once('close', function() { - self._stopInterval(); - self._stopTimer(); - self._removePeer(peer); - if (self.peers.regular.length === 0) { - bcoin.debug('%s %s %s', - 'Could not connect to any peers.', - 'Do you have a network connection?', - 'Retrying in 5 seconds.'); - setTimeout(function() { - self._addLoader(); - }, 5000); - return; - } - self._addLoader(); + utils.nextTick(function() { + self.emit('loader', peer); }); - - peer.on('merkleblock', function(block) { - if (!self.options.spv) - return; - - if (!self.syncing) - return; - - // If the peer sent us a block that was added - // to the chain (not orphans), reset the timeout. - self._handleBlock(block, peer, function(err) { - if (err) - return self.emit('error', err); - - self._startInterval(); - self._startTimer(); - }); - }); - - peer.on('block', function(block) { - if (self.options.spv) - return; - - if (!self.syncing) - return; - - // If the peer sent us a block that was added - // to the chain (not orphans), reset the timeout. - self._handleBlock(block, peer, function(err) { - if (err) - return self.emit('error', err); - - self._startInterval(); - self._startTimer(); - }); - }); - - if (self.options.headers) { - peer.on('blocks', function(hashes) { - if (!self.syncing) - return; - - self._handleInv(hashes, peer, function(err) { - if (err) - self.emit('error', err); - }); - }); - - peer.on('headers', function(headers) { - if (!self.syncing) - return; - - self._handleHeaders(headers, peer, function(err) { - if (err) - self.emit('error', err); - }); - }); - } else { - peer.on('blocks', function(hashes) { - if (!self.syncing) - return; - - self._handleBlocks(hashes, peer, function(err) { - if (err) - self.emit('error', err); - }); - }); - } }; /** @@ -638,13 +558,15 @@ Pool.prototype.stopSync = function stopSync() { Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) { var self = this; var ret = {}; + var unlock = this.locker.lock(_handleHeaders, [headers, peer, callback]); var last; - assert(this.options.headers); + if (!unlock) + return; - callback = utils.ensure(callback); + callback = utils.wrap(callback, unlock); - if (headers.length === 0) + if (!this.options.headers) return callback(); bcoin.debug( @@ -659,21 +581,28 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) this.emit('headers', headers); - // Reset interval to avoid calling getheaders unnecessarily - this._startInterval(); + if (peer === this.peers.load) { + // Reset interval to avoid stall behavior. + this._startInterval(); + // Reset timeout to avoid killing the loader. + this._startTimer(); + } utils.forEachSerial(headers, function(header, next) { var hash = header.hash('hex'); if (last && header.prevBlock !== last) { - peer.reject(header, 'invalid', 'bad-prevblk', 100); + // Note: We do _not_ want to add this + // to known rejects. This block may + // very well be valid, but this peer + // is being an asshole right now. + peer.setMisbehavior(100); return next(new Error('Bad header chain.')); } if (!header.verify(ret)) { peer.reject(header, 'invalid', ret.reason, 100); - self.rejects.add(header.hash()); - return next(new VerifyError(header, 'invalid', ret.reason, ret.score)); + return next(new Error('Invalid header.')); } last = hash; @@ -694,9 +623,9 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) // simply tries to find the latest block in // the peer's chain. if (last && headers.length === 2000) - peer.getHeaders(last, null, callback); - else - callback(); + return peer.getHeaders(last, null, callback); + + callback(); }); }; @@ -705,11 +634,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { assert(!this.options.headers); - callback = utils.ensure(callback); - - if (hashes.length === 0) - return callback(); - bcoin.debug( 'Received %s block hashes from peer (%s).', hashes.length, @@ -724,16 +648,23 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { this.emit('blocks', hashes); - // Reset interval to avoid calling getblocks unnecessarily - this._startInterval(); - - // Reset timeout to avoid killing the loader - this._startTimer(); + if (peer === this.peers.load) { + // Reset interval to avoid stall behavior. + this._startInterval(); + // Reset timeout to avoid killing the loader. + this._startTimer(); + } utils.forEachSerial(hashes, function(hash, next, i) { // Resolve orphan chain. if (self.chain.hasOrphan(hash)) { - bcoin.debug('Peer sent a hash that is already a known orphan.'); + // There is a possible race condition here. + // The orphan may get resolved by the time + // we create the locator. In that case, we + // should probably actually move to the + // `exists` clause below if it is the last + // hash. + bcoin.debug('Received known orphan hash (%s).', peer.hostname); return peer.resolveOrphan(null, hash, next); } @@ -749,11 +680,13 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { // from the last hash (this will reset // the hashContinue on the remote node). if (exists && i === hashes.length - 1) { - // Request more hashes: - peer.getBlocks(hash, null, next); - // Re-download the block (traditional method): - // self.getData(peer, self.block.type, hash, true, next); - return; + // Make sure we _actually_ have this block. + if (!self.request.map[hash]) { + bcoin.debug('Received existing hash (%s).', peer.hostname); + return peer.getBlocks(hash, null, next); + } + // Otherwise, we're still requesting it. Ignore. + bcoin.debug('Received already-requested hash (%s).', peer.hostname); } next(); @@ -770,11 +703,15 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) { var self = this; + var unlock = this.locker.lock(_handleInv, [hashes, peer, callback]); - callback = utils.ensure(callback); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); // Ignore for now if we're still syncing - if (!this.synced) + if (!this.synced && peer !== this.peers.load) return callback(); if (!this.options.headers) @@ -796,8 +733,6 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; var requested; - callback = utils.ensure(callback); - // Fulfill the load request. requested = this.fulfill(block); @@ -826,6 +761,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { peer.setMisbehavior(10); return callback(err); } + bcoin.debug('Peer sent an orphan block. Resolving.'); return peer.resolveOrphan(null, block.hash('hex'), function(e) { self.scheduleRequests(peer); return callback(e || err); @@ -896,6 +832,72 @@ Pool.prototype._createPeer = function _createPeer(options) { ts: options.ts }); + peer.once('close', function() { + self._removePeer(peer); + + if (self.destroyed) + return; + + if (!peer.loader) + return; + + self._stopInterval(); + self._stopTimer(); + + if (self.peers.regular.length === 0) { + bcoin.debug('%s %s %s', + 'Could not connect to any peers.', + 'Do you have a network connection?', + 'Retrying in 5 seconds.'); + setTimeout(function() { + self._addLoader(); + }, 5000); + return; + } + + self._addLoader(); + }); + + peer.on('merkleblock', function(block) { + if (!self.options.spv) + return; + + if (!self.syncing) + return; + + // If the peer sent us a block that was added + // to the chain (not orphans), reset the timeout. + self._handleBlock(block, peer, function(err) { + if (err) + return self.emit('error', err); + + if (peer.loader) { + self._startInterval(); + self._startTimer(); + } + }); + }); + + peer.on('block', function(block) { + if (self.options.spv) + return; + + if (!self.syncing) + return; + + // If the peer sent us a block that was added + // to the chain (not orphans), reset the timeout. + self._handleBlock(block, peer, function(err) { + if (err) + return self.emit('error', err); + + if (peer.loader) { + self._startInterval(); + self._startTimer(); + } + }); + }); + peer.on('error', function(err) { self.emit('error', err, peer); }); @@ -906,7 +908,7 @@ Pool.prototype._createPeer = function _createPeer(options) { : null; bcoin.debug( - 'Reject (%s): msg=%s ccode=%s reason=%s data=%s.', + 'Received reject (%s): msg=%s ccode=%s reason=%s data=%s.', peer.hostname, payload.message, payload.ccode, @@ -972,10 +974,8 @@ Pool.prototype._createPeer = function _createPeer(options) { self.emit('txs', txs, peer); - if (!self.options.spv) { - if (self.syncing && !self.synced) - return; - } + if (self.syncing && !self.synced) + return; for (i = 0; i < txs.length; i++) { hash = txs[i]; @@ -1000,6 +1000,26 @@ Pool.prototype._createPeer = function _createPeer(options) { self.emit('version', version, peer); }); + peer.on('headers', function(headers) { + if (!self.syncing) + return; + + self._handleHeaders(headers, peer, function(err) { + if (err) + self.emit('error', err); + }); + }); + + peer.on('blocks', function(hashes) { + if (!self.syncing) + return; + + self._handleInv(hashes, peer, function(err) { + if (err) + self.emit('error', err); + }); + }); + return peer; }; @@ -1109,36 +1129,9 @@ Pool.prototype._addLeech = function _addLeech(socket) { this.peers.all.push(peer); this.peers.map[peer.host] = peer; - peer.once('close', function() { - self._removePeer(peer); - }); - - peer.on('merkleblock', function(block) { - if (!self.options.spv) - return; - - self._handleBlock(block, peer); - }); - - peer.on('block', function(block) { - if (self.options.spv) - return; - - self._handleBlock(block, peer); - }); - - peer.on('blocks', function(hashes) { - self._handleInv(hashes, peer, function(err) { - if (err) - self.emit('error', err); - }); - }); - utils.nextTick(function() { self.emit('leech', peer); }); - - return peer; }; Pool.prototype._addPeer = function _addPeer() { @@ -1169,37 +1162,11 @@ Pool.prototype._addPeer = function _addPeer() { this.peers.all.push(peer); this.peers.map[peer.host] = peer; - peer.once('close', function() { - self._removePeer(peer); - if (self.destroyed) - return; - self._addPeer(); - }); - peer.once('ack', function() { - if (self.destroyed) - return; - if (utils.binaryRemove(self.peers.pending, peer, compare)) utils.binaryInsert(self.peers.regular, peer, compare); }); - peer.on('merkleblock', function(block) { - if (!self.options.spv) - return; - self._handleBlock(block, peer); - }); - - peer.on('block', function(block) { - if (self.options.spv) - return; - self._handleBlock(block, peer); - }); - - peer.on('blocks', function(hashes) { - self._handleInv(hashes, peer); - }); - utils.nextTick(function() { self.emit('peer', peer); }); @@ -1442,6 +1409,8 @@ Pool.prototype._sendRequests = function _sendRequests(peer) { return; if (this.options.spv) { + if (this.request.activeBlocks >= 500) + return; items = peer.queue.block.slice(); peer.queue.block.length = 0; } else { @@ -1455,6 +1424,9 @@ Pool.prototype._sendRequests = function _sendRequests(peer) { else size = 10; + if (this.request.activeBlocks >= size) + return; + items = peer.queue.block.slice(0, size); peer.queue.block = peer.queue.block.slice(size); } @@ -1918,13 +1890,12 @@ function LoadRequest(pool, peer, type, hash, callback) { this.active = false; this.id = this.pool.uid++; this.timeout = null; + this.onTimeout = this._onTimeout.bind(this); this.addCallback(callback); assert(!this.pool.request.map[this.hash]); this.pool.request.map[this.hash] = this; - - this._finish = this.destroy.bind(this); } /** @@ -1935,6 +1906,20 @@ LoadRequest.prototype.destroy = function destroy() { return this.finish(new Error('Destroyed.')); }; +/** + * Handle timeout. Potentially kill loader. + * @private + */ + +LoadRequest.prototype._onTimeout = function _onTimeout() { + if (this.type === this.pool.block.type + && this.peer === this.pool.peers.load) { + bcoin.debug('Loader took too long serving a block. Finding a new one.'); + this.peer.destroy(); + } + return this.finish(new Error('Timed out.')); +}; + /** * Add a callback to be executed when item is received. * @param {Function} callback @@ -1950,8 +1935,7 @@ LoadRequest.prototype.addCallback = function addCallback(callback) { */ LoadRequest.prototype.start = function start() { - this.timeout = setTimeout(this._finish, this.pool.requestTimeout); - this.peer.on('close', this._finish); + this.timeout = setTimeout(this.onTimeout, this.pool.requestTimeout); this.active = true; this.pool.request.active++; @@ -1990,8 +1974,6 @@ LoadRequest.prototype.finish = function finish(err) { else utils.binaryRemove(this.peer.queue.block, this, compare); - this.peer.removeListener('close', this._finish); - if (this.timeout != null) { clearTimeout(this.timeout); this.timeout = null;