diff --git a/lib/bcoin/chaindb.js b/lib/bcoin/chaindb.js index 5751da29..e06d2135 100644 --- a/lib/bcoin/chaindb.js +++ b/lib/bcoin/chaindb.js @@ -96,7 +96,7 @@ function ChainDB(chain, options) { // Key size: 68b (* 2) this.coinWindow = ((165 * 1024 + 5000 * 9) + (5000 * 68 * 2)) * 5; - this.coinCache = new bcoin.lru(this.coinWindow); + this.coinCache = new NullCache(this.coinWindow); this.cacheHash = new bcoin.lru(this.cacheWindow, 1); this.cacheHeight = new bcoin.lru(this.cacheWindow, 1); diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 4f2f288c..0912f463 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -95,6 +95,8 @@ function Peer(pool, options) { this.invFilter = new bcoin.bloom.rolling(50000, 0.000001); this.lastBlock = null; this.waiting = 0; + this.syncSent = false; + this.loader = options.loader || false; this.challenge = null; this.lastPong = -1; @@ -178,7 +180,7 @@ Peer.prototype._init = function init() { }); this.parser.on('error', function(err) { - self.sendReject(null, 'malformed', 'error parsing message', 10); + self.reject(null, 'malformed', 'error parsing message', 10); self._error(err, true); }); }; @@ -237,6 +239,13 @@ Peer.prototype._onconnect = function _onconnect() { if (self.pool.feeRate !== -1) self.setFeeRate(self.pool.feeRate); + // Start syncing the chain. + self.sync(); + + // Ask for the mempool if we're synced. + if (self.loader && self.pool.synced) + self.sendMempool(); + // Finally we can let the pool know // that this peer is ready to go. self.emit('ack'); @@ -476,8 +485,7 @@ Peer.prototype.updateWatch = function updateWatch() { if (!this.pool.options.spv) return; - if (this.ack) - this.write(this.framer.filterLoad(this.pool.spvFilter)); + this.write(this.framer.filterLoad(this.pool.spvFilter)); }; /** @@ -1581,7 +1589,7 @@ Peer.prototype.sendAlert = function sendAlert(details) { * @param {Hash?} stop - Hash to stop at. */ -Peer.prototype.getHeaders = function getHeaders(locator, stop) { +Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) { bcoin.debug( 'Requesting headers packet from peer with getheaders (%s).', this.hostname); @@ -1600,7 +1608,7 @@ Peer.prototype.getHeaders = function getHeaders(locator, stop) { * @param {Hash?} stop - Hash to stop at. */ -Peer.prototype.getBlocks = function getBlocks(locator, stop) { +Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) { bcoin.debug( 'Requesting inv packet from peer with getblocks (%s).', this.hostname); @@ -1617,7 +1625,7 @@ Peer.prototype.getBlocks = function getBlocks(locator, stop) { * Send `mempool` to peer. */ -Peer.prototype.getMempool = function getMempool() { +Peer.prototype.sendMempool = function sendMempool() { bcoin.debug( 'Requesting inv packet from peer with mempool (%s).', this.hostname); @@ -1630,7 +1638,7 @@ Peer.prototype.getMempool = function getMempool() { * @param {Object} details - See {@link Framer.reject}. */ -Peer.prototype.reject = function reject(details) { +Peer.prototype.sendReject = function sendReject(details) { bcoin.debug( 'Sending reject packet to peer (%s).', this.hostname); @@ -1665,8 +1673,144 @@ Peer.prototype.setMisbehavior = function setMisbehavior(score) { * @param {Number} score */ -Peer.prototype.sendReject = function sendReject(obj, code, reason, score) { - return this.pool.reject(this, obj, code, reason, score); +Peer.prototype.reject = function reject(obj, code, reason, score) { + var type; + + if (obj) { + type = (obj instanceof bcoin.tx) ? 'tx' : 'block'; + + bcoin.debug('Rejecting %s %s (%s): ccode=%s reason=%s.', + type, obj.rhash, this.hostname, code, reason); + + this.sendReject({ + message: type, + ccode: code, + reason: reason, + data: obj.hash() + }); + + this.pool.rejects.add(obj.hash()); + } else { + bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.', + this.hostname, code, reason); + + this.sendReject({ + ccode: code, + reason: reason, + data: null + }); + } + + if (score != null) + this.setMisbehavior(score); +}; + +/** + * Send `getblocks` to peer after building + * locator and resolving orphan root. + * @param {Hash} tip - Tip to build chain locator from. + * @param {Hash} orphan - Orphan hash to resolve. + * @param {Function} callback + */ + +Peer.prototype.resolveOrphan = function resolveOrphan(tip, orphan, callback) { + var self = this; + + callback = utils.ensure(callback); + + assert(orphan); + + this.chain.getLocator(tip, function(err, locator) { + if (err) + return callback(err); + + orphan = self.chain.getOrphanRoot(orphan); + + // Was probably resolved. + if (!orphan) { + bcoin.debug('Orphan root was already resolved.'); + return callback(); + } + + self.sendGetBlocks(locator, orphan.root); + + callback(); + }); +}; + +/** + * Send `getheaders` to peer after building locator. + * @param {Hash} tip - Tip to build chain locator from. + * @param {Hash?} stop + * @param {Function} callback + */ + +Peer.prototype.getHeaders = function getHeaders(tip, stop, callback) { + var self = this; + + callback = utils.ensure(callback); + + this.chain.getLocator(tip, function(err, locator) { + if (err) + return callback(err); + + self.sendGetHeaders(locator, stop); + + callback(); + }); +}; + +/** + * Send `getblocks` to peer after building locator. + * @param {Hash} tip - Tip hash to build chain locator from. + * @param {Hash?} stop + * @param {Function} callback + */ + +Peer.prototype.getBlocks = function getBlocks(tip, stop, callback) { + var self = this; + + callback = utils.ensure(callback); + + this.chain.getLocator(tip, function(err, locator) { + if (err) + return callback(err); + + self.sendGetBlocks(locator, stop); + + callback(); + }); +}; + +/** + * Start syncing from peer. + * @param {Function} callback + */ + +Peer.prototype.sync = function sync(callback) { + var tip; + + if (!this.pool.syncing) + return; + + if (this.syncSent) + return; + + if (!this.loader) { + if (this.chain.tip.ts <= bcoin.now() - 24 * 60 * 60) + return; + } + + this.syncSent = true; + + if (this.options.headers) { + if (!this.chain.tip.isGenesis()) + tip = this.chain.tip.prevBlock; + + return this.getHeaders(tip, null, callback); + } + + return this.getBlocks(null, null, callback); }; /** diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 255e9ca6..da5a86a6 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -292,10 +292,19 @@ Pool.prototype._init = function _init() { this.chain.on('full', function() { self._stopTimer(); self._stopInterval(); - if (!self.synced && self.peers.load) - self.peers.load.getMempool(); + + if (!self.synced) { + // Ask loader for a mempool snapshot. + if (self.peers.load) + self.peers.load.sendMempool(); + + // Ask all peers for their latest blocks. + self.sync(); + } + self.synced = true; self.emit('full'); + bcoin.debug('Chain is fully synced (height=%d).', self.chain.height); }); @@ -323,82 +332,6 @@ Pool.prototype._init = function _init() { }); }; -/** - * Send `getblocks` to peer after building locator. - * @param {Peer} peer - * @param {Hash} top - Top hash to build chain locator from. - * @param {Hash?} stop - * @param {Function} callback - */ - -Pool.prototype.getBlocks = function getBlocks(peer, top, stop, callback) { - callback = utils.ensure(callback); - - this.chain.getLocator(top, function(err, locator) { - if (err) - return callback(err); - - peer.getBlocks(locator, stop); - - callback(); - }); -}; - -/** - * Send `getblocks` to peer after building - * locator and resolving orphan root. - * @param {Peer} peer - * @param {Hash} top - Top hash to build chain locator from. - * @param {Hash} orphan - Orphan hash to resolve. - * @param {Function} callback - */ - -Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan, callback) { - var self = this; - - callback = utils.ensure(callback); - - assert(orphan); - - this.chain.getLocator(top, function(err, locator) { - if (err) - return callback(err); - - orphan = self.chain.getOrphanRoot(orphan); - - // Was probably resolved. - if (!orphan) { - bcoin.debug('Orphan root was already resolved.'); - return callback(); - } - - peer.getBlocks(locator, orphan.root); - - callback(); - }); -}; - -/** - * Send `getheaders` to peer after building locator. - * @param {Peer} peer - * @param {Hash} top - Top hash to build chain locator from. - * @param {Hash?} stop - * @param {Function} callback - */ - -Pool.prototype.getHeaders = function getHeaders(peer, top, stop, callback) { - callback = utils.ensure(callback); - - this.chain.getLocator(top, function(err, locator) { - if (err) - return callback(err); - - peer.getHeaders(locator, stop); - - callback(); - }); -}; - /** * Start listening on a server socket. * @param {Function} callback @@ -549,6 +482,7 @@ Pool.prototype._addLoader = function _addLoader() { peer = this._createPeer({ host: this.getLoaderHost(), + loader: true, network: true, spv: this.options.spv, witness: this.options.witness @@ -577,16 +511,6 @@ Pool.prototype._addLoader = function _addLoader() { self._addLoader(); }); - peer.once('ack', function() { - if (self.synced) - peer.getMempool(); - - if (!self.syncing) - return; - - self._load(); - }); - peer.on('merkleblock', function(block) { if (!self.options.spv) return; @@ -673,8 +597,15 @@ Pool.prototype.startSync = function startSync() { return; } - if (this.peers.load.ack) - this._load(); + this.sync(); +}; + +Pool.prototype.sync = function sync() { + if (this.peers.load) + this.peers.load.sync(); + + for (i = 0; i < this.peers.regular.length; i++) + this.peers.regular[i].sync(); }; /** @@ -692,6 +623,12 @@ Pool.prototype.stopSync = function stopSync() { this._stopInterval(); this._stopTimer(); + + if (this.peers.load) + this.peers.load.syncSent = false; + + for (i = 0; i < this.peers.regular.length; i++) + this.peers.regular[i].syncSent = false; }; Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) { @@ -725,12 +662,12 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) var hash = header.hash('hex'); if (last && header.prevBlock !== last) { - peer.sendReject(header, 'invalid', 'bad-prevblk', 100); + peer.reject(header, 'invalid', 'bad-prevblk', 100); return next(new Error('Bad header chain.')); } if (!header.verify(ret)) { - peer.sendReject(header, 'invalid', ret.reason, 100); + peer.reject(header, 'invalid', ret.reason, 100); self.rejects.add(header.hash()); return next(new VerifyError(header, 'invalid', ret.reason, ret.score)); } @@ -753,7 +690,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) // simply tries to find the latest block in // the peer's chain. if (last && headers.length === 2000) - self.getHeaders(peer, last, null, callback); + peer.getHeaders(last, null, callback); else callback(); }); @@ -793,7 +730,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { // Resolve orphan chain. if (self.chain.hasOrphan(hash)) { bcoin.debug('Peer sent a hash that is already a known orphan.'); - return self.resolveOrphan(peer, null, hash, next); + return peer.resolveOrphan(null, hash, next); } self.getData(peer, self.block.type, hash, function(err, exists) { @@ -809,7 +746,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { // the hashContinue on the remote node). if (exists && i === hashes.length - 1) { // Request more hashes: - self.getBlocks(peer, hash, null, next); + peer.getBlocks(hash, null, next); // Re-download the block (traditional method): // self.getData(peer, self.block.type, hash, true, next); return; @@ -840,7 +777,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) { return this._handleBlocks(hashes, peer, callback); utils.forEachSerial(hashes, function(hash, next) { - self.getHeaders(peer, null, hash, next); + peer.getHeaders(null, hash, next); }, function(err) { if (err) return callback(err); @@ -878,14 +815,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { } if (err.score !== -1) - peer.sendReject(block, err.code, err.reason, err.score); + peer.reject(block, err.code, err.reason, err.score); if (err.reason === 'bad-prevblk') { if (self.options.headers) { peer.setMisbehavior(10); return callback(err); } - return self.resolveOrphan(peer, null, block.hash('hex'), function(e) { + return peer.resolveOrphan(null, block.hash('hex'), function(e) { self.scheduleRequests(peer); return callback(e || err); }); @@ -925,21 +862,6 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { }); }; -Pool.prototype._load = function _load() { - if (!this.syncing) - return; - - if (!this.peers.load) { - this._addLoader(); - return; - } - - if (this.options.headers) - this.getHeaders(this.peers.load, null, null); - else - this.getBlocks(this.peers.load, null, null); -}; - /** * Send `mempool` to all peers. */ @@ -948,10 +870,10 @@ Pool.prototype.getMempool = function getMempool() { var i; if (this.peers.load) - this.peers.load.getMempool(); + this.peers.load.sendMempool(); for (i = 0; i < this.peers.regular.length; i++) - this.peers.regular[i].getMempool(); + this.peers.regular[i].sendMempool(); }; Pool.prototype._createPeer = function _createPeer(options) { @@ -960,6 +882,7 @@ Pool.prototype._createPeer = function _createPeer(options) { var peer = new bcoin.peer(this, { host: options.host, createSocket: this.options.createSocket, + loader: options.loader, relay: this.options.relay, socket: options.socket, network: options.network, @@ -1153,7 +1076,7 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { if (err) { if (err.type === 'VerifyError') { if (err.score !== -1) - peer.sendReject(tx, err.code, err.reason, err.score); + peer.reject(tx, err.code, err.reason, err.score); self.rejects.add(tx.hash()); return callback(err); } @@ -1282,18 +1205,6 @@ Pool.prototype._addPeer = function _addPeer() { }); }; -Pool.prototype.bestPeer = function bestPeer() { - return this.peers.regular.reduce(function(best, peer) { - if (!peer.version || !peer.socket) - return; - - if (!best || peer.version.height > best.version.height) - return peer; - - return best; - }, null); -}; - Pool.prototype._removePeer = function _removePeer(peer) { utils.binaryRemove(this.peers.pending, peer, compare); utils.binaryRemove(this.peers.regular, peer, compare); @@ -1302,14 +1213,15 @@ Pool.prototype._removePeer = function _removePeer(peer) { delete this.peers.map[peer.host]; if (this.peers.load === peer) { - Object.keys(this.request.map).forEach(function(hash) { - var item = this.request.map[hash]; - if (item.peer === peer) - item.finish(new Error('Peer closed.')); - }, this); bcoin.debug('Removed loader peer (%s).', peer.hostname); this.peers.load = null; } + + Object.keys(this.request.map).forEach(function(hash) { + var item = this.request.map[hash]; + if (item.peer === peer) + item.finish(new Error('Peer closed.')); + }, this); }; /** @@ -1926,48 +1838,6 @@ Pool.prototype.isMisbehaving = function isMisbehaving(host) { return false; }; -/** - * Send a `reject` packet to peer. - * @see Framer.reject - * @param {Peer} peer - * @param {(TX|Block)?} obj - * @param {String} code - cccode. - * @param {String} reason - * @param {Number} score - */ - -Pool.prototype.reject = function reject(peer, obj, code, reason, score) { - var type; - - if (obj) { - type = (obj instanceof bcoin.tx) ? 'tx' : 'block'; - - bcoin.debug('Rejecting %s %s (%s): ccode=%s reason=%s.', - type, obj.rhash, peer.hostname, code, reason); - - peer.reject({ - message: type, - ccode: code, - reason: reason, - data: obj.hash() - }); - - this.rejects.add(obj.hash()); - } else { - bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.', - peer.hostname, code, reason); - - peer.reject({ - ccode: code, - reason: reason, - data: null - }); - } - - if (score != null) - peer.setMisbehavior(score); -}; - /** * Attempt to retrieve external IP from icanhazip.com. * @param {Function} callback