diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index e7f870fa..6c38b230 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -48,3 +48,7 @@ Block.prototype.verify = function verify() { Block.prototype.render = function render(framer) { return []; }; + +Block.prototype.hasMerkle = function hasMerkle(hash) { + return this.hashes.indexOf(hash) !== -1; +}; diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 30a8db70..c9635a33 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -15,6 +15,11 @@ function Chain(options) { this.options = options || {}; this.block = { list: [], + + // Bloom filter for all merkle trees + merkleBloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeed), + + // Bloom filter for all known blocks bloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeef) }; this.orphan = { @@ -95,15 +100,15 @@ Chain.prototype.add = function add(block) { var hash = block.hash('hex'); var prev = block.prevBlock; - // If previous block wasn't ever seen - add current to orphans + // If the block is already known to be an orphan if (this.orphan.map[prev]) break; + + // If previous block wasn't ever seen - add current to orphans if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) { this.orphan.count++; this.orphan.map[prev] = block; - // Add block to bloom filter, as we now have it in memory - this.block.bloom.add(hash, 'hex'); this.emit('missing', prev, this.getRange(block.ts)); break; } @@ -124,6 +129,7 @@ Chain.prototype.add = function add(block) { // At least one block was added res = true; this.block.list.push(block); + this._bloomBlock(block); if (!this.orphan.map[hash]) break; @@ -148,21 +154,48 @@ Chain.prototype._compress = function compress() { // Bloom filter rebuilt is needed this.block.list = this.block.list.slice(-1000); this.block.bloom.reset(); + this.block.merkleBloom.reset(); for (var i = 0; i < this.block.list.length; i++) - this.block.bloom.add(this.block.list[i].hash()); + this._bloomBlock(this.block.list[i]); +}; + +Chain.prototype._bloomBlock = function _bloomBlock(block) { + this.block.bloom.add(block.hash(), 'hex'); + for (var i = 0; i < block.hashes.length; i++) + this.block.merkleBloom.add(block.hashes[i], 'hex'); }; Chain.prototype.has = function has(hash) { return this.probeIndex(hash) || !!this.orphan.map[hash]; }; +Chain.prototype.hasMerkle = function hasMerkle(hash) { + if (!this.block.merkleBloom.test(hash, 'hex')) + return false; + + hash = utils.toHex(hash); + for (var i = 0; i < this.block.list.length; i++) + if (this.block.list[i].hasMerkle(hash)) + return true; + + return false; +}; + Chain.prototype.get = function get(hash, cb) { // Cached block found if (this.block.bloom.test(hash, 'hex')) { - for (var i = 0; i < this.block.list.length; i++) - if (this.block.list[i].hash('hex') === hash) - return cb(this.block.list[i]); + for (var i = 0; i < this.block.list.length; i++) { + if (this.block.list[i].hash('hex') === hash) { + // NOTE: we return right after the statement - so `i` should be valid + // at the time of nextTick call + var self = this; + process.nextTick(function() { + cb(self.block.list[i]); + }); + return; + } + } assert(false); } @@ -185,11 +218,14 @@ Chain.prototype.covers = function covers(ts) { return ts >= this.index.ts[0]; }; -Chain.prototype.hashesFrom = function hashesFrom(ts, index) { - var pos = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true); - if (pos > 0) - pos--; - return this.index.hashes.slice(pos); +Chain.prototype.hashesInRange = function hashesInRange(start, end) { + var ts = this.index.ts; + + var pos = utils.binaryInsert(ts, start - 2 * 3600, compareTs, true); + start = Math.max(0, pos - 1); + var pos = utils.binaryInsert(ts, end + 2 * 3600, compareTs, true); + end = pos; + return this.index.hashes.slice(start, end); }; Chain.prototype.getLast = function getLast() { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 38018abc..6ca0d6a3 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -1,4 +1,5 @@ var assert = require('assert'); +var async = require('async'); var util = require('util'); var EventEmitter = require('events').EventEmitter; @@ -25,6 +26,7 @@ function Pool(options) { this.maxRetries = options.maxRetries || 300; this.requestTimeout = options.requestTimeout || 10000; this.chain = new bcoin.chain(); + this.watchList = []; this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, (Math.random() * 0xffffffff) | 0), @@ -37,10 +39,26 @@ function Pool(options) { load: null }; this.block = { - lastHash: null, - queue: [], + lastHash: null + }; + this.request = { + map: {}, active: 0, - requests: {} + queue: [] + }; + this.validate = { + // 5 days scan delta for obtaining TXs + delta: 5 * 24 * 3600, + + // Minimum verification depth + minDepth: options.minValidateDepth || 1, + + // getTx map + map: {}, + + // Validation cache + cache: [], + cacheSize: 1000 }; this.createConnection = options.createConnection; @@ -58,10 +76,19 @@ Pool.prototype._init = function _init() { var self = this; this.chain.on('missing', function(hash, range) { - self._requestBlock(hash, true); + self._request('block', hash, { force: true }); self._scheduleRequests(); self._loadRange(range); }); + + setInterval(function() { + console.log('a %d q %d o %d r %d', + self.request.active, self.request.queue.length, + self.chain.orphan.count, + self.chain.request.count); + if (self.chain.request.count === 1) + console.log(Object.keys(self.chain.request.map)); + }, 1000); }; Pool.prototype._addLoader = function _addLoader() { @@ -92,8 +119,7 @@ Pool.prototype._addLoader = function _addLoader() { function destroy() { // Chain is full and up-to-date - if (self.block.lastHash === null && - self.chain.isFull()) { + if (self.block.lastHash === null && self.chain.isFull()) { clearTimeout(timer); peer.removeListener('close', onclose); self._removePeer(peer); @@ -113,7 +139,7 @@ Pool.prototype._addLoader = function _addLoader() { // Request each block hashes.forEach(function(hash) { - self._requestBlock(hash); + self._request('block', hash); }); self._scheduleRequests(); @@ -143,7 +169,7 @@ Pool.prototype._loadRange = function _loadRange(range) { }; Pool.prototype._load = function _load() { - if (this.block.queue.length >= this.load.hwm) { + if (this.request.queue.length >= this.load.hwm) { this.load.hiReached = true; return false; } @@ -196,27 +222,20 @@ Pool.prototype._addPeer = function _addPeer() { }); peer.on('merkleblock', function(block) { - var has = self.chain.has(block); self.chain.add(block); - var req = self.block.requests[block.hash('hex')]; - if (!req) - return; - assert(!has); - req.finish(block); + self._response(block); }); peer.on('notfound', function(items) { items.forEach(function(item) { - if (item.type !== 'filtered') - return; - - var req = self.block.requests[utils.toHex(item.hash)]; + var req = self.request.map[utils.toHex(item.hash)]; if (req) req.finish(null); }); }); peer.on('tx', function(tx) { + self._response(tx); self.emit('tx', tx); }); }; @@ -235,31 +254,59 @@ Pool.prototype._removePeer = function _removePeer(peer) { }; Pool.prototype.watch = function watch(id) { - if (typeof id === 'string') - id = utils.toArray(id, 'hex'); + if (this.bloom.test(id, 'hex')) + return; + + this.watchList.push(utils.toHex(id)); if (id) - this.bloom.add(id); + this.bloom.add(id, 'hex'); + if (this.peers.load) + this.peers.load.updateWatch(); for (var i = 0; i < this.peers.block.length; i++) this.peers.block[i].updateWatch(); }; -Pool.prototype.search = function search(id, limit, cb) { +Pool.prototype.unwatch = function unwatch(id) { + if (!this.bloom.test(id, 'hex')) + return; + + id = utils.toHex(id); + var index = this.watchList.indexOf(id); + if (index === -1) + return; + this.watchList.splice(index, 1); + + // Reset bloom filter + this.bloom.reset(); + for (var i = 0; i < this.watchList.length; i++) + this.bloom.add(this.watchList[i], 'hex'); + + // Resend it to peers + if (this.peers.load) + this.peers.load.updateWatch(); + for (var i = 0; i < this.peers.block.length; i++) + this.peers.block[i].updateWatch(); +}; + +Pool.prototype.search = function search(id, range) { if (typeof id === 'string') id = utils.toArray(id, 'hex'); - // Optional limit argument - if (typeof limit === 'function') { - cb = limit; - limit = null; - } + if (range) + range = { start: range.start, end: range.end }; + else + range = { start: 0, end: 0 }; // Last 5 days by default, this covers 1000 blocks that we have in the // chain by default - if (!limit) - limit = (+new Date / 1000 - 432000); + if (!range.end) + range.end = +new Date / 1000; + if (!range.start) + range.start = +new Date / 1000 - 432000; var self = this; - var hashes = this.chain.hashesFrom(limit); + var e = new EventEmitter(); + var hashes = this.chain.hashesInRange(range.start, range.end); var waiting = hashes.length; this.watch(id); @@ -270,36 +317,63 @@ Pool.prototype.search = function search(id, limit, cb) { // Get block's prev and request it and all of it's parents up to // the next known block hash self.chain.get(block.prevBlock, function() { - if (--waiting === 0) - cb(); + waiting--; + e.emit('progress', hashes.length - waiting, hashes.length); + if (waiting === 0) { + self.unwatch(id); + e.emit('end'); + } }); }); }, this); + + // Empty search + if (hashes.length === 0) { + process.nextTick(function() { + e.emit('end', true); + }); + } + + return e; }; -Pool.prototype._requestBlock = function _requestBlock(hash, force) { +Pool.prototype._request = function _request(type, hash, options, cb) { if (typeof hash === 'string') hash = utils.toArray(hash, 'hex'); - // Block should be not in chain, or be requested - if (!force && this.chain.has(hash)) - return; + // Optional `force` + if (typeof options === 'function') { + cb = options; + options = {}; + } + if (!options) + options = {}; var hex = utils.toHex(hash); - if (this.block.requests[hex]) + if (this.request.map[hex]) + return this.request.map[hex].addCallback(cb); + + // Block should be not in chain, or be requested + if (!options.force && type === 'block' && this.chain.has(hash)) return; - var req = new BlockRequest(this, hex); - this.block.requests[hex] = req; - this.block.queue.push(req); + var req = new LoadRequest(this, type, hex, cb); + req.add(options.noQueue); +}; + +Pool.prototype._response = function _response(entity) { + var req = this.request.map[entity.hash('hex')]; + if (!req) + return; + req.finish(entity); }; Pool.prototype._scheduleRequests = function _scheduleRequests() { - if (this.block.active > this.parallel / 2) + if (this.request.active > this.parallel / 2) return; // No need to wait - already have enough data - if (this.block.queue.length > this.parallel) { + if (this.request.queue.length > this.parallel) { if (this.load.timer !== null) clearTimeout(this.load.timer); this.load.timer = null; @@ -318,17 +392,18 @@ Pool.prototype._scheduleRequests = function _scheduleRequests() { }; Pool.prototype._doRequests = function _doRequests() { - if (this.block.active >= this.parallel) + if (this.request.active >= this.parallel) return; // No peers so far if (this.peers.block.length === 0) return; - var above = this.block.queue.length >= this.load.lwm; - var items = this.block.queue.slice(0, this.parallel - this.block.active); - this.block.queue = this.block.queue.slice(items.length); - var below = this.block.queue.length < this.load.lwm; + var queue = this.request.queue; + var above = queue.length >= this.load.lwm; + var items = queue.slice(0, this.parallel - this.request.active); + this.request.queue = queue.slice(items.length); + var below = this.request.queue.length < this.load.lwm; // Watermark boundary crossed, load more blocks if (above && below && this.load.hiReached) @@ -345,9 +420,155 @@ Pool.prototype._doRequests = function _doRequests() { } }; -function BlockRequest(pool, hash) { +Pool.prototype.getTx = function getTx(hash, range, cb) { + hash = utils.toHex(hash); + + if (typeof range === 'function') { + cb = range; + range = null; + } + + // Do not perform duplicate searches + if (this.validate.map[hash]) + return this.validate.map[hash].push(cb); + + var cbs = [ cb ]; + this.validate.map[hash] = cbs; + // Add request without queueing it to get notification at the time of load + var tx = null; + var finished = false; + var req = this._request('tx', hash, { noQueue: true }, function(t) { + finished = true; + tx = t; + }); + + // Do incremental search until the TX is found + var delta = this.validate.delta; + + // Start from the existing range if given + if (range) + range = { start: range.start, end: range.end }; + else + range = { start: (+new Date / 1000) - delta, end: 0 }; + + var self = this; + doSearch(); + + function doSearch() { + console.log('Searching for ' + hash, range); + var e = self.search(hash, range); + e.on('end', function(empty) { + if (finished) { + delete self.validate.map[hash]; + cbs.forEach(function(cb) { + cb(tx, range); + }); + return; + } + + // Tried everything, but still no matches + if (empty) + return cb(null); + + // Not found yet, continue scanning + range.end = range.start; + range.start -= delta; + if (range.start < 0) + range.start = 0; + + doSearch(); + }); + } +}; + +Pool.prototype._addValidateCache = function addValidateCache(tx, result) { + this.validate.cache.push({ + hash: tx.hash('hex'), + result: result + }); + if (this.validate.cache.length > this.validate.cacheSize) + this.validate.cache = this.validate.cache.slice(-this.validate.cacheSize); +}; + +Pool.prototype._probeValidateCache = function probeValidateCache(tx) { + for (var i = 0; i < this.validate.cache.length; i++) { + var entry = this.validate.cache[i]; + if (entry.hash === tx.hash('hex')) + return entry.result; + } +}; + +Pool.prototype.validateTx = function validateTx(tx, cb, _params) { + // Probe cache first + var result = this._probeValidateCache(tx); + if (result) { + process.nextTick(function() { + cb(null, result); + }); + return; + } + + if (!_params) + _params = { depth: 0, range: null }; + + // Propagate range to improve speed of search + var depth = _params.depth; + var range = _params.range; + + var result = { + included: this.chain.hasMerkle(tx.hash()), + valid: false + }; + + console.log('validateTx: ', tx.hash('hex'), depth); + if (depth > this.validate.minDepth && result.included) { + result.valid = true; + process.nextTick(function() { + cb(null, result); + }); + return; + } + + // Load all inputs and validate them + var self = this; + async.map(tx.inputs, function(input, cb) { + var out = null; + console.log('load input: ', input.out.hash); + self.getTx(input.out.hash, range, function(t, range) { + console.log('got input for: ', tx.hash('hex'), !!t); + out = t; + self.validateTx(out, onSubvalidate, { + depth: depth + 1, + range: range + }); + }); + + function onSubvalidate(err, subres) { + if (err) + return cb(err); + + cb(null, { + input: input, + tx: out, + valid: subres.valid + }); + } + }, function(err, inputs) { + if (err) { + result.valid = false; + return cb(err, result); + } + + self._addValidateCache(tx, result); + console.log(inputs); + }); +}; + +function LoadRequest(pool, type, hash, cb) { this.pool = pool + this.type = type; this.hash = hash; + this.cbs = cb ? [ cb ] : []; this.timer = null; this.peer = null; this.ts = +new Date; @@ -359,12 +580,12 @@ function BlockRequest(pool, hash) { }; } -BlockRequest.prototype.start = function start(peer) { +LoadRequest.prototype.start = function start(peer) { var self = this; assert(!this.active); this.active = true; - this.pool.block.active++; + this.pool.request.active++; assert(!this.timer); this.timer = setTimeout(function() { @@ -376,19 +597,31 @@ BlockRequest.prototype.start = function start(peer) { this.peer = peer; this.peer.once('close', this.onclose); + var reqType; + if (this.type === 'block') + reqType = 'filtered'; + else if (this.type === 'tx') + reqType = 'tx'; + return { - type: 'filtered', + type: reqType, hash: this.hash }; }; -BlockRequest.compare = function compare(a, b) { +LoadRequest.compare = function compare(a, b) { return a.ts - b.ts; }; -BlockRequest.prototype.clear = function clear() { +LoadRequest.prototype.add = function add(noQueue) { + this.pool.request.map[this.hash] = this; + if (!noQueue) + this.pool.request.queue.push(this); +}; + +LoadRequest.prototype.clear = function clear() { assert(this.active); - this.pool.block.active--; + this.pool.request.active--; this.active = false; this.peer.removeListener('close', this.onclose); this.peer = null; @@ -396,9 +629,9 @@ BlockRequest.prototype.clear = function clear() { this.timer = null; }; -BlockRequest.prototype.retry = function retry() { +LoadRequest.prototype.retry = function retry() { // Put block into the queue, ensure that the queue is always sorted by ts - utils.binaryInsert(this.pool.block.queue, this, BlockRequest.compare); + utils.binaryInsert(this.pool.request.queue, this, LoadRequest.compare); var peer = this.peer; this.clear(); @@ -410,21 +643,29 @@ BlockRequest.prototype.retry = function retry() { this.pool._scheduleRequests(); }; -BlockRequest.prototype.finish = function finish() { +LoadRequest.prototype.finish = function finish(entity) { if (this.active) { this.clear(); } else { // It could be that request was never sent to the node, remove it from // queue and forget about it - var index = this.pool.block.queue.indexOf(this); + var index = this.pool.request.queue.indexOf(this); if (index !== -1) - this.pool.block.queue.splice(index, 1); + this.pool.request.queue.splice(index, 1); assert(!this.peer); assert(!this.timer); } - delete this.pool.block.requests[this.hash]; + delete this.pool.request.map[this.hash]; // We may have some free slots in queue now this.pool._scheduleRequests(); - this.pool.finished++; + + this.cbs.forEach(function(cb) { + cb(entity); + }); +}; + +LoadRequest.prototype.addCallback = function addCallback(cb) { + if (cb) + this.cbs.push(cb); }; diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index 4e20c148..86a79e1e 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -23,6 +23,7 @@ function TX(data) { script: bcoin.script.parse(output.script) }; }); + this.lock = data.lock; this._hash = null; this._raw = data._raw || null; @@ -43,3 +44,6 @@ TX.prototype.hash = function hash(enc) { TX.prototype.render = function render(framer) { return []; }; + +TX.prototype.verify = function verify() { +}; diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 148a82cd..25db902f 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -192,6 +192,9 @@ function zero2(word) { } function toHex(msg) { + if (typeof msg === 'string') + return msg; + var res = ''; for (var i = 0; i < msg.length; i++) res += zero2(msg[i].toString(16)); diff --git a/package.json b/package.json index d0928215..5bc27f04 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ }, "homepage": "https://github.com/indutny/bcoin", "dependencies": { + "async": "^0.8.0", "bn.js": "^0.2.0", "elliptic": "^0.7.0", "hash.js": "^0.2.0"