This commit is contained in:
Fedor Indutny 2014-05-04 00:48:43 +04:00
parent f185574442
commit 6fd7173c89
3 changed files with 77 additions and 40 deletions

View File

@ -116,6 +116,11 @@ Chain.prototype.add = function add(block) {
// Validated known block at this point - add it to index
this.addIndex(hash, block.ts);
// At least one block was added
res = true;
this.block.list.push(block);
this._bloomBlock(block);
// Fullfill request
if (this.request.map[hash]) {
var req = this.request.map[hash];
@ -126,11 +131,6 @@ Chain.prototype.add = function add(block) {
});
}
// At least one block was added
res = true;
this.block.list.push(block);
this._bloomBlock(block);
if (!this.orphan.map[hash])
break;
@ -187,11 +187,11 @@ Chain.prototype.get = function get(hash, cb) {
if (this.block.bloom.test(hash, 'hex')) {
for (var i = 0; i < this.block.list.length; i++) {
if (this.block.list[i].hash('hex') === hash) {
// NOTE: we return right after the statement - so `i` should be valid
// at the time of nextTick call
var self = this;
process.nextTick(function() {
cb(self.block.list[i]);
// NOTE: we return right after the statement - so `block` should be
// valid at the time of nextTick call
var block = this.block.list[i];
bcoin.utils.nextTick(function() {
cb(block);
});
return;
}

View File

@ -13,7 +13,7 @@ function Pool(options) {
EventEmitter.call(this);
this.options = options || {};
this.size = options.size || 3;
this.size = options.size || 8;
this.parallel = options.parallel || 2000;
this.load = {
timeout: options.loadTimeout || 10000,
@ -26,7 +26,7 @@ function Pool(options) {
this.maxRetries = options.maxRetries || 300;
this.requestTimeout = options.requestTimeout || 10000;
this.chain = new bcoin.chain();
this.watchList = [];
this.watchMap = {};
this.bloom = new bcoin.bloom(8 * 10 * 1024,
10,
(Math.random() * 0xffffffff) | 0),
@ -51,12 +51,13 @@ function Pool(options) {
delta: 5 * 24 * 3600,
// Minimum verification depth
minDepth: options.minValidateDepth || 1,
minDepth: options.minValidateDepth || 0,
// getTx map
map: {},
// Validation cache
reqs: new bcoin.utils.RequestCache(),
cache: [],
cacheSize: 1000
};
@ -80,15 +81,6 @@ Pool.prototype._init = function _init() {
self._scheduleRequests();
self._loadRange(range);
});
setInterval(function() {
console.log('a %d q %d o %d r %d',
self.request.active, self.request.queue.length,
self.chain.orphan.count,
self.chain.request.count);
if (self.chain.request.count === 1)
console.log(Object.keys(self.chain.request.map));
}, 1000);
};
Pool.prototype._addLoader = function _addLoader() {
@ -254,10 +246,15 @@ Pool.prototype._removePeer = function _removePeer(peer) {
};
Pool.prototype.watch = function watch(id) {
var hid = utils.toHex(id);
if (this.watchMap[hid])
this.watchMap[hid]++;
else
this.watchMap[hid] = 1;
if (this.bloom.test(id, 'hex'))
return;
this.watchList.push(utils.toHex(id));
if (id)
this.bloom.add(id, 'hex');
if (this.peers.load)
@ -271,15 +268,16 @@ Pool.prototype.unwatch = function unwatch(id) {
return;
id = utils.toHex(id);
var index = this.watchList.indexOf(id);
if (index === -1)
if (!this.watchMap[id] || --this.watchMap[id] !== 0)
return;
this.watchList.splice(index, 1);
delete this.watchMap[id];
// Reset bloom filter
this.bloom.reset();
for (var i = 0; i < this.watchList.length; i++)
this.bloom.add(this.watchList[i], 'hex');
Object.keys(this.watchMap).forEach(function(id) {
this.bloom.add(id, 'hex');
}, this);
// Resend it to peers
if (this.peers.load)
@ -329,7 +327,7 @@ Pool.prototype.search = function search(id, range) {
// Empty search
if (hashes.length === 0) {
process.nextTick(function() {
bcoin.utils.nextTick(function() {
e.emit('end', true);
});
}
@ -431,9 +429,9 @@ Pool.prototype.getTx = function getTx(hash, range, cb) {
// Do not perform duplicate searches
if (this.validate.map[hash])
return this.validate.map[hash].push(cb);
var cbs = [ cb ];
this.validate.map[hash] = cbs;
// Add request without queueing it to get notification at the time of load
var tx = null;
var finished = false;
@ -455,7 +453,6 @@ Pool.prototype.getTx = function getTx(hash, range, cb) {
doSearch();
function doSearch() {
console.log('Searching for ' + hash, range);
var e = self.search(hash, range);
e.on('end', function(empty) {
if (finished) {
@ -502,12 +499,16 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) {
// Probe cache first
var result = this._probeValidateCache(tx);
if (result) {
process.nextTick(function() {
bcoin.utils.nextTick(function() {
cb(null, result);
});
return;
}
// Do not perform similar parallel validations
if (!this.validate.reqs.add(tx.hash('hex'), cb))
return;
if (!_params)
_params = { depth: 0, range: null };
@ -520,10 +521,10 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) {
valid: false
};
console.log('validateTx: ', tx.hash('hex'), depth);
if (depth > this.validate.minDepth && result.included) {
console.log('validateTx: ', tx.hash('hex'), depth, result.included, _params.path);
if (depth >= this.validate.minDepth && result.included) {
result.valid = true;
process.nextTick(function() {
bcoin.utils.nextTick(function() {
cb(null, result);
});
return;
@ -533,13 +534,12 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) {
var self = this;
async.map(tx.inputs, function(input, cb) {
var out = null;
console.log('load input: ', input.out.hash);
self.getTx(input.out.hash, range, function(t, range) {
console.log('got input for: ', tx.hash('hex'), !!t);
out = t;
self.validateTx(out, onSubvalidate, {
depth: depth + 1,
range: range
range: range,
path: (_params.path || []).concat(tx.hash('hex'))
});
});
@ -556,11 +556,13 @@ Pool.prototype.validateTx = function validateTx(tx, cb, _params) {
}, function(err, inputs) {
if (err) {
result.valid = false;
self.validate.reqs.fullfill(tx.hash('hex'), err, result);
return cb(err, result);
}
self._addValidateCache(tx, result);
console.log(inputs);
self.validate.reqs.fullfill(tx.hash('hex'), null, result);
cb(null, result);
});
};
@ -573,6 +575,7 @@ function LoadRequest(pool, type, hash, cb) {
this.peer = null;
this.ts = +new Date;
this.active = false;
this.noQueue = false;
var self = this;
this.onclose = function onclose() {
@ -617,6 +620,8 @@ LoadRequest.prototype.add = function add(noQueue) {
this.pool.request.map[this.hash] = this;
if (!noQueue)
this.pool.request.queue.push(this);
else
this.noQueue = true;
};
LoadRequest.prototype.clear = function clear() {
@ -650,7 +655,8 @@ LoadRequest.prototype.finish = function finish(entity) {
// It could be that request was never sent to the node, remove it from
// queue and forget about it
var index = this.pool.request.queue.indexOf(this);
if (index !== -1)
assert(index !== -1 || this.noQueue);
if (!this.noQueue)
this.pool.request.queue.splice(index, 1);
assert(!this.peer);
assert(!this.timer);

View File

@ -269,3 +269,34 @@ function testTarget(target, hash) {
return true;
}
utils.testTarget = testTarget;
// TODO(indutny): use process.nextTick in node.js
utils.nextTick = function nextTick(fn) {
setTimeout(fn, 0);
};
function RequestCache() {
this.map = {};
};
utils.RequestCache = RequestCache;
RequestCache.prototype.add = function add(id, cb) {
if (this.map[id]) {
this.map[id].push(cb);
return false;
} else {
this.map[id] = [ cb ];
return true;
}
};
RequestCache.prototype.fullfill = function fullfill(id, err, data) {
if (!this.map[id])
return;
var cbs = this.map[id];
delete this.map[id];
cbs.forEach(function(cb) {
cb(err, data);
});
};