From c3985640366db620897e3bb48e240aa217ffd3f8 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sat, 21 May 2016 09:51:33 -0700 Subject: [PATCH] refactor peer and pool. --- lib/bcoin/abstractblock.js | 12 +++ lib/bcoin/block.js | 4 +- lib/bcoin/compactblock.js | 2 +- lib/bcoin/merkleblock.js | 2 +- lib/bcoin/mtx.js | 5 +- lib/bcoin/peer.js | 163 +++++++++++++++++++++---------------- lib/bcoin/pool.js | 136 +++++++++++++++++-------------- lib/bcoin/tx.js | 18 +++- lib/bcoin/types.js | 6 ++ 9 files changed, 204 insertions(+), 144 deletions(-) diff --git a/lib/bcoin/abstractblock.js b/lib/bcoin/abstractblock.js index 590e2999..c1823547 100644 --- a/lib/bcoin/abstractblock.js +++ b/lib/bcoin/abstractblock.js @@ -157,6 +157,18 @@ AbstractBlock.prototype.__defineGetter__('rhash', function() { return utils.revHex(this.hash('hex')); }); +/** + * Convert the block to an inv item. + * @returns {InvItem} + */ + +AbstractBlock.prototype.toInv = function toInv() { + return { + type: constants.inv.BLOCK, + hash: this.hash() + }; +}; + /* * Expose */ diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index f4b0117b..5efd0fcc 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -689,7 +689,7 @@ Block.prototype.toMerkle = function toMerkle(filter) { }; /** - * Test an object to see if it is a Block. + * Test whether an object is a Block. * @param {Object} obj * @returns {Boolean} */ @@ -697,7 +697,7 @@ Block.prototype.toMerkle = function toMerkle(filter) { Block.isBlock = function isBlock(obj) { return obj && typeof obj.merkleRoot === 'string' - && typeof obj.toCompact === 'function'; + && typeof obj.getCommitmentHash === 'function'; }; /* diff --git a/lib/bcoin/compactblock.js b/lib/bcoin/compactblock.js index 874a3b89..3289951f 100644 --- a/lib/bcoin/compactblock.js +++ b/lib/bcoin/compactblock.js @@ -107,7 +107,7 @@ CompactBlock.prototype.toBlock = function toBlock() { }; /** - * Test an object to see if it is a CompactBlock. + * Test whether an object is a CompactBlock. * @param {Object} obj * @returns {Boolean} */ diff --git a/lib/bcoin/merkleblock.js b/lib/bcoin/merkleblock.js index 4affab7e..b9bc8b52 100644 --- a/lib/bcoin/merkleblock.js +++ b/lib/bcoin/merkleblock.js @@ -391,7 +391,7 @@ MerkleBlock.fromBlock = function fromBlock(block, bloom) { }; /** - * Test an object to see if it is a MerkleBlock object. + * Test whether an object is a MerkleBlock. * @param {Object} obj * @returns {Boolean} */ diff --git a/lib/bcoin/mtx.js b/lib/bcoin/mtx.js index 105d9c91..8d789d78 100644 --- a/lib/bcoin/mtx.js +++ b/lib/bcoin/mtx.js @@ -1275,7 +1275,7 @@ MTX.prototype.toTX = function toTX() { }; /** - * Test an object to see if it is a TX. + * Test whether an object is an MTX. * @param {Object} obj * @returns {Boolean} */ @@ -1283,8 +1283,7 @@ MTX.prototype.toTX = function toTX() { MTX.isMTX = function isMTX(obj) { return obj && Array.isArray(obj.inputs) - && typeof obj.ps === 'number' - && typeof obj.changeIndex === 'number' + && typeof obj.locktime === 'number' && typeof obj.scriptInput === 'function'; }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 486b9954..e5cceca4 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -24,7 +24,6 @@ var constants = bcoin.protocol.constants; * priority (i.e. a loader). * @param {Chain} options.chain * @param {Mempool} options.mempool - * @param {Bloom} options.bloom - The _local_ bloom filter. * @param {Number?} options.ts - Time at which peer was discovered (unix time). * @param {net.Socket?} options.socket * @param {Seed?} options.seed - Host to connect to. @@ -37,7 +36,6 @@ var constants = bcoin.protocol.constants; * @property {Framer} framer * @property {Chain} chain * @property {Mempool} mempool - * @property {Bloom} bloom * @property {Object?} version - Version packet payload. * @property {Boolean} destroyed * @property {Boolean} ack - Whether verack has been received. @@ -49,7 +47,7 @@ var constants = bcoin.protocol.constants; * either notified via service bits or deprecated `havewitness` packet. * @property {Hash?} hashContinue - The block hash at which to continue * the sync for the peer. - * @property {Bloom?} filter - The _peer's_ bloom filter. + * @property {Bloom?} spvFilter - The _peer's_ bloom spvFilter. * @property {Boolean} relay - Whether to relay transactions * immediately to the peer. * @property {BN} challenge - Local nonce. @@ -84,7 +82,6 @@ function Peer(pool, options) { this.priority = this.options.priority; this.chain = this.pool.chain; this.mempool = this.pool.mempool; - this.bloom = this.pool.bloom; this.network = this.chain.network; this.parser = new bcoin.protocol.parser({ network: this.network }); this.framer = new bcoin.protocol.framer({ network: this.network }); @@ -96,10 +93,10 @@ function Peer(pool, options) { this.sendHeaders = false; this.haveWitness = false; this.hashContinue = null; - this.filter = null; + this.spvFilter = null; this.relay = true; this.localNonce = utils.nonce(); - this.filterRate = -1; + this.feeRate = -1; this.addrFilter = new bcoin.bloom.rolling(5000, 0.001); this.invFilter = new bcoin.bloom.rolling(50000, 0.000001); @@ -187,52 +184,71 @@ Peer.prototype._init = function init() { }); this.parser.on('error', function(err) { - bcoin.debug('Parse error:'); - bcoin.debug(err.stack + ''); - self.sendReject(null, 'malformed', 'error parsing message', 100); + self.sendReject(null, 'malformed', 'error parsing message', 1); self._error(err); }); - this.request('verack', function(err, payload) { + this.request('verack', function callee(err) { if (err) { self._error(err); self.destroy(); return; } - self.ack = true; - self.emit('ack'); - self.ts = utils.now(); + // Wait for _their_ version. + if (!self.version) { + bcoin.debug( + 'Peer sent a verack without a version (%s).', + self.hostname); + self.request('version', callee); + return; + } + // Setup the ping interval. self.ping.timer = setInterval(function() { self.sendPing(); }, self.ping.interval); + // Ask for headers-only. if (self.options.headers) { if (self.version && self.version.version > 70012) self.write(self.framer.sendHeaders()); } + // Let them know we support segwit (old + // segwit3 nodes require this instead + // of service bits). if (self.options.witness) { if (self.version && self.version.version >= 70012) self.write(self.framer.haveWitness()); } + // Find some more peers. self.write(self.framer.getAddr()); + // Relay our spv filter if we have one. + self.updateWatch(); + + // Announce our currently broadcasted items. self.sendInv(self.pool.inv.items); - if (self.pool.options.filterRate != null) { - self.write(self.framer.feeFilter({ - rate: self.pool.options.filterRate - })); - } + // Set a fee rate filter. + if (self.pool.feeRate !== -1) + self.setFeeRate(self.pool.feeRate); - if (self.chain.isFull()) + // If we're fully synced, see + // what we missed out on. + if (self.pool.synced) self.getMempool(); + + // Finally we can let the pool know + // that this peer is ready to go. + self.ack = true; + self.ts = utils.now(); + self.emit('ack'); }); - // Send hello + // Say hello. this.write(this.framer.version({ height: this.chain.height, relay: this.options.relay, @@ -277,11 +293,13 @@ Peer.prototype.createSocket = function createSocket(port, host) { /** * Broadcast items to peer (transactions or blocks). - * @param {BroadcastEntry[]} items + * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items */ Peer.prototype.sendInv = function sendInv(items) { var self = this; + var inv = []; + var i, item, chunk; if (this.destroyed) return; @@ -295,33 +313,25 @@ Peer.prototype.sendInv = function sendInv(items) { if (!Array.isArray(items)) items = [items]; - if (items.length === 0) - return; + for (i = 0; i < items.length; i++) { + item = items[i]; - if (items.length > 50000) - items = items.slice(0, 50000); + if (!this.isWatched(item)) + continue; - if (this.filter) - items = items.filter(this.isWatched, this); + if (item.toInv) + item = item.toInv(); - items = items.map(function(msg) { - if (!(msg instanceof bcoin.tx) - && !(msg instanceof bcoin.block)) { - return msg; - } - return { - type: (msg instanceof bcoin.tx) - ? constants.inv.TX - : constants.inv.BLOCK, - hash: msg.hash() - }; - }); + if (!this.invFilter.added(item.hash, 'hex')) + continue; - items = items.filter(function(msg) { - return self.invFilter.added(msg.hash, 'hex'); - }); + inv.push(item); + } - this.write(this.framer.inv(items)); + for (i = 0; i < inv.length; i += 50000) { + chunk = inv.slice(i, i + 50000); + this.write(this.framer.inv(chunk)); + } }; /** @@ -357,19 +367,19 @@ Peer.prototype.sendPing = function sendPing() { */ Peer.prototype.isWatched = function isWatched(item) { - if (!this.filter) + if (!this.spvFilter) return true; if (!item) return true; if (item instanceof bcoin.tx) - return item.isWatched(this.filter); + return item.isWatched(this.spvFilter); - if (!(item.msg instanceof bcoin.tx)) - return true; + if (item.msg instanceof bcoin.tx) + return item.msg.isWatched(this.spvFilter); - return item.msg.isWatched(this.filter); + return true; }; /** @@ -381,7 +391,18 @@ Peer.prototype.updateWatch = function updateWatch() { return; if (this.ack) - this.write(this.framer.filterLoad(this.bloom)); + this.write(this.framer.filterLoad(this.pool.spvFilter)); +}; + +/** + * Set a fee rate filter for the peer. + * @param {Rate} rate + */ + +Peer.prototype.setFeeRate = function setFeeRate(rate) { + this.write(this.framer.feeFilter({ + rate: rate + })); }; /** @@ -596,11 +617,11 @@ Peer.prototype._onPacket = function onPacket(packet) { break; case 'sendheaders': this.sendHeaders = true; - this.response(cmd, payload); + this.fire(cmd, payload); break; case 'havewitness': this.haveWitness = true; - this.response(cmd, payload); + this.fire(cmd, payload); break; case 'verack': this.fire(cmd, payload); @@ -616,9 +637,7 @@ Peer.prototype._onPacket = function onPacket(packet) { }; Peer.prototype.fire = function fire(cmd, payload) { - if (this.response(cmd, payload)) - return; - + this.response(cmd, payload); this.emit(cmd, payload); }; @@ -629,15 +648,15 @@ Peer.prototype._flushMerkle = function _flushMerkle() { }; Peer.prototype._handleFilterLoad = function _handleFilterLoad(payload) { - this.filter = new bcoin.bloom( + this.spvFilter = new bcoin.bloom( payload.filter, payload.n, payload.tweak, payload.update ); - if (!this.filter.isWithinConstraints()) { - delete this.filter; + if (!this.spvFilter.isWithinConstraints()) { + this.spvFilter = null; this.setMisbehavior(100); return; } @@ -651,15 +670,15 @@ Peer.prototype._handleFilterAdd = function _handleFilterAdd(payload) { return; } - if (this.filter) - this.filter.add(payload.data); + if (this.spvFilter) + this.spvFilter.add(payload.data); this.relay = true; }; Peer.prototype._handleFilterClear = function _handleFilterClear(payload) { - if (this.filter) - this.filter.reset(); + if (this.spvFilter) + this.spvFilter.reset(); this.relay = true; }; @@ -679,7 +698,7 @@ Peer.prototype._handleFeeFilter = function _handleFeeFilter(payload) { return; } - this.filterRate = payload.rate; + this.feeRate = payload.rate; this.fire('feefilter', payload); }; @@ -1012,7 +1031,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) { // ACK this.write(this.framer.verack()); this.version = payload; - this.emit('version', payload); + this.fire('version', payload); }; Peer.prototype._handleMempool = function _handleMempool() { @@ -1103,8 +1122,8 @@ Peer.prototype._handleGetData = function handleGetData(items) { // We should technically calculate this in // the `mempool` handler, but it would be // too slow. - if (self.filterRate !== -1) { - if (bcoin.tx.getRate(entry.size, entry.fees) < self.filterRate) + if (self.feeRate !== -1) { + if (bcoin.tx.getRate(entry.size, entry.fees) < self.feeRate) return next(); } @@ -1174,7 +1193,7 @@ Peer.prototype._handleGetData = function handleGetData(items) { return next(); } - block = block.toMerkle(self.filter); + block = block.toMerkle(self.spvFilter); self.write(self.framer.merkleBlock(block)); @@ -1255,7 +1274,7 @@ Peer.prototype._handleAddr = function handleAddr(addrs) { Peer.prototype._handlePing = function handlePing(data) { this.write(this.framer.pong(data)); - this.emit('ping', this.minPing); + this.fire('ping', this.minPing); }; Peer.prototype._handlePong = function handlePong(data) { @@ -1287,7 +1306,7 @@ Peer.prototype._handlePong = function handlePong(data) { this.challenge = null; - this.emit('pong', this.minPing); + this.fire('pong', this.minPing); }; Peer.prototype._handleGetAddr = function handleGetAddr() { @@ -1341,7 +1360,7 @@ Peer.prototype._handleInv = function handleInv(items) { var txs = []; var i, item, unknown; - this.emit('inv', items); + this.fire('inv', items); for (i = 0; i < items.length; i++) { item = items[i]; @@ -1371,13 +1390,13 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) { return new bcoin.headers(header); }); - this.emit('headers', headers); + this.fire('headers', headers); }; Peer.prototype._handleReject = function handleReject(payload) { var hash, entry; - this.emit('reject', payload); + this.fire('reject', payload); if (!payload.data) return; @@ -1403,7 +1422,7 @@ Peer.prototype._handleAlert = function handleAlert(details) { return; } - this.emit('alert', details); + this.fire('alert', details); }; /** diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index f3723e1a..bd64b5b0 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -25,6 +25,7 @@ var VerifyError = bcoin.errors.VerifyError; * for relayed transactions. * @param {Boolean?} options.headers - Whether * to use `getheaders` for sync. + * @param {Number?} [options.feeRate] - Fee filter rate. * @param {Number?} [options.loadTimeout=120000] - Sync timeout before * finding a new loader peer. * @param {Number?} [options.loadInterval=20000] - Timeout before attempting to @@ -92,19 +93,11 @@ function Pool(options) { this.network = this.chain.network; - if (options.relay == null) { - if (options.spv) - options.relay = false; - else - options.relay = true; - } + if (options.relay == null) + options.relay = !options.spv; - if (options.headers == null) { - if (options.spv) - options.headers = true; - else - options.headers = false; - } + if (options.headers == null) + options.headers = options.spv; seeds = (options.seeds || this.network.seeds).slice(); @@ -128,6 +121,7 @@ function Pool(options) { this.syncing = false; this.synced = false; this._scheduled = false; + this._pendingWatch = false; this.load = { timeout: options.loadTimeout || 120000, @@ -138,7 +132,11 @@ function Pool(options) { this.watchMap = {}; - this.bloom = bcoin.bloom.fromRate(10000, 0.01, constants.bloom.NONE); + this.feeRate = options.feeRate != null ? options.feeRate : -1; + + this.spvFilter = options.spv + ? bcoin.bloom.fromRate(10000, 0.01, constants.bloom.NONE) + : null; this.peers = { // Peers that are loading blocks themselves @@ -236,7 +234,7 @@ Pool.prototype.connect = function connect() { if (this.connected) return; - if (this.options.broadcast) { + if (!this.options.selfish && !this.options.spv) { if (this.mempool) { this.mempool.on('tx', function(tx) { self.announce(tx); @@ -249,13 +247,11 @@ Pool.prototype.connect = function connect() { // miner sends us an invalid competing // chain that we can't connect and // verify yet. - if (!this.options.spv) { - this.chain.on('block', function(block) { - if (!self.synced) - return; - self.announce(block); - }); - } + this.chain.on('block', function(block) { + if (!self.synced) + return; + self.announce(block); + }); } if (this.originalSeeds.length > 0) { @@ -550,7 +546,6 @@ Pool.prototype._addLoader = function _addLoader() { }); peer.once('ack', function() { - peer.updateWatch(); if (!self.syncing) return; self._load(); @@ -1119,13 +1114,6 @@ Pool.prototype._addLeech = function _addLeech(socket) { self._removePeer(peer); }); - peer.once('ack', function() { - if (self.destroyed) - return; - - peer.updateWatch(); - }); - peer.on('merkleblock', function(block) { if (!self.options.spv) return; @@ -1198,8 +1186,6 @@ Pool.prototype._addPeer = function _addPeer() { if (utils.binaryRemove(self.peers.pending, peer, compare)) utils.binaryInsert(self.peers.regular, peer, compare); - - peer.updateWatch(); }); peer.on('merkleblock', function(block) { @@ -1255,58 +1241,57 @@ Pool.prototype._removePeer = function _removePeer(peer) { /** * Watch a piece of data (filterload, SPV-only). - * @param {Buffer} id + * @param {Buffer|Hash} data */ -Pool.prototype.watch = function watch(id) { +Pool.prototype.watch = function watch(data) { var self = this; - var hid, i; - if (id instanceof bcoin.wallet) { - this.watchWallet(id); + if (Buffer.isBuffer(data)) + data = data.toString('hex'); + + if (data instanceof bcoin.wallet) { + this.watchWallet(data); return; } - if (id) { - hid = id.toString('hex'); - - if (this.watchMap[hid]) { - this.watchMap[hid]++; - return; - } - - this.watchMap[hid] = 1; - - this.bloom.add(id); + if (this.watchMap[data]) { + this.watchMap[data]++; + return; } - // Send it to peers + this.watchMap[data] = 1; + + this.spvFilter.add(data, 'hex'); + this.updateWatch(); }; /** * Unwatch a piece of data (filterload, SPV-only). - * @param {Buffer} id + * @param {Buffer|Hash} data */ -Pool.prototype.unwatch = function unwatch(id) { +Pool.prototype.unwatch = function unwatch(data) { var self = this; - var i, hid; - hid = id.toString('hex'); + if (Buffer.isBuffer(data)) + data = data.toString('hex'); - if (!this.watchMap[hid] || --this.watchMap[hid] !== 0) + if (!this.watchMap[data]) return; - delete this.watchMap[hid]; + if (--this.watchMap[data] !== 0) + return; - // Reset bloom filter - this.bloom.reset(); - Object.keys(this.watchMap).forEach(function(id) { - this.bloom.add(id); + delete this.watchMap[data]; + + this.spvFilter.reset(); + + Object.keys(this.watchMap).forEach(function(data) { + this.spvFilter.add(data, 'hex'); }, this); - // Resend it to peers this.updateWatch(); }; @@ -1316,6 +1301,7 @@ Pool.prototype.unwatch = function unwatch(id) { Pool.prototype.updateWatch = function updateWatch() { var self = this; + var i; if (this._pendingWatch) return; @@ -1353,7 +1339,7 @@ Pool.prototype.addWallet = function addWallet(wallet, callback) { return callback(err); for (i = 0; i < txs.length; i++) - self.sendTX(txs[i]); + self.broadcast(txs[i]); if (!self.options.spv) return callback(); @@ -1401,7 +1387,7 @@ Pool.prototype.unwatchAddress = function unwatchAddress(address) { Pool.prototype.watchWallet = function watchWallet(wallet) { Object.keys(wallet.addressMap).forEach(function(address) { - this.watch(new Buffer(address, 'hex')); + this.watch(address); }, this); }; @@ -1412,7 +1398,7 @@ Pool.prototype.watchWallet = function watchWallet(wallet) { Pool.prototype.unwatchWallet = function unwatchWallet(wallet) { Object.keys(wallet.addressMap).forEach(function(address) { - this.unwatch(new Buffer(address, 'hex')); + this.unwatch(address); }, this); }; @@ -1729,6 +1715,23 @@ Pool.prototype.announce = function announce(msg) { this.peers.regular[i].sendInv(msg); }; +/** + * Set a fee rate filter for all peers. + * @param {Rate} rate + */ + +Pool.prototype.setFeeRate = function setFeeRate(rate) { + var i; + + this.feeRate = rate; + + if (this.peers.load) + this.peers.load.setFeeRate(rate); + + for (i = 0; i < this.peers.regular.length; i++) + this.peers.regular[i].setFeeRate(rate); +}; + /** * Close and destroy the pool. * @method @@ -2327,6 +2330,15 @@ BroadcastItem.prototype.reject = function reject(peer) { this.callback.length = 0; }; +/** + * Convert to an InvItem. + * @returns {InvItem} + */ + +BroadcastItem.prototype.toInv = function toInv() { + return this; +}; + /** * Inspect the broadcast item. */ diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index cac9a33d..09ce95ec 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -1700,6 +1700,18 @@ TX.prototype.__defineGetter__('wtxid', function() { return this.rwhash; }); +/** + * Convert the tx to an inv item. + * @returns {InvItem} + */ + +TX.prototype.toInv = function toInv() { + return { + type: constants.inv.TX, + hash: this.hash() + }; +}; + /** * Calculate minimum fee based on rate and size. * @param {Number?} size @@ -1954,7 +1966,7 @@ TX.prototype.toCoins = function toCoins() { }; /** - * Test an object to see if it is a TX. + * Test whether an object is a TX. * @param {Object} obj * @returns {Boolean} */ @@ -1962,8 +1974,8 @@ TX.prototype.toCoins = function toCoins() { TX.isTX = function isTX(obj) { return obj && Array.isArray(obj.inputs) - && typeof obj.ps === 'number' - && typeof obj.getVirtualSize === 'function'; + && typeof obj.locktime === 'number' + && typeof obj.witnessHash === 'function'; }; /* diff --git a/lib/bcoin/types.js b/lib/bcoin/types.js index 04ff8aa7..130f7b35 100644 --- a/lib/bcoin/types.js +++ b/lib/bcoin/types.js @@ -11,6 +11,12 @@ * @global */ +/** + * One of {@link module:constants.inv}. + * @typedef {Number|String} InvType + * @global + */ + /** * @typedef {Object} Outpoint * @property {Hash} hash