pool: refactor, make search work!

This commit is contained in:
Fedor Indutny 2014-05-03 16:11:06 +04:00
parent 0eecb94b3f
commit 1ac3208360
5 changed files with 197 additions and 172 deletions

View File

@ -6,11 +6,11 @@ function Bloom(size, n, tweak) {
return new Bloom(size, n, tweak); return new Bloom(size, n, tweak);
this.filter = new Array(Math.ceil(size / 32)); this.filter = new Array(Math.ceil(size / 32));
for (var i = 0; i < this.filter.length; i++)
this.filter[i] = 0;
this.size = size; this.size = size;
this.n = n; this.n = n;
this.tweak = tweak; this.tweak = tweak;
this.reset();
} }
module.exports = Bloom; module.exports = Bloom;
@ -18,7 +18,14 @@ Bloom.prototype.hash = function hash(val, n) {
return Bloom.hash(val, sum32(mul32(n, 0xfba4c795), this.tweak)) % this.size; return Bloom.hash(val, sum32(mul32(n, 0xfba4c795), this.tweak)) % this.size;
}; };
Bloom.prototype.add = function add(val) { Bloom.prototype.reset = function reset() {
for (var i = 0; i < this.filter.length; i++)
this.filter[i] = 0;
};
Bloom.prototype.add = function add(val, enc) {
val = utils.toArray(val, enc);
for (var i = 0; i < this.n; i++) { for (var i = 0; i < this.n; i++) {
var bit = this.hash(val, i); var bit = this.hash(val, i);
var pos = 1 << (bit & 0x1f); var pos = 1 << (bit & 0x1f);
@ -28,7 +35,9 @@ Bloom.prototype.add = function add(val) {
} }
}; };
Bloom.prototype.test = function test(val) { Bloom.prototype.test = function test(val, enc) {
val = utils.toArray(val, enc);
for (var i = 0; i < this.n; i++) { for (var i = 0; i < this.n; i++) {
var bit = this.hash(val, i); var bit = this.hash(val, i);
var pos = 1 << (bit & 0x1f); var pos = 1 << (bit & 0x1f);

View File

@ -13,31 +13,77 @@ function Chain(options) {
EventEmitter.call(this); EventEmitter.call(this);
this.options = options || {}; this.options = options || {};
this.blocks = []; this.block = {
this.fifo = []; list: [],
this.hashes = preload.hashes.slice(); bloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeef)
this.ts = preload.ts.slice(); };
this.orphan = { this.orphan = {
map: {}, map: {},
count: 0 count: 0
}; };
this.requests = { this.index = {
bloom: new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbee0),
hashes: preload.hashes.slice(),
ts: preload.ts.slice()
};
this.request = {
map: {}, map: {},
count: 0 count: 0
}; };
this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbeef);
if (this.hashes.length === 0) { if (this.index.hashes.length === 0)
this.add(new bcoin.block(constants.genesis)); this.add(new bcoin.block(constants.genesis));
} else {
// Add all preloaded hashes to the bloom filter for (var i = 0; i < this.index.hashes.length; i++)
for (var i = 0; i < this.hashes.length; i++) this.index.bloom.add(this.index.hashes[i], 'hex');
this.bloom.add(utils.toArray(this.hashes[i], 'hex'));
}
} }
util.inherits(Chain, EventEmitter); util.inherits(Chain, EventEmitter);
module.exports = Chain; module.exports = Chain;
function compareTs(a, b) {
return a -b;
}
Chain.prototype.probeIndex = function probeIndex(hash, ts) {
if (!this.index.bloom.test(hash, 'hex'))
return false;
var start = 0;
var end = this.index.ts.length;
if (ts) {
start = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true);
start = Math.max(0, start - 1);
end = utils.binaryInsert(this.index.ts, ts + 2 * 3600, compareTs, true);
}
for (var i = start; i < end; i++)
if (this.index.hashes[i] === hash)
return true;
return false;
};
Chain.prototype.addIndex = function addIndex(hash, ts) {
if (this.probeIndex(hash, ts))
return;
var pos = utils.binaryInsert(this.index.ts, ts, compareTs);
this.index.hashes.splice(pos, 0, hash);
this.index.bloom.add(hash, 'hex');
};
Chain.prototype.getRange = function getRange(ts) {
var start = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true);
var end = utils.binaryInsert(this.index.ts, ts + 2 * 3600, compareTs, true);
if (start > 0)
start--;
if (end > 0)
end--;
return { start: this.index.hashes[start], end: this.index.hashes[end] };
};
Chain.prototype.add = function add(block) { Chain.prototype.add = function add(block) {
var res = false; var res = false;
var initial = block; var initial = block;
@ -47,152 +93,105 @@ Chain.prototype.add = function add(block) {
break; break;
var hash = block.hash('hex'); var hash = block.hash('hex');
var rhash = block.hash();
var prev = block.prevBlock; var prev = block.prevBlock;
// Validity period is two hours // If previous block wasn't ever seen - add current to orphans
var pos = utils.binaryInsert(this.ts, block.ts - 2 * 3600, function(a, b) { if (this.orphan.map[prev])
return a - b; break;
}, true); if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) {
var test = this.bloom.test(rhash) && this.orphan.count++;
prev !== initial && this.orphan.map[prev] = block;
!this.orphan.map[prev];
var match = this.hashes.length === 0; // Add block to bloom filter, as we now have it in memory
for (pos--; !match && 0 <= 0 && pos < this.hashes.length; pos++) this.block.bloom.add(hash, 'hex');
match = this.hashes[pos] === prev; this.emit('missing', prev, this.getRange(block.ts));
// If last hash at ts matches prev hash or we already know this block -
// add it to either list or FIFO
if (!match && !test) {
// Add orphan
if (!this.orphan.map[prev]) {
this.orphan.count++;
this.orphan.map[prev] = block;
this.bloom.add(rhash);
this.emit('missing', prev);
}
break; break;
} }
// It may be a re-requested block // Validated known block at this point - add it to index
if (!this.bloom.test(rhash)) { this.addIndex(hash, block.ts);
// Sorted insert
var pos = utils.binaryInsert(this.ts, block.ts, function(a, b) {
return a - b;
}, true);
this.ts.splice(pos, 0, block.ts); // Fullfill request
this.hashes.splice(pos, 0, hash); if (this.request.map[hash]) {
this.bloom.add(rhash); var req = this.request.map[hash];
} delete this.request.map[hash];
this.request.count--;
// Some old block for caching purposes, should be a FIFO
if (!this.covers(block.ts)) {
this.fifo.push(block);
// A new block
} else {
// Insert block into a cache set if it isn't already there
var pos = utils.binaryInsert(this.blocks, block, function(a, b) {
return a.ts - b.ts;
}, true);
this.blocks.splice(pos, 0, block);
}
// Fullfill requests
if (this.requests.map[hash]) {
var req = this.requests.map[hash];
delete this.requests.map[hash];
this.requests.count--;
req.forEach(function(cb) { req.forEach(function(cb) {
cb(block); cb(block);
}); });
} }
// At least one block was added
res = true; res = true;
this.block.list.push(block);
// Compress old blocks if (!this.orphan.map[hash])
this._compress(); break;
// We have orphan child for this block - add it to chain // We have orphan child for this block - add it to chain
if (this.orphan.map[hash]) { block = this.orphan.map[hash];
block = this.orphan.map[hash]; delete this.orphan.map[hash];
delete this.orphan.map[hash]; this.orphan.count--;
this.orphan.count--;
continue;
}
break;
} while (true); } while (true);
// Compress old blocks
this._compress();
return res; return res;
}; };
Chain.prototype._compress = function compress() { Chain.prototype._compress = function compress() {
// Store only last 1000 blocks and 1000 FIFO // Keep at least 1000 blocks and at most 2000
if (this.blocks.length > 1000) if (this.block.list.length < 2000)
this.blocks = this.blocks.slice(-1000); return;
if (this.fifo.length > 1000)
this.fifo = this.fifo.slice(-1000); // Bloom filter rebuilt is needed
this.block.list = this.block.list.slice(-1000);
this.block.bloom.reset();
for (var i = 0; i < this.block.list.length; i++)
this.block.bloom.add(this.block.list[i].hash());
}; };
Chain.prototype.getLast = function getLast() { Chain.prototype.has = function has(hash) {
return { return this.probeIndex(hash) || !!this.orphan.map[hash];
hash: this.hashes[this.hashes.length - 1],
ts: this.ts[this.ts.length - 1]
};
};
Chain.prototype.has = function has(hash, cacheonly) {
if (!Array.isArray(hash))
hash = utils.toArray(hash, 'hex');
if (!cacheonly)
return this.bloom.test(hash) && !this.requests.map[utils.toHex(hash)];
for (var i = 0; i < this.blocks.length; i++)
if (this.blocks[i].hash('hex') === hash)
return true;
for (var i = 0; i < this.fifo.length; i++)
if (this.fifo[i].hash('hex') === hash)
return true;
return false;
}; };
Chain.prototype.get = function get(hash, cb) { Chain.prototype.get = function get(hash, cb) {
if (Array.isArray(hash)) // Cached block found
hash = utils.toHex(hash); if (this.block.bloom.test(hash, 'hex')) {
for (var i = 0; i < this.block.list.length; i++)
if (this.block.list[i].hash('hex') === hash)
return cb(this.block.list[i]);
assert(false);
}
for (var i = 0; i < this.blocks.length; i++) if (this.request.map[hash]) {
if (this.blocks[i].hash('hex') === hash) this.request.map[hash].push(cb);
return cb(this.blocks[i]); this.request.count++;
for (var i = 0; i < this.fifo.length; i++)
if (this.fifo[i].hash('hex') === hash)
return cb(this.fifo[i]);
if (this.requests.map[hash]) {
this.requests.map[hash].push(cb);
this.requests.count++;
} else { } else {
this.requests.map[hash] = [ cb ]; this.request.map[hash] = [ cb ];
} }
this.emit('missing', hash); this.emit('missing', hash);
}; };
Chain.prototype.isFull = function isFull() { Chain.prototype.isFull = function isFull() {
// < 10m since last block // < 10m since last block
return !this.requests.count && return !this.request.count &&
(+new Date / 1000) - this.ts[this.ts.length - 1] < 10 * 60; (+new Date / 1000) - this.index.ts[this.index.ts.length - 1] < 10 * 60;
}; };
Chain.prototype.covers = function covers(ts) { Chain.prototype.covers = function covers(ts) {
return this.ts.length && this.ts[0] <= ts; return ts >= this.index.ts[0];
}; };
Chain.prototype.hashesFrom = function hashesFrom(ts) { Chain.prototype.hashesFrom = function hashesFrom(ts, index) {
var pos = utils.binaryInsert(this.ts, ts, function(a, b) { var pos = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true);
return a - b; if (pos > 0)
}, true); pos--;
return this.hashes.slice(pos); return this.index.hashes.slice(pos);
};
Chain.prototype.getLast = function getLast() {
return this.index.hashes[this.index.hashes.length - 1];
}; };

View File

@ -284,6 +284,7 @@ Peer.prototype._handleInv = function handleInv(items) {
this.getData(txs); this.getData(txs);
}; };
Peer.prototype.loadBlocks = function loadBlocks(hash) { Peer.prototype.loadBlocks = function loadBlocks(hash, stop) {
this._write(this.framer.getBlocks([ hash ])); this._write(this.framer.getBlocks(Array.isArray(hash) ? hash : [ hash ],
stop));
}; };

View File

@ -12,18 +12,18 @@ function Pool(options) {
EventEmitter.call(this); EventEmitter.call(this);
this.options = options || {}; this.options = options || {};
this.size = options.size || 4; this.size = options.size || 2;
this.parallel = options.parallel || 1000; this.parallel = options.parallel || 2000;
this.load = { this.load = {
timeout: options.loadTimeout || 10000, timeout: options.loadTimeout || 10000,
window: options.loadWindow || 2500, window: options.loadWindow || 250,
timer: null, timer: null,
lwm: options.lwm || this.parallel * 2, lwm: options.lwm || this.parallel * 2,
hwm: options.hwm || this.parallel * 8, hwm: options.hwm || this.parallel * 8,
hiReached: false hiReached: false
}; };
this.maxRetries = options.maxRetries || 50; this.maxRetries = options.maxRetries || 300;
this.requestTimeout = options.requestTimeout || 10000; this.requestTimeout = options.requestTimeout || 30000;
this.chain = new bcoin.chain(); this.chain = new bcoin.chain();
this.bloom = new bcoin.bloom(8 * 10 * 1024, this.bloom = new bcoin.bloom(8 * 10 * 1024,
10, 10,
@ -37,8 +37,7 @@ function Pool(options) {
load: null load: null
}; };
this.block = { this.block = {
nextLoad: [], lastHash: null,
loadTs: Infinity,
queue: [], queue: [],
active: 0, active: 0,
requests: {} requests: {}
@ -58,28 +57,21 @@ Pool.prototype._init = function _init() {
this._addPeer(); this._addPeer();
var self = this; var self = this;
this.chain.on('missing', function(hash) { this.chain.on('missing', function(hash, range) {
self._requestBlock(hash); self._requestBlock(hash, true);
self._scheduleRequests(); self._scheduleRequests();
self._load(); self._loadRange(range);
}); });
setInterval(function() {
console.log(self.chain.fifo.length, self.chain.blocks.length, self.chain.orphan.count, self.chain.hashes.length, self.chain.ts.length);
}, 5000);
}; };
Pool.prototype._addLoader = function _addLoader() { Pool.prototype._addLoader = function _addLoader() {
if (this.peers.load !== null) if (this.peers.load !== null)
return; return;
var socket = this.createConnection(); var socket = this.createConnection();
var peer = bcoin.peer(this, socket, this.options.peer); var peer = bcoin.peer(this, socket, this.options.peer);
this.peers.load = peer; this.peers.load = peer;
// Queue block's hash for search
if (this.block.loadTs < Infinity)
this.block.nextLoad.push(this.chain.hashesFrom(this.block.loadTs)[0]);
var self = this; var self = this;
peer.once('error', function() { peer.once('error', function() {
// Just ignore, it will result in `close` anyway // Just ignore, it will result in `close` anyway
@ -100,7 +92,8 @@ Pool.prototype._addLoader = function _addLoader() {
function destroy() { function destroy() {
// Chain is full and up-to-date // Chain is full and up-to-date
if (self.block.nextLoad.length === 0 && self.chain.isFull()) { if (self.block.lastHash === null &&
self.chain.isFull()) {
clearTimeout(timer); clearTimeout(timer);
peer.removeListener('close', onclose); peer.removeListener('close', onclose);
self._removePeer(peer); self._removePeer(peer);
@ -113,8 +106,8 @@ Pool.prototype._addLoader = function _addLoader() {
// Split blocks and request them using multiple peers // Split blocks and request them using multiple peers
peer.on('blocks', function(hashes) { peer.on('blocks', function(hashes) {
if (hashes.length === 0) { if (hashes.length === 0) {
// Reset search loads // Reset global load
self.block.loadTs = Infinity; self.block.lastHash = null;
return; return;
} }
@ -125,22 +118,31 @@ Pool.prototype._addLoader = function _addLoader() {
self._scheduleRequests(); self._scheduleRequests();
self.block.nextLoad.push(hashes[hashes.length - 1]); // Store last hash to continue global load
self._lastHash = hashes[hashes.length - 1];
clearTimeout(timer); clearTimeout(timer);
// Reinstantiate timeout // Reinstantiate timeout
if (self._load()) if (self._load())
timer = setTimeout(destroy, self.load.timeout); timer = setTimeout(destroy, self.load.timeout);
}); });
}; };
Pool.prototype._load = function load(from) { Pool.prototype._loadRange = function _loadRange(range) {
// Requested load of earlier block ids if (!range)
if (from !== undefined && this.block.loadTs > from) { return;
this.block.loadTs = from;
this.block.nextLoad.push(this.chain.hashesFrom(from)[0]);
}
// We will be requesting block anyway
if (range.start === range.end)
return;
if (!this.peers.load)
this._addLoader();
this.peers.load.loadBlocks(range.start, range.end);
};
Pool.prototype._load = function _load() {
if (this.block.queue.length >= this.load.hwm) { if (this.block.queue.length >= this.load.hwm) {
this.load.hiReached = true; this.load.hiReached = true;
return false; return false;
@ -149,15 +151,15 @@ Pool.prototype._load = function load(from) {
// Load more blocks, starting from last hash // Load more blocks, starting from last hash
var hash; var hash;
if (this.block.nextLoad.length) { if (this.block.lastHash)
hash = this.block.nextLoad.shift(); hash = this.block.lastHash;
} else { else
hash = this.chain.getLast().hash; hash = this.chain.getLast();
}
if (!this.peers.load) if (!this.peers.load)
this._addLoader(); this._addLoader();
this.peers.load.loadBlocks(hash); else
this.peers.load.loadBlocks(hash);
return true; return true;
}; };
@ -256,18 +258,23 @@ Pool.prototype.search = function search(id, limit, cb) {
if (!limit) if (!limit)
limit = (+new Date / 1000 - 432000); limit = (+new Date / 1000 - 432000);
var self = this;
var hashes = this.chain.hashesFrom(limit);
var waiting = hashes.length;
this.watch(id); this.watch(id);
if (!this.chain.covers(limit)) { hashes.slice().reverse().forEach(function(hash) {
// Load some block hashes first // Get the block that is in index
this._load(limit); this.chain.get(hash, function(block) {
} else { // Get block's prev and request it and all of it's parents up to
// Load the blocks themselves! // the next known block hash
this.chain.hashesFrom(limit).forEach(function(hash) { self.chain.get(block.prevBlock, function() {
this._requestBlock(hash, true); if (--waiting === 0)
}, this); cb();
this._scheduleRequests(); });
} });
}, this);
}; };
Pool.prototype._requestBlock = function _requestBlock(hash, force) { Pool.prototype._requestBlock = function _requestBlock(hash, force) {
@ -275,7 +282,7 @@ Pool.prototype._requestBlock = function _requestBlock(hash, force) {
hash = utils.toArray(hash, 'hex'); hash = utils.toArray(hash, 'hex');
// Block should be not in chain, or be requested // Block should be not in chain, or be requested
if (this.chain.has(hash, force)) if (!force && this.chain.has(hash))
return; return;
var hex = utils.toHex(hash); var hex = utils.toHex(hash);
@ -314,6 +321,10 @@ Pool.prototype._doRequests = function _doRequests() {
if (this.block.active >= this.parallel) if (this.block.active >= this.parallel)
return; return;
// No peers so far
if (this.peers.block.length === 0)
return;
var above = this.block.queue.length >= this.load.lwm; var above = this.block.queue.length >= this.load.lwm;
var items = this.block.queue.slice(0, this.parallel - this.block.active); var items = this.block.queue.slice(0, this.parallel - this.block.active);
this.block.queue = this.block.queue.slice(items.length); this.block.queue = this.block.queue.slice(items.length);

View File

@ -199,7 +199,12 @@ Framer.prototype.getBlocks = function getBlocks(hashes, stop) {
off += len; off += len;
} }
var len = stop ? utils.copy(stop, p, off) : 0; if (stop) {
stop = utils.toArray(stop, 'hex');
var len = utils.copy(stop, p, off);
} else {
var len = 0;
}
for (; len < 32; len++) for (; len < 32; len++)
p[off + len] = 0; p[off + len] = 0;
assert.equal(off + len, p.length); assert.equal(off + len, p.length);