From 14326088133e5da5a70cf9bb82e0c397a47ff6e0 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 1 May 2014 22:43:38 +0400 Subject: [PATCH] poll: wip --- lib/bcoin/chain.js | 16 +- lib/bcoin/peer.js | 75 +------- lib/bcoin/pool.js | 353 ++++++++++++++++++++++++++--------- lib/bcoin/protocol/framer.js | 5 +- 4 files changed, 281 insertions(+), 168 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 1a1b0973..596b5508 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -11,7 +11,7 @@ function Chain() { map: {}, count: 0 }; - this.map = {}; + this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0); this.last = null; this.add(new bcoin.block(constants.genesis)); } @@ -20,7 +20,7 @@ module.exports = Chain; Chain.prototype.add = function add(block) { var res = false; do { - var hash = block.hash('hex'); + var rhash = block.hash(); var prev = utils.toHex(block.prevBlock); // No need to revalidate orphans @@ -29,14 +29,16 @@ Chain.prototype.add = function add(block) { // Add orphan if (this.last && prev !== this.last) { - if (!this.map[hash] && !this.orphan.map[prev]) { + if (!this.bloom.test(rhash) && !this.orphan.map[prev]) { this.orphan.count++; this.orphan.map[prev] = block; } break; } - this.map[hash] = block; + var hash = block.hash('hex'); + + this.bloom.add(rhash); this.chain.push(block); this.last = hash; res = true; @@ -51,13 +53,15 @@ Chain.prototype.add = function add(block) { break; } while (true); + + return res; }; Chain.prototype.getLast = function getLast() { return this.chain[this.chain.length - 1]; }; -Chain.prototype.has = function hash(hash) { +Chain.prototype.has = function has(hash) { hash = utils.toHex(hash); - return !!this.map[hash] && !!this.orphan.map[hash]; + return this.bloom.test(hash) || !!this.orphan.map[hash]; }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 821c1133..77d240a6 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -122,6 +122,7 @@ Peer.prototype.destroy = function destroy() { this.destroyed = true; this.socket.destroy(); this.socket = null; + this.emit('close'); // Clean-up timeouts Object.keys(this._broadcast.map).forEach(function(key) { @@ -193,65 +194,8 @@ Peer.prototype._res = function _res(cmd, payload) { return false; }; -Peer.prototype.getData = function getData(items, cb) { - var map = {}; - var waiting = items.length; - items.forEach(function(item) { - map[item.hash.join('.')] = { - item: item, - once: false, - result: null - }; - }); - - var self = this; - function markEntry(hash, result) { - var entry = map[hash.join('.')]; - if (!entry || entry.once) - return false; - - entry.once = true; - entry.result = result; - waiting--; - return true; - } - +Peer.prototype.getData = function getData(items) { this._write(this.framer.getData(items)); - // Process all incoming data, until all data is returned - var req = this._req(null, function(err, payload, cmd) { - if (err) - return done(err); - - var ok = false; - if (cmd === 'notfound') { - ok = true; - payload.forEach(function(item) { - if (!markEntry(item.hash, null)) - ok = false; - }); - } else if (payload.hash) { - ok = markEntry(payload.hash(), payload); - } - if (!ok) - return self._request.skip; - - if (waiting === 0) - done(); - else - return self._request.cont; - }); - - // Just for debugging purposes - req._getDataMap = map; - - function done(err) { - if (err) - return cb(err); - - cb(null, items.map(function(item) { - return map[item.hash.join('.')].result; - })); - } }; Peer.prototype._onPacket = function onPacket(packet) { @@ -277,8 +221,8 @@ Peer.prototype._onPacket = function onPacket(packet) { payload = bcoin.tx(payload); if (this._res(cmd, payload)) { return; - } else if (cmd === 'tx') { - console.log('Omg', this.socket._handle.fd, cmd, payload.hash('hex')); + } else { + this.emit(cmd, payload); } }; @@ -335,18 +279,9 @@ Peer.prototype._handleInv = function handleInv(items) { if (txs.length === 0) return; - var self = this; - this.getData(txs, function(err, data) { - if (err) - return self._error(err); - - data.forEach(function(item) { - console.log(item.type, item.hash('hex')); - }); - }); + this.getData(txs); }; Peer.prototype.loadBlocks = function loadBlocks(hash) { - console.log(this.socket._handle.fd, 'loading from: ', utils.toHex(hash), this.chain.chain.length, this.chain.orphan.count); this._write(this.framer.getBlocks([ hash ])); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index b5975d7f..233dae2a 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -8,146 +8,319 @@ function Pool(options) { this.options = options || {}; this.size = options.size || 16; - this.redundancy = options.redundancy || 3; - this.parallel = options.parallel || this.size; - this.pendingBlocks = {}; + this.parallel = options.parallel || 4000; + this.loadTimeout = options.loadTimeout || 3000; + this.loadDelay = options.loadDelay || 750; + this.loadTimer = null; + this.loadWatermark = { + lo: options.lwm || 1000, + hi: options.hwm || 4000 + }; + this.maxRetries = options.maxRetries || 1000; + this.requestTimeout = options.requestTimeout || 30000; this.chain = new bcoin.chain(); this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, (Math.random() * 0xffffffff) | 0), - this.peers = []; - this.pending = []; + this.peers = { + // Peers that are loading blocks themselves + block: [], + // Peers that are still connecting + pending: [], + // Peers that are loading block ids + load: null + }; + this.block = { + lastSeen: null, + queue: [], + active: 0, + requests: {} + }; - // Peers that are loading block ids - this.loadPeers = []; this.createConnection = options.createConnection; assert(this.createConnection); this._init(); + + var self = this; + setInterval(function() { + console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d', + self.chain.chain.length, self.chain.orphan.count, + self.block.active, + self.block.queue.length, + Object.keys(self.block.requests).length, + process.memoryUsage().heapUsed); + }, 5000); } module.exports = Pool; Pool.prototype._init = function _init() { + this._addLoader(); for (var i = 0; i < this.size; i++) this._addPeer(); }; -Pool.prototype._addPeer = function _addPeer() { - if (this.peers.length + this.pending.length >= this.size) - return; - +Pool.prototype._addLoader = function _addLoader() { + assert(this.peers.load === null); var socket = this.createConnection(); var peer = bcoin.peer(this, socket, this.options.peer); - this.pending.push(peer); + this.peers.load = peer; - var load = false; - if (this.loadPeers.length < this.redundancy) { - this.loadPeers.push(peer); - load = true; - } - - // Create new peer on failure var self = this; - peer.once('error', function(err) { + peer.once('error', function() { + // Just ignore, it will result in `close` anyway + }); + + peer.once('close', function() { + clearTimeout(timer); self._removePeer(peer); - self._addPeer(); + self._addLoader(); }); peer.once('ack', function() { - var i = self.pending.indexOf(peer); - if (i !== -1) { - self.pending.splice(i, 1); - self.peers.push(peer); - } - peer.updateWatch(); - if (load) - peer.loadBlocks(self.chain.getLast().hash()); + if (!self._load()) + clearTimeout(timer); }); + function destroy() { + peer.destroy(); + } + var timer = setTimeout(destroy, this.loadTimeout); + // Split blocks and request them using multiple peers peer.on('blocks', function(hashes) { if (hashes.length === 0) return; - self._requestBlocks(hashes); - if (load) - peer.loadBlocks(hashes[hashes.length - 1]); + + // Request each block + hashes.forEach(function(hash) { + self._requestBlock(hash); + }); + + self._scheduleRequests(); + + self.block.lastSeen = hashes[hashes.length - 1]; + + clearTimeout(timer); + // Reinstantiate timeout + if (self._load()) + timer = setTimeout(destroy, self.loadTimeout); + }); +}; + +Pool.prototype._load = function load() { + if (this.block.queue.length >= this.loadWatermark.hi) + return false; + + // Load more blocks, starting from last hash + var hash; + if (this.block.lastSeen) + hash = this.block.lastSeen; + else + hash = this.chain.getLast().hash(); + this.peers.load.loadBlocks(hash); + + return true; +}; + +Pool.prototype._addPeer = function _addPeer() { + if (this.peers.block.length + this.peers.pending.length >= this.size) + return; + + var socket = this.createConnection(); + var peer = bcoin.peer(this, socket, this.options.peer); + this.peers.pending.push(peer); + + peer._retry = 0; + + // Create new peer on failure + var self = this; + peer.once('error', function(err) { + // Just ignore, it will result in `close` anyway + }); + + peer.once('close', function() { + self._removePeer(peer); + self._addPeer(); + }); + + peer.once('ack', function() { + var i = self.peers.pending.indexOf(peer); + if (i !== -1) { + self.peers.pending.splice(i, 1); + self.peers.block.push(peer); + } + + peer.updateWatch(); + }); + + peer.on('merkleblock', function(block) { + self.chain.add(block); + var req = self.block.requests[block.hash('hex')]; + if (!req) + return; + req.finish(block); + }); + + peer.on('tx', function(tx) { + console.log('got tx', tx.hash('hex')); }); }; Pool.prototype._removePeer = function _removePeer(peer) { - var i = this.pending.indexOf(peer); + var i = this.peers.pending.indexOf(peer); if (i !== -1) - this.pending.splice(i, 1); + this.peers.pending.splice(i, 1); - i = this.peers.indexOf(peer); + i = this.peers.block.indexOf(peer); if (i !== -1) - this.peers.splice(i, 1); + this.peers.block.splice(i, 1); - i = this.loadPeers.indexOf(peer); - if (i !== -1) - this.loadPeers.splice(i, 1); + if (this.peers.load === peer) + this.peers.load = null; }; Pool.prototype.watch = function watch(id) { if (id) this.bloom.add(id); - for (var i = 0; i < this.peers.length; i++) - this.peers[i].updateWatch(); + for (var i = 0; i < this.peers.block.length; i++) + this.peers.block[i].updateWatch(); }; -Pool.prototype._getPeers = function _getPeers() { - var res = []; +Pool.prototype._requestBlock = function _requestBlock(hash) { + // Block is already in chain, or being requested + if (this.chain.has(hash) || this.block.requests[utils.toHex(hash)]) + return; - if (this.peers.length <= this.redundancy) - return this.peers.slice(); - - for (var i = 0; i < this.redundancy; i++) { - var peer = this.peers[(Math.random() * this.peers.length) | 0]; - if (res.indexOf(peer) !== -1) - continue; - - res.push(peer); - } - - return res; + var req = new BlockRequest(this, hash); + this.block.requests[utils.toHex(hash)] = req; + this.block.queue.push(req); }; -Pool.prototype._requestBlocks = function _requestBlocks(hashes, force) { - // Do not request blocks that either already in chain, or are - // already requested from some of the peers - hashes = hashes.filter(function(hash) { - return !this.chain.has(hash) && - (force || !this.pendingBlocks[utils.toHex(hash)]); - }, this).map(function(hash) { - this.pendingBlocks[utils.toHex(hash)] = true; - return { type: 'filtered', hash: hash }; - }, this); - - // Split blocks into chunks and send each chunk to some peers - var chunks = []; - var count = Math.ceil(hashes.length / this.parallel); - for (var i = 0; i < hashes.length; i += count) - chunks.push(hashes.slice(i, i + count)); +Pool.prototype._scheduleRequests = function _scheduleRequests() { + if (this.loadTimer !== null) + return; var self = this; - chunks.forEach(function(chunk) { - var peers = self._getPeers(); - peers.forEach(function(peer) { - peer.getData(chunk, function(err, blocks) { - // Re-request blocks on failure - if (err) { - return self._requestBlocks(chunk.map(function(item) { - return item.hash; - }), true); - } - - // Add blocks to chain on success - blocks.forEach(function(block) { - delete self.pendingBlocks[block.hash('hex')]; - self.chain.add(block); - }); - }); - }); - }); + this.loadTimer = setTimeout(function() { + self.loadTimer = null; + self._doRequests(); + }, this.loadDelay); +}; + +Pool.prototype._doRequests = function _doRequests() { + if (this.block.active >= this.parallel) + return; + + var above = this.block.queue.length >= this.loadWatermark.lo; + var items = this.block.queue.slice(0, this.parallel - this.block.active); + this.block.queue = this.block.queue.slice(items.length); + var below = this.block.queue.length < this.loadWatermark.lo; + + // Watermark boundary crossed, load more blocks + if (above && below) + this._load(); + + // Split list between nodes + var count = this.peers.block.length; + var split = Math.ceil(items.length / count); + for (var i = 0, off = 0; i < count; i++, off += split) { + var peer = this.peers.block[i]; + peer.getData(items.slice(off, off + split).map(function(item) { + return item.start(peer); + })); + } +}; + +function BlockRequest(pool, hash) { + this.pool = pool + this.hash = hash; + this.timer = null; + this.peer = null; + this.ts = +new Date; + this.active = false; +} + +function binaryInsert(list, item, compare) { + var start = 0, + end = list.length; + + while (start < end) { + var pos = (start + end) >> 1; + var cmp = compare(item, list[pos]); + + if (cmp === 0) { + start = pos; + end = pos; + break; + } else if (cmp < 0) { + end = pos; + } else { + start = pos + 1; + } + } + + list.splice(start, 0, item); +} + +BlockRequest.prototype.start = function start(peer) { + var self = this; + + this.peer = peer; + if (this.timer) + clearTimeout(this.timer); + this.timer = setTimeout(function() { + self.timer = null; + self.retry(); + }, this.pool.requestTimeout); + + if (!this.active) + this.pool.block.active++; + this.active = true; + + return { + type: 'filtered', + hash: this.hash + }; +}; + +BlockRequest.compare = function compare(a, b) { + return a.ts - b.ts; +}; + +BlockRequest.prototype.retry = function retry() { + assert(this.active); + this.pool.block.active--; + + // Put block into the queue, ensure that the queue is always sorted by ts + binaryInsert(this.pool.block.queue, this, BlockRequest.compare); + this.active = false; + + // And schedule requesting blocks again + this.pool._scheduleRequests(); + + // Kill peer, if it misbehaves + if (++this.peer._retry > this.pool._maxRetries) + this.peer.destroy(); +}; + +BlockRequest.prototype.finish = function finish(block) { + if (this.active) { + this.pool.block.active--; + this.active = false; + } else { + // It could be that request was never sent to the node, remove it from + // queue and forget about it + var index = this.pool.block.queue.indexOf(this); + if (index !== -1) + this.pool.block.queue.splice(index, 1); + } + delete this.pool.block.requests[block.hash('hex')]; + if (this.timer) + clearTimeout(this.timer); + this.timer = null; + + // We may have some free slots in queue now + this.pool._scheduleRequests(); }; diff --git a/lib/bcoin/protocol/framer.js b/lib/bcoin/protocol/framer.js index ff775d33..7ca55c68 100644 --- a/lib/bcoin/protocol/framer.js +++ b/lib/bcoin/protocol/framer.js @@ -75,8 +75,8 @@ Framer.prototype.version = function version(packet) { this._addr(p, 46); // Nonce, very dramatic - writeU32(p, 0xdeadbeef, 72); - writeU32(p, 0xabbadead, 76); + writeU32(p, (Math.random() * 0xffffffff) | 0, 72); + writeU32(p, (Math.random() * 0xffffffff) | 0, 76); // No user-agent p[80] = 0; @@ -120,6 +120,7 @@ function varint(arr, value, off) { Framer.prototype._inv = function _inv(command, items) { var res = []; var off = varint(res, items.length, 0); + assert(items.length <= 50000); for (var i = 0; i < items.length; i++) { // Type