chain: preload and get

This commit is contained in:
Fedor Indutny 2014-05-02 14:50:21 +04:00
parent 9e0808eb39
commit c3239b7f36
6 changed files with 14082 additions and 64 deletions

View File

@ -1,23 +1,37 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var bcoin = require('../bcoin');
var constants = bcoin.protocol.constants;
var preload = bcoin.protocol.preload;
var utils = bcoin.utils;
function Chain(options) {
if (!(this instanceof Chain))
return new Chain(options);
EventEmitter.call(this);
this.options = options || {};
this.blocks = [];
this.hashes = [];
this.ts = [];
this.hashes = preload.hashes.slice();
this.ts = preload.ts.slice();
this.orphan = {
map: {},
count: 0
};
this.requests = {};
this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbeef);
this.last = null;
this.add(new bcoin.block(constants.genesis));
if (this.hashes.length === 0) {
this.add(new bcoin.block(constants.genesis));
} else {
// Add all preloaded hashes to the bloom filter
for (var i = 0; i < this.hashes.length; i++)
this.bloom.add(utils.toArray(this.hashes[i], 'hex'));
}
}
util.inherits(Chain, EventEmitter);
module.exports = Chain;
Chain.prototype.add = function add(block) {
@ -29,24 +43,45 @@ Chain.prototype.add = function add(block) {
var rhash = block.hash();
var prev = block.prevBlock;
var pos = utils.binaryInsert(this.ts, block.ts, function(a, b) {
return a - b;
}, true);
var last = (this.hashes.length && pos > 0) ? this.hashes[pos - 1] : null;
// Add orphan
if (this.last && prev !== this.last) {
if (last && prev !== last) {
if (!this.bloom.test(rhash) && !this.orphan.map[prev]) {
this.orphan.count++;
this.orphan.map[prev] = block;
this.bloom.add(rhash);
this.emit('missing', prev);
}
break;
}
var hash = block.hash('hex');
this.bloom.add(rhash);
// Sorted insert
var pos = utils.binaryInsert(this.ts, block.ts, function(a, b) {
return a - b;
}, true);
var hash = block.hash('hex');
this.ts.splice(pos, 0, block.ts);
this.hashes.splice(pos, 0, hash);
// Blocks is a FIFO queue, acting like a cache for the requests
this.blocks.push(block);
this.hashes.push(hash);
this.ts.push(block.ts);
this.last = hash;
// Fullfill requests
if (this.requests[hash]) {
var req = this.requests[hash];
delete this.requests[hash];
req.forEach(function(cb) {
cb(block);
});
}
res = true;
// Compress old blocks
@ -75,9 +110,32 @@ Chain.prototype._compress = function compress() {
};
Chain.prototype.getLast = function getLast() {
return this.blocks[this.blocks.length - 1];
return {
hash: this.hashes[this.hashes.length - 1],
ts: this.ts[this.ts.length - 1]
};
};
Chain.prototype.has = function has(hash) {
return this.bloom.test(hash);
return this.bloom.test(hash) && !this.requests[utils.toHex(hash)];
};
Chain.prototype.get = function get(hash, cb) {
if (Array.isArray(hash))
hash = utils.toHex(hash);
for (var i = 0; i < this.blocks.length; i++)
if (this.blocks[i].hash('hex') === hash)
return cb(this.blocks[i]);
this.emit('missing', hash);
if (this.requests[hash])
this.requests[hash].push(cb);
else
this.requests[hash] = [ cb ];
};
Chain.prototype.isFull = function isFull() {
// < 10m since last block
return (+new Date / 1000) - this.ts[this.ts.length - 1] < 10 * 60;
};

View File

@ -1,4 +1,7 @@
var assert = require('assert');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var bcoin = require('../bcoin');
var utils = bcoin.utils;
@ -6,11 +9,13 @@ function Pool(options) {
if (!(this instanceof Pool))
return new Pool(options);
EventEmitter.call(this);
this.options = options || {};
this.size = options.size || 64;
this.parallel = options.parallel || 8000;
this.size = options.size || 2;
this.parallel = options.parallel || 1000;
this.load = {
timeout: options.loadTimeout || 5000,
timeout: options.loadTimeout || 10000,
window: options.loadWindow || 2500,
timer: null,
lwm: options.lwm || this.parallel * 2,
@ -33,41 +38,33 @@ function Pool(options) {
};
this.block = {
lastSeen: null,
lastTs: null,
queue: [],
active: 0,
requests: {}
};
this.finished = 0;
this.searches = [];
this.createConnection = options.createConnection;
assert(this.createConnection);
this._init();
var self = this;
setInterval(function() {
console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d d %d',
self.chain.ts.length, self.chain.orphan.count,
self.block.active,
self.block.queue.length,
Object.keys(self.block.requests).length,
process.memoryUsage().heapUsed,
self.finished - self.chain.ts.length - self.chain.orphan.count);
}, 5000);
process.on('SIGUSR2', function() {
require('fs').writeFileSync('/tmp/1.json', JSON.stringify({
hashes: self.chain.hashes,
ts: self.chain.ts
}));
});
}
util.inherits(Pool, EventEmitter);
module.exports = Pool;
Pool.prototype._init = function _init() {
this._addLoader();
for (var i = 0; i < this.size; i++)
this._addPeer();
var self = this;
this.chain.on('missing', function(hash) {
self._requestBlock(hash);
self._scheduleRequests();
if (self.peers.load === null)
self._addLoader();
});
};
Pool.prototype._addLoader = function _addLoader() {
@ -81,11 +78,12 @@ Pool.prototype._addLoader = function _addLoader() {
// Just ignore, it will result in `close` anyway
});
peer.once('close', function() {
peer.once('close', onclose);
function onclose() {
clearTimeout(timer);
self._removePeer(peer);
self._addLoader();
});
};
peer.once('ack', function() {
peer.updateWatch();
@ -94,6 +92,13 @@ Pool.prototype._addLoader = function _addLoader() {
});
function destroy() {
// Chain is full and up-to-date
if (self.chain.isFull()) {
clearTimeout(timer);
peer.removeListener('close', onclose);
self._removePeer(peer);
}
self.block.lastSeen = null;
peer.destroy();
}
@ -134,9 +139,8 @@ Pool.prototype._load = function load() {
if (this.block.lastSeen)
hash = this.block.lastSeen;
else
hash = this.chain.getLast().hash();
hash = this.chain.getLast().hash;
console.log('Loading from: ' + utils.toHex(hash.slice().reverse()));
this.peers.load.loadBlocks(hash);
return true;
@ -189,14 +193,13 @@ Pool.prototype._addPeer = function _addPeer() {
return;
var req = self.block.requests[utils.toHex(item.hash)];
console.log('notfound', !!req);
if (req)
req.finish(null);
});
});
peer.on('tx', function(tx) {
console.log('got tx', tx.hash('hex'));
self.emit('tx', tx);
});
};
@ -214,13 +217,30 @@ Pool.prototype._removePeer = function _removePeer(peer) {
};
Pool.prototype.watch = function watch(id) {
if (typeof id === 'string')
id = utils.toArray(id, 'hex');
if (id)
this.bloom.add(id);
for (var i = 0; i < this.peers.block.length; i++)
this.peers.block[i].updateWatch();
};
Pool.prototype.search = function search(id, limit) {
if (typeof id === 'string')
id = utils.toArray(id, 'hex');
this.watch(id);
this.searches.push({
id: id,
// Last 30 days by default
limit: limit || (+new Date / 1000 - 2592000)
});
};
Pool.prototype._requestBlock = function _requestBlock(hash) {
if (typeof hash === 'string')
hash = utils.toArray(hash, 'hex');
// Block is already in chain, or being requested
if (this.chain.has(hash))
return;
@ -295,28 +315,6 @@ function BlockRequest(pool, hash) {
};
}
function binaryInsert(list, item, compare) {
var start = 0,
end = list.length;
while (start < end) {
var pos = (start + end) >> 1;
var cmp = compare(item, list[pos]);
if (cmp === 0) {
start = pos;
end = pos;
break;
} else if (cmp < 0) {
end = pos;
} else {
start = pos + 1;
}
}
list.splice(start, 0, item);
}
BlockRequest.prototype.start = function start(peer) {
var self = this;
assert(!this.active);
@ -356,7 +354,7 @@ BlockRequest.prototype.clear = function clear() {
BlockRequest.prototype.retry = function retry() {
// Put block into the queue, ensure that the queue is always sorted by ts
binaryInsert(this.pool.block.queue, this, BlockRequest.compare);
utils.binaryInsert(this.pool.block.queue, this, BlockRequest.compare);
var peer = this.peer;
this.clear();

View File

@ -190,7 +190,10 @@ Framer.prototype.getBlocks = function getBlocks(hashes, stop) {
p.length = off + 32 * (hashes.length + 1);
for (var i = 0; i < hashes.length; i++) {
var len = utils.copy(hashes[i], p, off);
var hash = hashes[i];
if (typeof hash === 'string')
hash = utils.toArray(hash, 'hex');
var len = utils.copy(hash, p, off);
for (; len < 32; len++)
p[off + len] = 0;
off += len;

View File

@ -3,3 +3,4 @@ var protocol = exports;
protocol.constants = require('./constants');
protocol.framer = require('./framer');
protocol.parser = require('./parser');
protocol.preload = require('./preload');

13933
lib/bcoin/protocol/preload.js Normal file

File diff suppressed because it is too large Load Diff

View File

@ -198,3 +198,28 @@ function toHex(msg) {
return res;
}
utils.toHex = toHex;
function binaryInsert(list, item, compare, search) {
var start = 0,
end = list.length;
while (start < end) {
var pos = (start + end) >> 1;
var cmp = compare(item, list[pos]);
if (cmp === 0) {
start = pos;
end = pos;
break;
} else if (cmp < 0) {
end = pos;
} else {
start = pos + 1;
}
}
if (!search)
list.splice(start, 0, item);
return start;
}
utils.binaryInsert = binaryInsert;