From 8043be6e40b06020d8524db16d3cd754b0224c3b Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 8 May 2014 16:28:45 +0400 Subject: [PATCH] pool: make some functions asynchronous --- lib/bcoin/chain.js | 68 +++++++++++++++++++++++------- lib/bcoin/pool.js | 99 ++++++++++++++++++++++++++------------------ lib/bcoin/tx-pool.js | 16 +++---- lib/bcoin/utils.js | 8 ++++ lib/bcoin/wallet.js | 8 +++- 5 files changed, 135 insertions(+), 64 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 60e8ddf8..58ac8f4f 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -40,6 +40,7 @@ function Chain(options) { this.fromJSON(preload); + this.loading = false; this._init(); } util.inherits(Chain, EventEmitter); @@ -53,18 +54,23 @@ Chain.prototype._init = function _init() { if (!this._storage) return; + this.loading = true; var self = this; var s = this._storage.createReadStream({ start: this._prefix, end: this._prefix + 'z' - }) + }); s.on('data', function(data) { var hash = data.key.slice(self.prefix.length); - self.addIndex(hash, data.value.ts, data.value.height); + self._addIndex(hash, data.value.ts, data.value.height); }); s.on('error', function(err) { self.emit('error', err); }); + s.on('end', function() { + self.loading = false; + self.emit('load'); + }); }; Chain.prototype._getRange = function _getRange(hash, ts, futureOnly) { @@ -88,7 +94,7 @@ Chain.prototype._getRange = function _getRange(hash, ts, futureOnly) { return { start: start, end: end }; }; -Chain.prototype.probeIndex = function probeIndex(hash, ts) { +Chain.prototype._probeIndex = function _probeIndex(hash, ts) { if (!this.index.bloom.test(hash, 'hex')) return false; if (!this.strict) @@ -109,8 +115,8 @@ Chain.prototype.probeIndex = function probeIndex(hash, ts) { return false; }; -Chain.prototype.addIndex = function addIndex(hash, ts, height) { - if (this.probeIndex(hash, ts)) +Chain.prototype._addIndex = function _addIndex(hash, ts, height) { + if (this._probeIndex(hash, ts)) return; var pos = utils.binaryInsert(this.index.ts, ts, compareTs, true); @@ -142,6 +148,13 @@ Chain.prototype.addIndex = function addIndex(hash, ts, height) { }; Chain.prototype.add = function add(block) { + if (this.loading) { + this.once('load', function() { + this.add(block); + }); + return; + } + var res = false; var initial = block; do { @@ -157,7 +170,8 @@ Chain.prototype.add = function add(block) { break; // If previous block wasn't ever seen - add current to orphans - if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) { + if (!this._probeIndex(hash, block.ts) && + !this._probeIndex(prev, block.ts)) { this.orphan.count++; this.orphan.map[prev] = block; @@ -168,7 +182,7 @@ Chain.prototype.add = function add(block) { } // Validated known block at this point - add it to index - this.addIndex(hash, block.ts); + this._addIndex(hash, block.ts); // At least one block was added res = true; @@ -213,18 +227,30 @@ Chain.prototype._bloomBlock = function _bloomBlock(block) { this.block.merkleBloom.add(block.hashes[i], 'hex'); }; -Chain.prototype.has = function has(hash, noProbe) { +Chain.prototype.has = function has(hash, noProbe, cb) { + if (typeof noProbe === 'function') { + cb = noProbe; + noProbe = false; + } + if (this.loading) { + this.once('load', function() { + this.has(hash, noProbe, cb); + }); + return; + } + cb = utils.asyncify(cb); + if (noProbe && this.block.bloom.test(hash, 'hex')) { if (this.strict) { for (var i = 0; i < this.block.list.length; i++) if (this.block.list[i].hash('hex') === hash) - return true; + return cb(true); } else { - return true; + return cb(true); } } - return !noProbe && this.probeIndex(hash) && !!this.orphan.map[hash]; + return cb(!noProbe && this._probeIndex(hash) && !!this.orphan.map[hash]); }; Chain.prototype.hasMerkle = function hasMerkle(hash) { @@ -268,7 +294,14 @@ Chain.prototype.isFull = function isFull() { (+new Date / 1000) - this.index.ts[this.index.ts.length - 1] < 40 * 60; }; -Chain.prototype.hashesInRange = function hashesInRange(start, end) { +Chain.prototype.hashesInRange = function hashesInRange(start, end, cb) { + if (this.loading) { + this.once('load', function() { + this.hashesInRange(start, end, cb); + }); + return; + } + cb = utils.asyncify(cb); var ts = this.index.ts; start = utils.binaryInsert(ts, start, compareTs, true); @@ -279,8 +312,15 @@ Chain.prototype.hashesInRange = function hashesInRange(start, end) { return this.index.hashes.slice(start, end); }; -Chain.prototype.getLast = function getLast() { - return this.index.hashes[this.index.hashes.length - 1]; +Chain.prototype.getLast = function getLast(cb) { + if (this.loading) { + this.once('load', function() { + this.getLast(cb); + }); + return; + } + cb = utils.asyncify(cb); + return cb(this.index.hashes[this.index.hashes.length - 1]); }; Chain.prototype.toJSON = function toJSON() { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index de9d3d54..baa644bc 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -156,20 +156,23 @@ Pool.prototype._addLoader = function _addLoader() { self._scheduleRequests(); // The part of the response is in chain, no need to escalate requests - var allNew = hashes.every(function(hash) { - return !self.chain.has(utils.toHex(hash)); + async.every(hashes, function(hash, cb) { + self.chain.has(utils.toHex(hash), function(res) { + cb(!res); + }); + }, function(allNew) { + if (!allNew) + return; + + // Store last hash to continue global load + self.block.lastHash = hashes[hashes.length - 1]; + + clearTimeout(timer); + + // Reinstantiate timeout + if (self._load()) + timer = setTimeout(destroy, self.load.timeout); }); - if (!allNew) - return; - - // Store last hash to continue global load - self.block.lastHash = hashes[hashes.length - 1]; - - clearTimeout(timer); - - // Reinstantiate timeout - if (self._load()) - timer = setTimeout(destroy, self.load.timeout); }); }; @@ -200,18 +203,20 @@ Pool.prototype._load = function _load() { return false; } this.load.hiReached = false; + var self = this; // Load more blocks, starting from last hash - var hash; if (this.block.lastHash) - hash = this.block.lastHash; + next(this.block.lastHash); else - hash = this.chain.getLast(); + this.chain.getLast(next); - if (!this.peers.load) - this._addLoader(); - else - this.peers.load.loadBlocks([ hash ]); + function next(hash) { + if (!self.peers.load) + self._addLoader(); + else + self.peers.load.loadBlocks([ hash ]); + } return true; }; @@ -366,16 +371,19 @@ Pool.prototype.search = function search(id, range) { var self = this; var e = new EventEmitter(); - var hashes = this.chain.hashesInRange(range.start, range.end); - var waiting = hashes.length; + this.chain.hashesInRange(range.start, range.end, function(hashes) { + var waiting = hashes.length; - if (id) - this.watch(id); + if (id) + self.watch(id); - this._loadRange(hashes); - hashes.slice().reverse().forEach(function(hash) { - // Get the block that is in index - this.chain.get(hash, function(block) { + self._loadRange(hashes); + hashes.slice().reverse().forEach(function(hash) { + // Get the block that is in index + self.chain.get(hash, loadBlock); + }); + + function loadBlock(block) { // Get block's prev and request it and all of it's parents up to // the next known block hash self.chain.get(block.prevBlock, function() { @@ -387,15 +395,15 @@ Pool.prototype.search = function search(id, range) { e.emit('end'); } }); - }); - }, this); + } - // Empty search - if (hashes.length === 0) { - bcoin.utils.nextTick(function() { - e.emit('end', true); - }); - } + // Empty search + if (hashes.length === 0) { + bcoin.utils.nextTick(function() { + e.emit('end', true); + }); + } + }); return e; }; @@ -416,12 +424,21 @@ Pool.prototype._request = function _request(type, hash, options, cb) { if (this.request.map[hex]) return this.request.map[hex].addCallback(cb); - // Block should be not in chain, or be requested - if (type === 'block' && this.chain.has(hash, options.force)) - return; + var self = this; - var req = new LoadRequest(this, type, hex, cb); - req.add(options.noQueue); + // Block should be not in chain, or be requested + if (type === 'block') + return this.chain.has(hash, options.force, next) + else + return next(false); + + function next(has) { + if (has) + return; + + var req = new LoadRequest(self, type, hex, cb); + req.add(options.noQueue); + } }; Pool.prototype._response = function _response(entity) { diff --git a/lib/bcoin/tx-pool.js b/lib/bcoin/tx-pool.js index f649c091..582bbb4a 100644 --- a/lib/bcoin/tx-pool.js +++ b/lib/bcoin/tx-pool.js @@ -34,7 +34,7 @@ TXPool.prototype._init = function init() { end: this._prefix + 'z' }) s.on('data', function(data) { - self.add(bcoin.tx.fromJSON(data.value), true); + self.add(bcoin.tx.fromJSON(data), true); }); s.on('error', function(err) { self.emit('error', err); @@ -44,14 +44,13 @@ TXPool.prototype._init = function init() { TXPool.prototype.add = function add(tx, noWrite) { var hash = tx.hash('hex'); - if (!this._wallet.own(tx)) - return; - // Do not add TX two times if (this._all[hash]) return false; this._all[hash] = tx; + var own = this._wallet.own(tx); + // Consume unspent money or add orphans for (var i = 0; i < tx.inputs.length; i++) { var input = tx.inputs[i]; @@ -62,17 +61,21 @@ TXPool.prototype.add = function add(tx, noWrite) { // Add TX to inputs and spend money tx.input(unspent.tx, unspent.index); delete this._unspent[key]; + this.emit('update'); continue; } // Double-spend?! - if (this._orphans[key]) + if (!own || this._orphans[key]) continue; // Add orphan, if no parent transaction is yet known this._orphans[key] = { tx: tx, index: i }; } + if (!own) + return; + // Add unspent outputs or fullfill orphans for (var i = 0; i < tx.outputs.length; i++) { var out = tx.outputs[i]; @@ -81,6 +84,7 @@ TXPool.prototype.add = function add(tx, noWrite) { var orphan = this._orphans[key]; if (!orphan) { this._unspent[key] = { tx: tx, index: i }; + this.emit('update'); continue; } delete this._orphans[key]; @@ -97,8 +101,6 @@ TXPool.prototype.add = function add(tx, noWrite) { }); } - this.emit('tx', tx); - return true; }; diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 3c4177c7..9e1336fb 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -317,3 +317,11 @@ RequestCache.prototype.fullfill = function fullfill(id, err, data) { cb(err, data); }); }; + +utils.asyncify = function asyncify(fn) { + return function _asynicifedFn(err, data1, data2) { + utils.nextTick(function() { + fn(err, data1, data2); + }); + } +} diff --git a/lib/bcoin/wallet.js b/lib/bcoin/wallet.js index b8fa1207..82c73c6a 100644 --- a/lib/bcoin/wallet.js +++ b/lib/bcoin/wallet.js @@ -46,8 +46,12 @@ module.exports = Wallet; Wallet.prototype._init = function init() { // Notify owners about new accepted transactions var self = this; - this.tx.on('tx', function(tx) { - self.emit('tx', tx); + var prevBalance = null; + this.tx.on('update', function(tx) { + var b = this.balance(); + if (prevBalance && prevBalance.cmp(b) !== 0) + self.emit('balance', b); + prevBalance = b; }); this.tx.on('error', function(err) {