diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index 62ddcd39..af56cb6b 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -7,31 +7,28 @@ function Block(data) { this.type = 'block'; this.version = data.version; - this.prevBlock = data.prevBlock; - this.merkleRoot = data.merkleRoot; + this.prevBlock = utils.toHex(data.prevBlock); + this.merkleRoot = utils.toHex(data.merkleRoot); this.ts = data.ts; this.bits = data.bits; this.nonce = data.nonce; this._hash = null; - this._hexHash = null; } module.exports = Block; Block.prototype.hash = function hash(enc) { // Hash it - if (!this._hash) { - this._hash = utils.dsha256(this.abbr()); - this._hexHash = utils.toHex(this._hash); - } - return enc === 'hex' ? this._hexHash : this._hash; + if (!this._hash) + this._hash = utils.toHex(utils.dsha256(this.abbr())); + return enc === 'hex' ? this._hash : utils.toArray(this._hash, 'hex'); }; Block.prototype.abbr = function abbr() { var res = new Array(80); utils.writeU32(res, this.version, 0); - utils.copy(this.prevBlock, res, 4); - utils.copy(this.merkleRoot, res, 36); + utils.copy(utils.toArray(this.prevBlock, 'hex'), res, 4); + utils.copy(utils.toArray(this.merkleRoot, 'hex'), res, 36); utils.writeU32(res, this.ts, 68); utils.writeU32(res, this.bits, 72); utils.writeU32(res, this.nonce, 76); diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 3933d323..5be6f75d 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -2,11 +2,14 @@ var bcoin = require('../bcoin'); var constants = bcoin.protocol.constants; var utils = bcoin.utils; -function Chain() { +function Chain(options) { if (!(this instanceof Chain)) - return new Chain(); + return new Chain(options); - this.chain = []; + this.options = options || {}; + this.blocks = []; + this.hashes = []; + this.ts = []; this.orphan = { map: {}, count: 0 @@ -25,7 +28,7 @@ Chain.prototype.add = function add(block) { break; var rhash = block.hash(); - var prev = utils.toHex(block.prevBlock); + var prev = block.prevBlock; // Add orphan if (this.last && prev !== this.last) { @@ -40,10 +43,15 @@ Chain.prototype.add = function add(block) { var hash = block.hash('hex'); this.bloom.add(rhash); - this.chain.push(block); + this.blocks.push(block); + this.hashes.push(hash); + this.ts.push(block.ts); this.last = hash; res = true; + // Compress old blocks + this._compress(); + // We have orphan child for this block - add it to chain if (this.orphan.map[hash]) { block = this.orphan.map[hash]; @@ -58,11 +66,18 @@ Chain.prototype.add = function add(block) { return res; }; +Chain.prototype._compress = function compress() { + // Store only last 1000 blocks, others will be requested if needed + if (this.blocks.length < 1000) + return; + + this.blocks = this.blocks.slice(this.blocks.length - 1000); +}; + Chain.prototype.getLast = function getLast() { - return this.chain[this.chain.length - 1]; + return this.blocks[this.blocks.length - 1]; }; Chain.prototype.has = function has(hash) { - hash = utils.toHex(hash); return this.bloom.test(hash); }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 5fa7f772..ba362b3c 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -84,6 +84,8 @@ Peer.prototype._init = function init() { self.ack = true; self.emit('ack'); }); + + this.setMaxListeners(4000); }; Peer.prototype.broadcast = function broadcast(items) { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 1543b78a..a3f6ad79 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -7,18 +7,18 @@ function Pool(options) { return new Pool(options); this.options = options || {}; - this.size = options.size || 3; - this.parallel = options.parallel || 1200; + this.size = options.size || 64; + this.parallel = options.parallel || 8000; this.load = { timeout: options.loadTimeout || 5000, window: options.loadWindow || 2500, timer: null, - lwm: options.lwm || 1000, - hwm: options.hwm || 32000, + lwm: options.lwm || this.parallel * 2, + hwm: options.hwm || this.parallel * 8, hiReached: false }; - this.maxRetries = options.maxRetries || 3000; - this.requestTimeout = options.requestTimeout || 30000; + this.maxRetries = options.maxRetries || 50; + this.requestTimeout = options.requestTimeout || 10000; this.chain = new bcoin.chain(); this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, @@ -47,13 +47,20 @@ function Pool(options) { var self = this; setInterval(function() { console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d d %d', - self.chain.chain.length, self.chain.orphan.count, + self.chain.ts.length, self.chain.orphan.count, self.block.active, self.block.queue.length, Object.keys(self.block.requests).length, process.memoryUsage().heapUsed, - self.finished - self.chain.chain.length - self.chain.orphan.count); + self.finished - self.chain.ts.length - self.chain.orphan.count); }, 5000); + + process.on('SIGUSR2', function() { + require('fs').writeFileSync('/tmp/1.json', JSON.stringify({ + hashes: self.chain.hashes, + ts: self.chain.ts + })); + }); } module.exports = Pool; @@ -87,6 +94,7 @@ Pool.prototype._addLoader = function _addLoader() { }); function destroy() { + self.block.lastSeen = null; peer.destroy(); } var timer = setTimeout(destroy, this.load.timeout); @@ -128,6 +136,7 @@ Pool.prototype._load = function load() { else hash = this.chain.getLast().hash(); + console.log('Loading from: ' + utils.toHex(hash.slice().reverse())); this.peers.load.loadBlocks(hash); return true; @@ -165,10 +174,12 @@ Pool.prototype._addPeer = function _addPeer() { }); peer.on('merkleblock', function(block) { + var has = self.chain.has(block); self.chain.add(block); var req = self.block.requests[block.hash('hex')]; if (!req) return; + assert(!has); req.finish(block); }); @@ -211,11 +222,15 @@ Pool.prototype.watch = function watch(id) { Pool.prototype._requestBlock = function _requestBlock(hash) { // Block is already in chain, or being requested - if (this.chain.has(hash) || this.block.requests[utils.toHex(hash)]) + if (this.chain.has(hash)) return; - var req = new BlockRequest(this, hash); - this.block.requests[utils.toHex(hash)] = req; + var hex = utils.toHex(hash); + if (this.block.requests[hex]) + return; + + var req = new BlockRequest(this, hex); + this.block.requests[hex] = req; this.block.queue.push(req); }; @@ -273,6 +288,11 @@ function BlockRequest(pool, hash) { this.peer = null; this.ts = +new Date; this.active = false; + + var self = this; + this.onclose = function onclose() { + self.retry(); + }; } function binaryInsert(list, item, compare) { @@ -299,18 +319,20 @@ function binaryInsert(list, item, compare) { BlockRequest.prototype.start = function start(peer) { var self = this; + assert(!this.active); - this.peer = peer; - if (this.timer) - clearTimeout(this.timer); + this.active = true; + this.pool.block.active++; + + assert(!this.timer); this.timer = setTimeout(function() { self.timer = null; self.retry(); }, this.pool.requestTimeout); - if (!this.active) - this.pool.block.active++; - this.active = true; + assert(!this.peer); + this.peer = peer; + this.peer.once('close', this.onclose); return { type: 'filtered', @@ -322,37 +344,43 @@ BlockRequest.compare = function compare(a, b) { return a.ts - b.ts; }; -BlockRequest.prototype.retry = function retry() { +BlockRequest.prototype.clear = function clear() { assert(this.active); this.pool.block.active--; + this.active = false; + this.peer.removeListener('close', this.onclose); + this.peer = null; + clearTimeout(this.timer); + this.timer = null; +}; +BlockRequest.prototype.retry = function retry() { // Put block into the queue, ensure that the queue is always sorted by ts binaryInsert(this.pool.block.queue, this, BlockRequest.compare); - this.active = false; + var peer = this.peer; + this.clear(); + + // Kill peer, if it misbehaves + if (++peer._retry > this.pool.maxRetries) + peer.destroy(); // And schedule requesting blocks again this.pool._scheduleRequests(); - - // Kill peer, if it misbehaves - if (++this.peer._retry > this.pool._maxRetries) - this.peer.destroy(); }; BlockRequest.prototype.finish = function finish() { if (this.active) { - this.pool.block.active--; - this.active = false; + this.clear(); } else { // It could be that request was never sent to the node, remove it from // queue and forget about it var index = this.pool.block.queue.indexOf(this); if (index !== -1) this.pool.block.queue.splice(index, 1); + assert(!this.peer); + assert(!this.timer); } - delete this.pool.block.requests[utils.toHex(this.hash)]; - if (this.timer) - clearTimeout(this.timer); - this.timer = null; + delete this.pool.block.requests[this.hash]; // We may have some free slots in queue now this.pool._scheduleRequests(); diff --git a/lib/bcoin/protocol/framer.js b/lib/bcoin/protocol/framer.js index 7ca55c68..f5e8a764 100644 --- a/lib/bcoin/protocol/framer.js +++ b/lib/bcoin/protocol/framer.js @@ -128,6 +128,8 @@ Framer.prototype._inv = function _inv(command, items) { // Hash var hash = items[i].hash; + if (typeof hash === 'string') + hash = utils.toArray(hash, 'hex'); assert.equal(hash.length, 32); res = res.concat(hash); diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 0c5727f5..7a6a135f 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -25,8 +25,18 @@ function toArray(msg, enc) { msg = msg.replace(/[^a-z0-9]+/ig, ''); if (msg.length % 2 != 0) msg = '0' + msg; - for (var i = 0; i < msg.length; i += 2) - res.push(parseInt(msg[i] + msg[i + 1], 16)); + for (var i = 0; i < msg.length; i += 8) { + var slice = msg.slice(i, i + 8); + var num = parseInt(slice, 16); + + if (slice.length === 8) + res.push((num >>> 24) & 0xff); + if (slice.length >= 6) + res.push((num >>> 16) & 0xff); + if (slice.length >= 4) + res.push((num >>> 8) & 0xff); + res.push(num & 0xff); + } } } else { for (var i = 0; i < msg.length; i++)