From 6fd7173c89e3a9bdb2fd9ba30392611b08151503 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 4 May 2014 00:48:43 +0400 Subject: [PATCH] wip --- lib/bcoin/chain.js | 20 +++++++------- lib/bcoin/pool.js | 66 +++++++++++++++++++++++++--------------------- lib/bcoin/utils.js | 31 ++++++++++++++++++++++ 3 files changed, 77 insertions(+), 40 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index c9635a33..f7358a75 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -116,6 +116,11 @@ Chain.prototype.add = function add(block) { // Validated known block at this point - add it to index this.addIndex(hash, block.ts); + // At least one block was added + res = true; + this.block.list.push(block); + this._bloomBlock(block); + // Fullfill request if (this.request.map[hash]) { var req = this.request.map[hash]; @@ -126,11 +131,6 @@ Chain.prototype.add = function add(block) { }); } - // At least one block was added - res = true; - this.block.list.push(block); - this._bloomBlock(block); - if (!this.orphan.map[hash]) break; @@ -187,11 +187,11 @@ Chain.prototype.get = function get(hash, cb) { if (this.block.bloom.test(hash, 'hex')) { for (var i = 0; i < this.block.list.length; i++) { if (this.block.list[i].hash('hex') === hash) { - // NOTE: we return right after the statement - so `i` should be valid - // at the time of nextTick call - var self = this; - process.nextTick(function() { - cb(self.block.list[i]); + // NOTE: we return right after the statement - so `block` should be + // valid at the time of nextTick call + var block = this.block.list[i]; + bcoin.utils.nextTick(function() { + cb(block); }); return; } diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 6ca0d6a3..d6d87343 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -13,7 +13,7 @@ function Pool(options) { EventEmitter.call(this); this.options = options || {}; - this.size = options.size || 3; + this.size = options.size || 8; this.parallel = options.parallel || 2000; this.load = { timeout: options.loadTimeout || 10000, @@ -26,7 +26,7 @@ function Pool(options) { this.maxRetries = options.maxRetries || 300; this.requestTimeout = options.requestTimeout || 10000; this.chain = new bcoin.chain(); - this.watchList = []; + this.watchMap = {}; this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, (Math.random() * 0xffffffff) | 0), @@ -51,12 +51,13 @@ function Pool(options) { delta: 5 * 24 * 3600, // Minimum verification depth - minDepth: options.minValidateDepth || 1, + minDepth: options.minValidateDepth || 0, // getTx map map: {}, // Validation cache + reqs: new bcoin.utils.RequestCache(), cache: [], cacheSize: 1000 }; @@ -80,15 +81,6 @@ Pool.prototype._init = function _init() { self._scheduleRequests(); self._loadRange(range); }); - - setInterval(function() { - console.log('a %d q %d o %d r %d', - self.request.active, self.request.queue.length, - self.chain.orphan.count, - self.chain.request.count); - if (self.chain.request.count === 1) - console.log(Object.keys(self.chain.request.map)); - }, 1000); }; Pool.prototype._addLoader = function _addLoader() { @@ -254,10 +246,15 @@ Pool.prototype._removePeer = function _removePeer(peer) { }; Pool.prototype.watch = function watch(id) { + var hid = utils.toHex(id); + if (this.watchMap[hid]) + this.watchMap[hid]++; + else + this.watchMap[hid] = 1; + if (this.bloom.test(id, 'hex')) return; - this.watchList.push(utils.toHex(id)); if (id) this.bloom.add(id, 'hex'); if (this.peers.load) @@ -271,15 +268,16 @@ Pool.prototype.unwatch = function unwatch(id) { return; id = utils.toHex(id); - var index = this.watchList.indexOf(id); - if (index === -1) + if (!this.watchMap[id] || --this.watchMap[id] !== 0) return; - this.watchList.splice(index, 1); + + delete this.watchMap[id]; // Reset bloom filter this.bloom.reset(); - for (var i = 0; i < this.watchList.length; i++) - this.bloom.add(this.watchList[i], 'hex'); + Object.keys(this.watchMap).forEach(function(id) { + this.bloom.add(id, 'hex'); + }, this); // Resend it to peers if (this.peers.load) @@ -329,7 +327,7 @@ Pool.prototype.search = function search(id, range) { // Empty search if (hashes.length === 0) { - process.nextTick(function() { + bcoin.utils.nextTick(function() { e.emit('end', true); }); } @@ -431,9 +429,9 @@ Pool.prototype.getTx = function getTx(hash, range, cb) { // Do not perform duplicate searches if (this.validate.map[hash]) return this.validate.map[hash].push(cb); - var cbs = [ cb ]; this.validate.map[hash] = cbs; + // Add request without queueing it to get notification at the time of load var tx = null; var finished = false; @@ -455,7 +453,6 @@ Pool.prototype.getTx = function getTx(hash, range, cb) { doSearch(); function doSearch() { - console.log('Searching for ' + hash, range); var e = self.search(hash, range); e.on('end', function(empty) { if (finished) { @@ -502,12 +499,16 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) { // Probe cache first var result = this._probeValidateCache(tx); if (result) { - process.nextTick(function() { + bcoin.utils.nextTick(function() { cb(null, result); }); return; } + // Do not perform similar parallel validations + if (!this.validate.reqs.add(tx.hash('hex'), cb)) + return; + if (!_params) _params = { depth: 0, range: null }; @@ -520,10 +521,10 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) { valid: false }; - console.log('validateTx: ', tx.hash('hex'), depth); - if (depth > this.validate.minDepth && result.included) { + console.log('validateTx: ', tx.hash('hex'), depth, result.included, _params.path); + if (depth >= this.validate.minDepth && result.included) { result.valid = true; - process.nextTick(function() { + bcoin.utils.nextTick(function() { cb(null, result); }); return; @@ -533,13 +534,12 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) { var self = this; async.map(tx.inputs, function(input, cb) { var out = null; - console.log('load input: ', input.out.hash); self.getTx(input.out.hash, range, function(t, range) { - console.log('got input for: ', tx.hash('hex'), !!t); out = t; self.validateTx(out, onSubvalidate, { depth: depth + 1, - range: range + range: range, + path: (_params.path || []).concat(tx.hash('hex')) }); }); @@ -556,11 +556,13 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) { }, function(err, inputs) { if (err) { result.valid = false; + self.validate.reqs.fullfill(tx.hash('hex'), err, result); return cb(err, result); } self._addValidateCache(tx, result); - console.log(inputs); + self.validate.reqs.fullfill(tx.hash('hex'), null, result); + cb(null, result); }); }; @@ -573,6 +575,7 @@ function LoadRequest(pool, type, hash, cb) { this.peer = null; this.ts = +new Date; this.active = false; + this.noQueue = false; var self = this; this.onclose = function onclose() { @@ -617,6 +620,8 @@ LoadRequest.prototype.add = function add(noQueue) { this.pool.request.map[this.hash] = this; if (!noQueue) this.pool.request.queue.push(this); + else + this.noQueue = true; }; LoadRequest.prototype.clear = function clear() { @@ -650,7 +655,8 @@ LoadRequest.prototype.finish = function finish(entity) { // It could be that request was never sent to the node, remove it from // queue and forget about it var index = this.pool.request.queue.indexOf(this); - if (index !== -1) + assert(index !== -1 || this.noQueue); + if (!this.noQueue) this.pool.request.queue.splice(index, 1); assert(!this.peer); assert(!this.timer); diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 25db902f..fa110116 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -269,3 +269,34 @@ function testTarget(target, hash) { return true; } utils.testTarget = testTarget; + +// TODO(indutny): use process.nextTick in node.js +utils.nextTick = function nextTick(fn) { + setTimeout(fn, 0); +}; + +function RequestCache() { + this.map = {}; +}; +utils.RequestCache = RequestCache; + +RequestCache.prototype.add = function add(id, cb) { + if (this.map[id]) { + this.map[id].push(cb); + return false; + } else { + this.map[id] = [ cb ]; + return true; + } +}; + +RequestCache.prototype.fullfill = function fullfill(id, err, data) { + if (!this.map[id]) + return; + + var cbs = this.map[id]; + delete this.map[id]; + cbs.forEach(function(cb) { + cb(err, data); + }); +};