From a20cd7edbe467c19a43bfbffbbd60be4011746c0 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sun, 3 Jan 2016 07:14:35 -0800 Subject: [PATCH] refactor pool. improve chains. --- lib/bcoin/chain.js | 68 ++++--- lib/bcoin/fullchain.js | 103 +++++++---- lib/bcoin/peer.js | 12 ++ lib/bcoin/pool.js | 332 +++++++++++++++++++++++------------ lib/bcoin/protocol/framer.js | 4 + 5 files changed, 349 insertions(+), 170 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 52bbc9f1..bb3e624a 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -67,6 +67,30 @@ function Chain(options) { inherits(Chain, EventEmitter); +Chain.codes = { + okay: 0, + newOrphan: 1, + knownOrphan: 2, + forked: 3, + invalid: 4, + badCheckpoint: 4, + unchanged: 5 +}; + +Chain.messages = { + 0: 'Block was added successfully', + 1: 'Block is a new orphan', + 2: 'Block is a known orphan', + 3: 'Block is a greater fork', + 4: 'Block verification failed', + 5: 'Block does not match checkpoint', + 6: 'Chain is unchanged' +}; + +Chain.msg = function msg(code) { + return new Error(Chain.messages[code] || 'Unknown'); +}; + function compareTs(a, b) { return a -b; } @@ -151,26 +175,28 @@ Chain.prototype._probeIndex = function _probeIndex(hash, ts) { Chain.prototype._addIndex = function _addIndex(hash, ts, height) { var self = this; + // Already added if (this._probeIndex(hash, ts)) - return new Error('Already added.'); + return Chain.codes.unchanged; var pos = utils.binaryInsert(this.index.ts, ts, compareTs, true); var checkpoint; - // Avoid duplicates + // Duplicate height if (this.index.hashes[pos] === hash || this.index.hashes[pos - 1] === hash || this.index.hashes[pos + 1] === hash) { - return new Error('Duplicate height.'); + return Chain.codes.unchanged; } + // Fork at checkpoint checkpoint = network.checkpoints[height]; if (checkpoint) { this.emit('checkpoint', height, hash, checkpoint); if (hash !== checkpoint) { this.resetLastCheckpoint(height); this.emit('fork', height, hash, checkpoint); - return new Error('Forked chain at checkpoint.'); + return Chain.codes.badCheckpoint; } } @@ -185,13 +211,15 @@ Chain.prototype._addIndex = function _addIndex(hash, ts, height) { ts: ts, height: height }); + + return Chain.codes.okay; }; Chain.prototype.resetLastCheckpoint = function resetLastCheckpoint(height) { var lastHeight = Object.keys(network.checkpoints).sort().indexOf(height) - 1; if (lastHeight < 0) - i = 0; + lastHeight = 0; this.resetHeight(lastHeight); }; @@ -201,8 +229,7 @@ Chain.prototype.resetHeight = function resetHeight(height) { var index = this.index.heights.indexOf(height); var ahead = this.index.hashes.slice(index + 1); - if (index < 0) - throw new Error('Cannot reset to height of ' + height); + assert(index >= 0); this.block.list.length = 0; this.block.bloom.reset(); @@ -283,15 +310,14 @@ Chain.prototype.add = function add(block) { return; } - var res = false; - var err = null; var initial = block; + var code = Chain.codes.unchanged; var hash, prev, prevProbe, range, hashes; - do { - // No need to revalidate orphans - if (!res && !block.verify()) { - err = new Error('Block verification failed.'); + for (;;) { + // Only validate the initial block (orphans were already validated) + if (block === initial && !block.verify()) { + code = Chain.codes.invalid; break; } @@ -300,7 +326,7 @@ Chain.prototype.add = function add(block) { // If the block is already known to be an orphan if (this.orphan.map[prev]) { - err = new Error('Block is a known orphan.'); + code = Chain.codes.knownOrphan; break; } @@ -308,7 +334,7 @@ Chain.prototype.add = function add(block) { // Remove forked nodes from storage, if shorter chain is detected if (this._killFork(prevProbe)) { - err = new Error('Fork found.'); + code = Chain.codes.forked; break; } @@ -322,17 +348,15 @@ Chain.prototype.add = function add(block) { hashes = this.index.hashes.slice(range.start, range.end + 1); this.emit('missing', prev, hashes, block); + code = Chain.codes.newOrphan; break; } // Validated known block at this point - add it to index - if (prevProbe) { - this._addIndex(hash, block.ts, prevProbe.height + 1); - block.height = prevProbe.height + 1; - } + if (prevProbe) + code = this._addIndex(hash, block.ts, prevProbe.height + 1); // At least one block was added - res = true; this.block.list.push(block); this.block.bloom.add(hash, 'hex'); @@ -347,7 +371,7 @@ Chain.prototype.add = function add(block) { delete this.orphan.bmap[block.hash('hex')]; delete this.orphan.map[hash]; this.orphan.count--; - } while (true); + } // Failsafe for large orphan chains if (this.orphan.count > 10000) { @@ -363,7 +387,7 @@ Chain.prototype.add = function add(block) { // Compress old blocks this._compress(); - return err; + return code; }; Chain.prototype._compress = function compress() { diff --git a/lib/bcoin/fullchain.js b/lib/bcoin/fullchain.js index f2d39cf7..a7eaf636 100644 --- a/lib/bcoin/fullchain.js +++ b/lib/bcoin/fullchain.js @@ -47,6 +47,8 @@ function Chain(options) { lastTs: 0 }; + this.request = new utils.RequestCache(); + this.fromJSON({ v: 1, type: 'chain', @@ -63,6 +65,8 @@ function Chain(options) { ] }); + this.tip = this.index.entries[this.index.entries.length - 1]; + // Last TS after preload, needed for fill percent this.index.lastTs = this.index.entries[this.index.entries.length - 1].ts; @@ -74,6 +78,30 @@ function Chain(options) { inherits(Chain, EventEmitter); +Chain.codes = { + okay: 0, + newOrphan: 1, + knownOrphan: 2, + forked: 3, + invalid: 4, + badCheckpoint: 4, + unchanged: 5 +}; + +Chain.messages = { + 0: 'Block was added successfully', + 1: 'Block is a new orphan', + 2: 'Block is a known orphan', + 3: 'Block is a greater fork', + 4: 'Block verification failed', + 5: 'Block does not match checkpoint', + 6: 'Chain is unchanged' +}; + +Chain.msg = function msg(code) { + return new Error(Chain.messages[code] || 'Unknown'); +}; + Chain.prototype._init = function _init() { var self = this; var s; @@ -111,19 +139,24 @@ Chain.prototype._init = function _init() { Chain.prototype._addIndex = function _addIndex(entry) { var self = this; - if (this.index.entries[entry.hash]) - return new Error('Already added.'); + // Already added + if (this.index.heights[entry.hash] != null) { + assert(this.index.heights[entry.hash] === entry.height); + return Chain.codes.unchanged; + } + // Duplcate height if (this.index.hashes[entry.height] === entry.hash) - return new Error('Duplicate height.'); + return Chain.codes.unchanged; + // Fork at checkpoint checkpoint = network.checkpoints[entry.height]; if (checkpoint) { this.emit('checkpoint', entry.height, entry.hash, checkpoint); if (hash !== checkpoint) { this.resetLastCheckpoint(entry.height); this.emit('fork', entry.height, entry.hash, checkpoint); - return new Error('Forked chain at checkpoint.'); + return Chain.codes.badCheckpoint; } } @@ -131,14 +164,18 @@ Chain.prototype._addIndex = function _addIndex(entry) { this.index.hashes[entry.height] = entry.hash; this.index.heights[entry.hash] = entry.height; + this.tip = this.index.entries[this.index.entries.length - 1]; + this._save(entry); + + return Chain.codes.okay; }; Chain.prototype.resetLastCheckpoint = function resetLastCheckpoint(height) { var lastHeight = Object.keys(network.checkpoints).sort().indexOf(height) - 1; if (lastHeight < 0) - i = 0; + lastHeight = 0; this.resetHeight(lastHeight); }; @@ -147,8 +184,7 @@ Chain.prototype.resetHeight = function resetHeight(height) { var self = this; var ahead = this.index.entries.slice(height + 1); - if (height >= this.index.entries - 1) - throw new Error('Cannot reset to height of ' + height); + assert(height < this.index.entries - 1); this.orphan.map = {}; this.orphan.bmap = {}; @@ -160,6 +196,8 @@ Chain.prototype.resetHeight = function resetHeight(height) { }, {}); this.index.hashes.length = height + 1; + this.tip = this.index.entries[this.index.entries.length - 1]; + this.index.lastTs = Math.min( this.index.lastTs, this.index.entries[this.index.entries.length - 1].ts @@ -185,15 +223,15 @@ Chain.prototype.add = function add(block) { return; } - var res = false; - var err = null; var initial = block; + var code = Chain.codes.unchanged; + var length = this.index.entries.length; var hash, prev, prevProbe, range, i, entry; for (;;) { - // No need to revalidate orphans - if (!res && !block.verify()) { - err = new Error('Block verification failed.'); + // Only validate the initial block (orphans were already validated) + if (block === initial && !block.verify()) { + code = Chain.codes.invalid; break; } @@ -202,7 +240,7 @@ Chain.prototype.add = function add(block) { // If the block is already known to be an orphan if (this.orphan.map[prev]) { - err = new Error('Block is a known orphan.'); + code = Chain.codes.knownOrphan; break; } @@ -213,6 +251,7 @@ Chain.prototype.add = function add(block) { this.orphan.count++; this.orphan.map[prev] = block; this.orphan.bmap[hash] = block; + code = Chain.codes.newOrphan; break; } @@ -225,28 +264,28 @@ Chain.prototype.add = function add(block) { height: i + 1 }); - // Already have block - if (this.index.hashes[entry.height] === hash) - break; + // Add entry if we do not have it (or there is another hash at its height) + if (this.index.hashes[entry.height] !== hash) { + assert(this.index.heights[entry.hash] == null); - assert(this.index.heights[entry.hash] == null); - - // Remove forked nodes from storage, if shorter chain is detected - if (this.index.hashes[entry.height]) { - if (!this.tip) { - this.tip = entry; - } else if (this.tip.chainwork.cmp(entry.chainwork) < 0) { - // Found fork - this.resetHeight(entry.height - 1); - this.tip = entry; + // If we have a block at the same height, use chain with higher work + if (this.index.hashes[entry.height]) { + if (this.tip.chainwork.cmp(entry.chainwork) < 0) { + this.resetHeight(entry.height - 1); + this._addIndex(entry); + code = Chain.codes.forked; + // Breaking here only works because + // we deleted the orphan map in resetHeight. + break; + } } + + // Validated known block at this point - add it to index + code = this._addIndex(entry); } - // Validated known block at this point - add it to index - this._addIndex(entry); - - // At least one block was added - res = true; + // Fullfill request + this.request.fullfill(hash, block); if (!this.orphan.map[hash]) break; @@ -265,7 +304,7 @@ Chain.prototype.add = function add(block) { this.orphan.count = 0; } - return err; + return code; }; Chain.prototype.has = function has(hash, noIndex, cb) { diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index ef6c7878..230bbc49 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -530,13 +530,25 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) { }; Peer.prototype.loadHeaders = function loadHeaders(hashes, stop) { + this.emit('debug', + 'Requesting headers packet from %s with getheaders', + this.address); this._write(this.framer.getHeaders(hashes, stop)); }; Peer.prototype.loadBlocks = function loadBlocks(hashes, stop) { + this.emit('debug', + 'Requesting inv packet from %s with getblocks', + this.address); this._write(this.framer.getBlocks(hashes, stop)); }; +Peer.prototype.loadItems = function loadItems(hashes, stop) { + if (this.pool.options.headers) + return this.loadHeaders(hashes, stop); + return this.loadBlocks(hashes, stop); +}; + /** * Expose */ diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 33ead1d1..66e564c0 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -32,6 +32,7 @@ function Pool(options) { network.set(this.options.network); this.options.fullNode = !!this.options.fullNode; + this.options.headers = !!this.options.headers; this.options.relay = this.options.relay == null ? (this.options.fullNode ? true : false) : this.options.relay; @@ -47,7 +48,9 @@ function Pool(options) { }; this.load = { - timeout: options.loadTimeout || 3000, + // timeout: options.loadTimeout || 3000, + // interval: options.loadInterval || 5000, + timeout: options.loadTimeout || 30000, interval: options.loadInterval || 5000, window: options.loadWindow || 250, lastRange: null, @@ -58,13 +61,6 @@ function Pool(options) { hiReached: false }; - if (this.options.fullNode) { - if (!options.loadTimeout) - this.load.timeout = 30000; - if (!options.loadInterval) - this.load.interval = 5000; - } - this.maxRetries = options.maxRetries || 42; this.requestTimeout = options.requestTimeout || 10000; @@ -179,9 +175,58 @@ Pool.prototype._init = function _init() { }); }; +Pool.prototype._startTimer = function _startTimer() { + var self = this; + + this._stopTimer(); + + function destroy() { + // Chain is full and up-to-date + if (self.chain.isFull()) { + self._stopTimer(); + self.emit('full'); + return; + } + + if (self.peers.load) + self.peers.load.destroy(); + } + + this._timer = setTimeout(destroy, this.load.timeout); +}; + +Pool.prototype._stopTimer = function _stopTimer() { + if (!this._timer) + return; + + clearTimeout(this._timer); + delete this._timer; +}; + +Pool.prototype._startInterval = function _startInterval() { + var self = this; + + this._stopInterval(); + + function load() { + // self._load(); + } + + this._interval = setInterval(load, this.load.interval); +}; + +Pool.prototype._stopInterval = function _stopInterval() { + if (!this._interval) + return; + + clearInterval(this._interval); + delete this._interval; +}; + + Pool.prototype._addLoader = function _addLoader() { var self = this; - var peer, interval, timer; + var peer; if (this.destroyed) return; @@ -201,20 +246,24 @@ Pool.prototype._addLoader = function _addLoader() { self.emit('error', err, peer); }); - peer.once('close', onclose); - function onclose() { - clearTimeout(timer); - clearInterval(interval); + peer.on('debug', function() { + var args = Array.prototype.slice.call(arguments); + self.emit.apply(self, ['debug'].concat(args)); + }); + + peer.once('close', function() { + self._stopInterval(); + self._stopTimer(); self._removePeer(peer); if (self.destroyed) return; self._addLoader(); - } + }); peer.once('ack', function() { peer.updateWatch(); if (!self._load()) - clearTimeout(timer); + self._stopTimer(); }); peer.on('version', function(version) { @@ -223,25 +272,6 @@ Pool.prototype._addLoader = function _addLoader() { self.emit('version', version, peer); }); - function load() { - // self._load(); - } - - interval = setInterval(load, this.load.interval); - - function destroy() { - // Chain is full and up-to-date - if (self.chain.isFull()) { - clearTimeout(timer); - self.emit('full'); - return; - } - - peer.destroy(); - } - - timer = setTimeout(destroy, this.load.timeout); - peer.on('merkleblock', function(block) { self._handleBlock(block, peer); }); @@ -250,67 +280,126 @@ Pool.prototype._addLoader = function _addLoader() { self._handleBlock(block, peer); }); - peer.on('blocks', function(hashes) { - self._handleInv(hashes, peer); - }); + if (self.options.headers) { + peer.on('blocks', function(hashes) { + self._handleInv(hashes, peer); + }); - // Split blocks and request them using multiple peers - peer.on('headers', function(headers) { - var i, header, last, block; + peer.on('headers', function(headers) { + self._handleHeaders(headers, peer); + }); + } else { + peer.on('blocks', function(hashes) { + self._handleBlocks(hashes, peer); + }); + } - if (headers.length === 0) - return; + this._startInterval(); + this._startTimer(); +}; - self.emit('debug', - 'Recieved %s headers from %s', - headers.length, - peer.address); +Pool.prototype._handleHeaders = function _handleHeaders(hashes, peer) { + var i, header, last, block; - self.emit('headers', headers); + assert(this.options.headers); - for (i = 0; i < headers.length; i++) { - block = bcoin.block(headers[i], 'header'); + if (headers.length === 0) + return; - if (last && block.prevBlock !== last.hash('hex')) - break; + this.emit('debug', + 'Recieved %s headers from %s', + headers.length, + peer.address); - if (!block.verify()) - break; + this.emit('headers', headers); - if (!self.chain.hasBlock(block) && !self.chain.hasOrphan(block)) - self._request(self.block.type, block.hash('hex')); + for (i = 0; i < headers.length; i++) { + block = bcoin.block(headers[i], 'header'); - // We could do headers-first: - // self._addIndex(block, peer); + if (last && block.prevBlock !== last.hash('hex')) + break; - // Resolve orphan chain - // if (self.chain.hasOrphan(block)) { - // peer.loadHeaders( - // self.chain.locatorHashes(), - // self.chain.getOrphanRoot(block) - // ); - // continue; - // } + if (!block.verify()) + break; - last = block; + if (!this.chain.has(block)) + this._request(this.block.type, block.hash('hex')); + + // For headers-first: + // this._addIndex(block, peer); + + last = block; + } + + // Restart the getheaders process + if (last && headers.length >= 1999) + peer.loadHeaders(this.chain.locatorHashes(last), null); + + // Push our getdata packet + this._scheduleRequests(); + + // Reset interval to avoid calling getheaders unnecessarily + this._startInterval(); + + // Reset timeout to avoid killing the loader + this._startTimer(); + + this.emit('debug', + 'Requesting %s block packets from %s with getdata', + this.request.active, + peer.address + ); +}; + +Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { + var i, hash; + + assert(!this.options.headers); + + if (hashes.length === 0) + return; + + this.emit('blocks', hashes); + + this.emit('debug', + 'Recieved %s block hashes from %s', + hashes.length, + peer.address); + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + + // Resolve orphan chain + if (this.chain.hasOrphan(hash)) { + peer.loadBlocks( + this.chain.locatorHashes(), + this.chain.getOrphanRoot(hash) + ); + continue; } - // Restart the getheaders process - if (last && headers.length >= 1999) - peer.loadHeaders(self.chain.locatorHashes(last), 0); - // peer.loadHeaders([last.hash('hex')], 0); + // Request block if we don't have it + if (!this.chain.has(hash)) + this._request(this.block.type, hash); + } - // Schedule our requests - self._scheduleRequests(); + // Restart the entire getblocks process + peer.loadBlocks(this.chain.locatorHashes(), null); - // Reset interval to avoid calling getheaders unnecessarily - clearInterval(interval); - interval = setInterval(load, self.load.interval); + // Push our getdata packet + this._scheduleRequests(); - // Reset timeout to avoid killing the loader - clearTimeout(timer); - timer = setTimeout(destroy, self.load.timeout); - }); + // Reset interval to avoid calling getblocks unnecessarily + this._startInterval(); + + // Reset timeout to avoid killing the loader + this._startTimer(); + + this.emit('debug', + 'Requesting %s block packets from %s with getdata', + this.request.active, + peer.address + ); }; Pool.prototype._handleInv = function _handleInv(hashes, peer) { @@ -318,9 +407,16 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) { for (i = 0; i < hashes.length; i++) { hash = utils.toHex(hashes[i]); - if (!this.chain.hasBlock(hash) && !this.chain.hasOrphan(hash)) - this.peers.load.loadHeaders(this.chain.locatorHashes(), hash); + if (!this.chain.has(hash)) { + if (this.options.headers) + this.peers.load.loadHeaders(this.chain.locatorHashes(), hash); + else + this._request(this.block.type, hash); + } } + + if (!this.options.headers) + this._scheduleRequests(); }; Pool.prototype._handleBlock = function _handleBlock(block, peer) { @@ -342,6 +438,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer) { }); } + // Do not use with headers-first: if (!this._addIndex(block, peer)) return; @@ -350,25 +447,25 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer) { Pool.prototype._addIndex = function _addIndex(block, peer) { var self = this; - var hash, size, orphan, err; + var hash, size, orphan, res; hash = block.hash('hex'); size = this.chain.size(); orphan = this.chain.hasOrphan(block); - err = this.chain.add(block); - if (err) - this.emit('chain-error', err, peer); + res = this.chain.add(block); + if (res) + this.emit('chain-error', bcoin.chain.msg(res), peer); if (this.chain.hasOrphan(block)) { // Resolve orphan chain - // peer.loadHeaders( - // this.chain.locatorHashes(), - // this.chain.getOrphanRoot(block) - // ); + peer.loadBlocks( + this.chain.locatorHashes(), + this.chain.getOrphanRoot(block) + ); // Emit our orphan if it is new if (!orphan) - this.emit('orphan', block, peer); + return true; return false; } @@ -425,18 +522,15 @@ Pool.prototype._load = function _load() { this.load.hiReached = false; - if (this.peers.load) { - this.emit('debug', - 'Requesting headers packet from %s with getheaders', - this.peers.load.address); - } - if (!this.peers.load) { this._addLoader(); return true; } - this.peers.load.loadHeaders(this.chain.locatorHashes(), 0); + if (this.options.headers) + this.peers.load.loadHeaders(this.chain.locatorHashes(), null); + else + this.peers.load.loadBlocks(this.chain.locatorHashes(), null); return true; }; @@ -466,6 +560,11 @@ Pool.prototype._addPeer = function _addPeer(backoff) { self.emit('error', err, peer); }); + peer.on('debug', function() { + var args = Array.prototype.slice.call(arguments); + self.emit.apply(self, ['debug'].concat(args)); + }); + peer.once('close', function() { self._removePeer(peer); if (self.destroyed) @@ -925,8 +1024,9 @@ Pool.prototype._request = function _request(type, hash, options, cb) { } // Block should be not in chain, or be requested - // if (!options.force && (type === 'block' || type === 'filtered')) - // return this.chain.has(hash, true, next); + // Do not use with headers-first + if (!options.force && (type === 'block' || type === 'filtered')) + return this.chain.has(hash, true, next); return next(false); }; @@ -988,29 +1088,29 @@ Pool.prototype._doRequests = function _doRequests() { if (above && below && this.load.hiReached) this._load(); - if (this.options.fullNode) { - req = items.map(function(item) { - return item.start(this.peers.load); - }, this); + if (this.options.multiplePeers) { + mapReq = function(item) { + return item.start(this.peers.block[i]); + }; - this.peers.load.getData(req); + // Split list between peers + red = this.redundancy; + count = this.peers.block.length; + split = Math.ceil(items.length * red / count); + for (i = 0, off = 0; i < count; i += red, off += split) { + req = items.slice(off, off + split).map(mapReq, this); + for (j = 0; j < red && i + j < count; j++) + this.peers.block[i + j].getData(req); + } return; } - mapReq = function(item) { - return item.start(this.peers.block[i]); - }; + req = items.map(function(item) { + return item.start(this.peers.load); + }, this); - // Split list between peers - red = this.redundancy; - count = this.peers.block.length; - split = Math.ceil(items.length * red / count); - for (i = 0, off = 0; i < count; i += red, off += split) { - req = items.slice(off, off + split).map(mapReq, this); - for (j = 0; j < red && i + j < count; j++) - this.peers.block[i + j].getData(req); - } + this.peers.load.getData(req); }; Pool.prototype.getBlock = function getBlock(hash, cb) { diff --git a/lib/bcoin/protocol/framer.js b/lib/bcoin/protocol/framer.js index 424fcfa4..55045d3c 100644 --- a/lib/bcoin/protocol/framer.js +++ b/lib/bcoin/protocol/framer.js @@ -201,6 +201,10 @@ Framer.prototype._getBlocks = function _getBlocks(cmd, hashes, stop) { var p = []; var off, i, hash, len; + // getheaders can have a null hash + if (cmd === 'getheaders' && !hashes) + hashes = []; + writeU32(p, constants.version, 0); off = 4 + utils.writeIntv(p, hashes.length, 4); p.length = off + 32 * (hashes.length + 1);