From f17575f8646d68bbce626481a621dcac0831bca2 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 17 Feb 2016 02:53:43 -0800 Subject: [PATCH] queue --- lib/bcoin/blockdb.js | 12 ++++++-- lib/bcoin/peer.js | 1 + lib/bcoin/pool.js | 67 ++++++++++++++++++++++++-------------------- lib/bcoin/tx.js | 1 - 4 files changed, 46 insertions(+), 35 deletions(-) diff --git a/lib/bcoin/blockdb.js b/lib/bcoin/blockdb.js index 1a898767..7f10e6a6 100644 --- a/lib/bcoin/blockdb.js +++ b/lib/bcoin/blockdb.js @@ -281,6 +281,7 @@ BlockDB.prototype.removeBlock = function removeBlock(hash, callback) { return callback(err); // TODO: Add check to make sure we // can ONLY remove the last block. + assert(block._fileOffset >= 0); self.data.truncateAsync(block._fileOffset, function(err) { if (err) return callback(err); @@ -325,10 +326,12 @@ BlockDB.prototype.removeBlock = function removeBlock(hash, callback) { if (uaddr) batch.del('t/a/' + uaddr + '/' + hash); + assert(input.output._fileOffset >= 0); + coinOffset = self.createOffset( input.output._size, - block._fileOffset + tx._offset + input.output._offset, - block.height + input.output._fileOffset, + input.output.height ); if (address) { @@ -470,8 +473,10 @@ BlockDB.prototype.fillTX = function fillTX(tx, callback) { if (err) return callback(err); - if (tx) + if (tx) { input.output = bcoin.coin(tx, input.prevout.index); + input.output._fileOffset = tx._fileOffset + input.output._offset; + } if (!--pending) callback(null, tx); @@ -790,6 +795,7 @@ BlockDB.prototype.getTX = function getTX(hash, callback) { tx.height = record.height; tx.ts = entry.ts; tx.block = entry.hash; + tx._fileOffset = record.offset; if (self.options.paranoid && tx.hash('hex') !== hash) return callback(new Error('BlockDB is corrupt. All is lost.')); } diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 5f9037fc..71ef673d 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -52,6 +52,7 @@ function Peer(pool, options) { this.banScore = 0; this.orphans = 0; this.orphanTime = 0; + this.activeBlocks = 0; if (options.socket) { this.socket = options.socket; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index a3b763ab..aa2637f3 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -102,8 +102,6 @@ function Pool(options) { (Math.random() * 0xffffffff) | 0 ); - this._handlingBlock = 0; - this.peers = { // Peers that are loading blocks themselves regular: [], @@ -135,7 +133,8 @@ function Pool(options) { this.request = { map: {}, active: 0, - queue: [] + queue: [], + activeBlocks: 0 }; this.validate = { @@ -615,15 +614,33 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback) Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; + this._pending = this._pending || []; + + if (this._lock) { + this._pending.push([block, peer, callback]); + return; + } + + this._lock = true; + callback = utils.asyncify(callback); function done(err, result) { - if (!--self._handlingBlock) - self.emit('flush'); - callback(err, result); - } + var item; - this._handlingBlock++; + self._lock = false; + + callback(err, result); + + if (self._pending.length === 0) { + self._nextBlock(peer); + return; + } + + item = self._pending.shift(); + + self._handleBlock(item[0], item[1], item[2]); + } this._prehandleBlock(block, peer, function(err) { var requested; @@ -760,11 +777,6 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) { self.emit('chain-progress', self.chain.fillPercent(), peer); - if (added > 0 && peer._requesting) - delete peer._requesting; - - self._nextBlock(peer); - self.block.total += added; if (self.chain.height() % 20 === 0) { @@ -1588,26 +1600,15 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { }); }; -Pool.prototype._startRequests = function _startRequests(peer) { - var self = this; - - if (!this._handlingBlock) - return this._nextBlock(peer); - - this.on('flush', function() { - self._nextBlock(peer); - }); -}; - Pool.prototype._nextBlock = function _nextBlock(peer) { var item; - if (peer._requesting) + if (peer.activeBlocks > 0) return; // item = peer._blockQueue.shift(); - item = peer._blockQueue.slice(0, 5); - peer._blockQueue = peer._blockQueue.slice(5); + item = peer._blockQueue.slice(0, 250); + peer._blockQueue = peer._blockQueue.slice(250); if (peer.destroyed) return; @@ -1615,8 +1616,6 @@ Pool.prototype._nextBlock = function _nextBlock(peer) { if (!item.length) return; - // XXX MAYBE ONLY CREATE LOAD REQUEST HERE - utils.debug( 'Requesting block %s (%d/%d) from %s with getdata (height %d)', utils.revHex(item[item.length - 1].hash), @@ -1625,7 +1624,8 @@ Pool.prototype._nextBlock = function _nextBlock(peer) { peer.host, this.chain.height()); - peer._requesting = item; + peer.activeBlocks += item.length; + this.request.activeBlocks += item.length; peer.getData(item); }; @@ -1648,7 +1648,7 @@ Pool.prototype._response = function _response(hash) { cb(); }); - return true; + return item; }; Pool.prototype.getBlock = function getBlock(hash, cb) { @@ -2026,8 +2026,13 @@ LoadRequest.prototype.finish = function finish() { if (this.pool.request.map[this.hash]) { delete this.pool.request.map[this.hash]; this.pool.request.active--; + if (this.type !== 'tx') + this.pool.request.activeBlocks--; } + if (this.type !== 'tx') + this.peer.activeBlocks--; + if (this.type === 'tx') { index = this.peer._txQueue.indexOf(this); if (index !== -1) diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index f5e13827..1dfeba5d 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -692,7 +692,6 @@ TX.prototype.addOutput = function addOutput(obj, value) { } output = bcoin.output({ - tx: this, value: options.value, script: options.script, _size: options._size,