lib: save progress

This commit is contained in:
Fedor Indutny 2014-05-03 23:55:38 +04:00
parent 40ea4d1fb1
commit f185574442
6 changed files with 362 additions and 73 deletions

View File

@ -48,3 +48,7 @@ Block.prototype.verify = function verify() {
Block.prototype.render = function render(framer) {
return [];
};
Block.prototype.hasMerkle = function hasMerkle(hash) {
return this.hashes.indexOf(hash) !== -1;
};

View File

@ -15,6 +15,11 @@ function Chain(options) {
this.options = options || {};
this.block = {
list: [],
// Bloom filter for all merkle trees
merkleBloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeed),
// Bloom filter for all known blocks
bloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeef)
};
this.orphan = {
@ -95,15 +100,15 @@ Chain.prototype.add = function add(block) {
var hash = block.hash('hex');
var prev = block.prevBlock;
// If previous block wasn't ever seen - add current to orphans
// If the block is already known to be an orphan
if (this.orphan.map[prev])
break;
// If previous block wasn't ever seen - add current to orphans
if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) {
this.orphan.count++;
this.orphan.map[prev] = block;
// Add block to bloom filter, as we now have it in memory
this.block.bloom.add(hash, 'hex');
this.emit('missing', prev, this.getRange(block.ts));
break;
}
@ -124,6 +129,7 @@ Chain.prototype.add = function add(block) {
// At least one block was added
res = true;
this.block.list.push(block);
this._bloomBlock(block);
if (!this.orphan.map[hash])
break;
@ -148,21 +154,48 @@ Chain.prototype._compress = function compress() {
// Bloom filter rebuilt is needed
this.block.list = this.block.list.slice(-1000);
this.block.bloom.reset();
this.block.merkleBloom.reset();
for (var i = 0; i < this.block.list.length; i++)
this.block.bloom.add(this.block.list[i].hash());
this._bloomBlock(this.block.list[i]);
};
Chain.prototype._bloomBlock = function _bloomBlock(block) {
this.block.bloom.add(block.hash(), 'hex');
for (var i = 0; i < block.hashes.length; i++)
this.block.merkleBloom.add(block.hashes[i], 'hex');
};
Chain.prototype.has = function has(hash) {
return this.probeIndex(hash) || !!this.orphan.map[hash];
};
Chain.prototype.hasMerkle = function hasMerkle(hash) {
if (!this.block.merkleBloom.test(hash, 'hex'))
return false;
hash = utils.toHex(hash);
for (var i = 0; i < this.block.list.length; i++)
if (this.block.list[i].hasMerkle(hash))
return true;
return false;
};
Chain.prototype.get = function get(hash, cb) {
// Cached block found
if (this.block.bloom.test(hash, 'hex')) {
for (var i = 0; i < this.block.list.length; i++)
if (this.block.list[i].hash('hex') === hash)
return cb(this.block.list[i]);
for (var i = 0; i < this.block.list.length; i++) {
if (this.block.list[i].hash('hex') === hash) {
// NOTE: we return right after the statement - so `i` should be valid
// at the time of nextTick call
var self = this;
process.nextTick(function() {
cb(self.block.list[i]);
});
return;
}
}
assert(false);
}
@ -185,11 +218,14 @@ Chain.prototype.covers = function covers(ts) {
return ts >= this.index.ts[0];
};
Chain.prototype.hashesFrom = function hashesFrom(ts, index) {
var pos = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true);
if (pos > 0)
pos--;
return this.index.hashes.slice(pos);
Chain.prototype.hashesInRange = function hashesInRange(start, end) {
var ts = this.index.ts;
var pos = utils.binaryInsert(ts, start - 2 * 3600, compareTs, true);
start = Math.max(0, pos - 1);
var pos = utils.binaryInsert(ts, end + 2 * 3600, compareTs, true);
end = pos;
return this.index.hashes.slice(start, end);
};
Chain.prototype.getLast = function getLast() {

View File

@ -1,4 +1,5 @@
var assert = require('assert');
var async = require('async');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
@ -25,6 +26,7 @@ function Pool(options) {
this.maxRetries = options.maxRetries || 300;
this.requestTimeout = options.requestTimeout || 10000;
this.chain = new bcoin.chain();
this.watchList = [];
this.bloom = new bcoin.bloom(8 * 10 * 1024,
10,
(Math.random() * 0xffffffff) | 0),
@ -37,10 +39,26 @@ function Pool(options) {
load: null
};
this.block = {
lastHash: null,
queue: [],
lastHash: null
};
this.request = {
map: {},
active: 0,
requests: {}
queue: []
};
this.validate = {
// 5 days scan delta for obtaining TXs
delta: 5 * 24 * 3600,
// Minimum verification depth
minDepth: options.minValidateDepth || 1,
// getTx map
map: {},
// Validation cache
cache: [],
cacheSize: 1000
};
this.createConnection = options.createConnection;
@ -58,10 +76,19 @@ Pool.prototype._init = function _init() {
var self = this;
this.chain.on('missing', function(hash, range) {
self._requestBlock(hash, true);
self._request('block', hash, { force: true });
self._scheduleRequests();
self._loadRange(range);
});
setInterval(function() {
console.log('a %d q %d o %d r %d',
self.request.active, self.request.queue.length,
self.chain.orphan.count,
self.chain.request.count);
if (self.chain.request.count === 1)
console.log(Object.keys(self.chain.request.map));
}, 1000);
};
Pool.prototype._addLoader = function _addLoader() {
@ -92,8 +119,7 @@ Pool.prototype._addLoader = function _addLoader() {
function destroy() {
// Chain is full and up-to-date
if (self.block.lastHash === null &&
self.chain.isFull()) {
if (self.block.lastHash === null && self.chain.isFull()) {
clearTimeout(timer);
peer.removeListener('close', onclose);
self._removePeer(peer);
@ -113,7 +139,7 @@ Pool.prototype._addLoader = function _addLoader() {
// Request each block
hashes.forEach(function(hash) {
self._requestBlock(hash);
self._request('block', hash);
});
self._scheduleRequests();
@ -143,7 +169,7 @@ Pool.prototype._loadRange = function _loadRange(range) {
};
Pool.prototype._load = function _load() {
if (this.block.queue.length >= this.load.hwm) {
if (this.request.queue.length >= this.load.hwm) {
this.load.hiReached = true;
return false;
}
@ -196,27 +222,20 @@ Pool.prototype._addPeer = function _addPeer() {
});
peer.on('merkleblock', function(block) {
var has = self.chain.has(block);
self.chain.add(block);
var req = self.block.requests[block.hash('hex')];
if (!req)
return;
assert(!has);
req.finish(block);
self._response(block);
});
peer.on('notfound', function(items) {
items.forEach(function(item) {
if (item.type !== 'filtered')
return;
var req = self.block.requests[utils.toHex(item.hash)];
var req = self.request.map[utils.toHex(item.hash)];
if (req)
req.finish(null);
});
});
peer.on('tx', function(tx) {
self._response(tx);
self.emit('tx', tx);
});
};
@ -235,31 +254,59 @@ Pool.prototype._removePeer = function _removePeer(peer) {
};
Pool.prototype.watch = function watch(id) {
if (typeof id === 'string')
id = utils.toArray(id, 'hex');
if (this.bloom.test(id, 'hex'))
return;
this.watchList.push(utils.toHex(id));
if (id)
this.bloom.add(id);
this.bloom.add(id, 'hex');
if (this.peers.load)
this.peers.load.updateWatch();
for (var i = 0; i < this.peers.block.length; i++)
this.peers.block[i].updateWatch();
};
Pool.prototype.search = function search(id, limit, cb) {
Pool.prototype.unwatch = function unwatch(id) {
if (!this.bloom.test(id, 'hex'))
return;
id = utils.toHex(id);
var index = this.watchList.indexOf(id);
if (index === -1)
return;
this.watchList.splice(index, 1);
// Reset bloom filter
this.bloom.reset();
for (var i = 0; i < this.watchList.length; i++)
this.bloom.add(this.watchList[i], 'hex');
// Resend it to peers
if (this.peers.load)
this.peers.load.updateWatch();
for (var i = 0; i < this.peers.block.length; i++)
this.peers.block[i].updateWatch();
};
Pool.prototype.search = function search(id, range) {
if (typeof id === 'string')
id = utils.toArray(id, 'hex');
// Optional limit argument
if (typeof limit === 'function') {
cb = limit;
limit = null;
}
if (range)
range = { start: range.start, end: range.end };
else
range = { start: 0, end: 0 };
// Last 5 days by default, this covers 1000 blocks that we have in the
// chain by default
if (!limit)
limit = (+new Date / 1000 - 432000);
if (!range.end)
range.end = +new Date / 1000;
if (!range.start)
range.start = +new Date / 1000 - 432000;
var self = this;
var hashes = this.chain.hashesFrom(limit);
var e = new EventEmitter();
var hashes = this.chain.hashesInRange(range.start, range.end);
var waiting = hashes.length;
this.watch(id);
@ -270,36 +317,63 @@ Pool.prototype.search = function search(id, limit, cb) {
// Get block's prev and request it and all of it's parents up to
// the next known block hash
self.chain.get(block.prevBlock, function() {
if (--waiting === 0)
cb();
waiting--;
e.emit('progress', hashes.length - waiting, hashes.length);
if (waiting === 0) {
self.unwatch(id);
e.emit('end');
}
});
});
}, this);
// Empty search
if (hashes.length === 0) {
process.nextTick(function() {
e.emit('end', true);
});
}
return e;
};
Pool.prototype._requestBlock = function _requestBlock(hash, force) {
Pool.prototype._request = function _request(type, hash, options, cb) {
if (typeof hash === 'string')
hash = utils.toArray(hash, 'hex');
// Block should be not in chain, or be requested
if (!force && this.chain.has(hash))
return;
// Optional `force`
if (typeof options === 'function') {
cb = options;
options = {};
}
if (!options)
options = {};
var hex = utils.toHex(hash);
if (this.block.requests[hex])
if (this.request.map[hex])
return this.request.map[hex].addCallback(cb);
// Block should be not in chain, or be requested
if (!options.force && type === 'block' && this.chain.has(hash))
return;
var req = new BlockRequest(this, hex);
this.block.requests[hex] = req;
this.block.queue.push(req);
var req = new LoadRequest(this, type, hex, cb);
req.add(options.noQueue);
};
Pool.prototype._response = function _response(entity) {
var req = this.request.map[entity.hash('hex')];
if (!req)
return;
req.finish(entity);
};
Pool.prototype._scheduleRequests = function _scheduleRequests() {
if (this.block.active > this.parallel / 2)
if (this.request.active > this.parallel / 2)
return;
// No need to wait - already have enough data
if (this.block.queue.length > this.parallel) {
if (this.request.queue.length > this.parallel) {
if (this.load.timer !== null)
clearTimeout(this.load.timer);
this.load.timer = null;
@ -318,17 +392,18 @@ Pool.prototype._scheduleRequests = function _scheduleRequests() {
};
Pool.prototype._doRequests = function _doRequests() {
if (this.block.active >= this.parallel)
if (this.request.active >= this.parallel)
return;
// No peers so far
if (this.peers.block.length === 0)
return;
var above = this.block.queue.length >= this.load.lwm;
var items = this.block.queue.slice(0, this.parallel - this.block.active);
this.block.queue = this.block.queue.slice(items.length);
var below = this.block.queue.length < this.load.lwm;
var queue = this.request.queue;
var above = queue.length >= this.load.lwm;
var items = queue.slice(0, this.parallel - this.request.active);
this.request.queue = queue.slice(items.length);
var below = this.request.queue.length < this.load.lwm;
// Watermark boundary crossed, load more blocks
if (above && below && this.load.hiReached)
@ -345,9 +420,155 @@ Pool.prototype._doRequests = function _doRequests() {
}
};
function BlockRequest(pool, hash) {
Pool.prototype.getTx = function getTx(hash, range, cb) {
hash = utils.toHex(hash);
if (typeof range === 'function') {
cb = range;
range = null;
}
// Do not perform duplicate searches
if (this.validate.map[hash])
return this.validate.map[hash].push(cb);
var cbs = [ cb ];
this.validate.map[hash] = cbs;
// Add request without queueing it to get notification at the time of load
var tx = null;
var finished = false;
var req = this._request('tx', hash, { noQueue: true }, function(t) {
finished = true;
tx = t;
});
// Do incremental search until the TX is found
var delta = this.validate.delta;
// Start from the existing range if given
if (range)
range = { start: range.start, end: range.end };
else
range = { start: (+new Date / 1000) - delta, end: 0 };
var self = this;
doSearch();
function doSearch() {
console.log('Searching for ' + hash, range);
var e = self.search(hash, range);
e.on('end', function(empty) {
if (finished) {
delete self.validate.map[hash];
cbs.forEach(function(cb) {
cb(tx, range);
});
return;
}
// Tried everything, but still no matches
if (empty)
return cb(null);
// Not found yet, continue scanning
range.end = range.start;
range.start -= delta;
if (range.start < 0)
range.start = 0;
doSearch();
});
}
};
Pool.prototype._addValidateCache = function addValidateCache(tx, result) {
this.validate.cache.push({
hash: tx.hash('hex'),
result: result
});
if (this.validate.cache.length > this.validate.cacheSize)
this.validate.cache = this.validate.cache.slice(-this.validate.cacheSize);
};
Pool.prototype._probeValidateCache = function probeValidateCache(tx) {
for (var i = 0; i < this.validate.cache.length; i++) {
var entry = this.validate.cache[i];
if (entry.hash === tx.hash('hex'))
return entry.result;
}
};
Pool.prototype.validateTx = function validateTx(tx, cb, _params) {
// Probe cache first
var result = this._probeValidateCache(tx);
if (result) {
process.nextTick(function() {
cb(null, result);
});
return;
}
if (!_params)
_params = { depth: 0, range: null };
// Propagate range to improve speed of search
var depth = _params.depth;
var range = _params.range;
var result = {
included: this.chain.hasMerkle(tx.hash()),
valid: false
};
console.log('validateTx: ', tx.hash('hex'), depth);
if (depth > this.validate.minDepth && result.included) {
result.valid = true;
process.nextTick(function() {
cb(null, result);
});
return;
}
// Load all inputs and validate them
var self = this;
async.map(tx.inputs, function(input, cb) {
var out = null;
console.log('load input: ', input.out.hash);
self.getTx(input.out.hash, range, function(t, range) {
console.log('got input for: ', tx.hash('hex'), !!t);
out = t;
self.validateTx(out, onSubvalidate, {
depth: depth + 1,
range: range
});
});
function onSubvalidate(err, subres) {
if (err)
return cb(err);
cb(null, {
input: input,
tx: out,
valid: subres.valid
});
}
}, function(err, inputs) {
if (err) {
result.valid = false;
return cb(err, result);
}
self._addValidateCache(tx, result);
console.log(inputs);
});
};
function LoadRequest(pool, type, hash, cb) {
this.pool = pool
this.type = type;
this.hash = hash;
this.cbs = cb ? [ cb ] : [];
this.timer = null;
this.peer = null;
this.ts = +new Date;
@ -359,12 +580,12 @@ function BlockRequest(pool, hash) {
};
}
BlockRequest.prototype.start = function start(peer) {
LoadRequest.prototype.start = function start(peer) {
var self = this;
assert(!this.active);
this.active = true;
this.pool.block.active++;
this.pool.request.active++;
assert(!this.timer);
this.timer = setTimeout(function() {
@ -376,19 +597,31 @@ BlockRequest.prototype.start = function start(peer) {
this.peer = peer;
this.peer.once('close', this.onclose);
var reqType;
if (this.type === 'block')
reqType = 'filtered';
else if (this.type === 'tx')
reqType = 'tx';
return {
type: 'filtered',
type: reqType,
hash: this.hash
};
};
BlockRequest.compare = function compare(a, b) {
LoadRequest.compare = function compare(a, b) {
return a.ts - b.ts;
};
BlockRequest.prototype.clear = function clear() {
LoadRequest.prototype.add = function add(noQueue) {
this.pool.request.map[this.hash] = this;
if (!noQueue)
this.pool.request.queue.push(this);
};
LoadRequest.prototype.clear = function clear() {
assert(this.active);
this.pool.block.active--;
this.pool.request.active--;
this.active = false;
this.peer.removeListener('close', this.onclose);
this.peer = null;
@ -396,9 +629,9 @@ BlockRequest.prototype.clear = function clear() {
this.timer = null;
};
BlockRequest.prototype.retry = function retry() {
LoadRequest.prototype.retry = function retry() {
// Put block into the queue, ensure that the queue is always sorted by ts
utils.binaryInsert(this.pool.block.queue, this, BlockRequest.compare);
utils.binaryInsert(this.pool.request.queue, this, LoadRequest.compare);
var peer = this.peer;
this.clear();
@ -410,21 +643,29 @@ BlockRequest.prototype.retry = function retry() {
this.pool._scheduleRequests();
};
BlockRequest.prototype.finish = function finish() {
LoadRequest.prototype.finish = function finish(entity) {
if (this.active) {
this.clear();
} else {
// It could be that request was never sent to the node, remove it from
// queue and forget about it
var index = this.pool.block.queue.indexOf(this);
var index = this.pool.request.queue.indexOf(this);
if (index !== -1)
this.pool.block.queue.splice(index, 1);
this.pool.request.queue.splice(index, 1);
assert(!this.peer);
assert(!this.timer);
}
delete this.pool.block.requests[this.hash];
delete this.pool.request.map[this.hash];
// We may have some free slots in queue now
this.pool._scheduleRequests();
this.pool.finished++;
this.cbs.forEach(function(cb) {
cb(entity);
});
};
LoadRequest.prototype.addCallback = function addCallback(cb) {
if (cb)
this.cbs.push(cb);
};

View File

@ -23,6 +23,7 @@ function TX(data) {
script: bcoin.script.parse(output.script)
};
});
this.lock = data.lock;
this._hash = null;
this._raw = data._raw || null;
@ -43,3 +44,6 @@ TX.prototype.hash = function hash(enc) {
TX.prototype.render = function render(framer) {
return [];
};
TX.prototype.verify = function verify() {
};

View File

@ -192,6 +192,9 @@ function zero2(word) {
}
function toHex(msg) {
if (typeof msg === 'string')
return msg;
var res = '';
for (var i = 0; i < msg.length; i++)
res += zero2(msg[i].toString(16));

View File

@ -21,6 +21,7 @@
},
"homepage": "https://github.com/indutny/bcoin",
"dependencies": {
"async": "^0.8.0",
"bn.js": "^0.2.0",
"elliptic": "^0.7.0",
"hash.js": "^0.2.0"