From 07517ecd041a431df2cf7f928b36ad1b9e409ab0 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 18 May 2016 11:42:00 -0700 Subject: [PATCH] rewrite broadcast system. --- lib/bcoin/block.js | 13 +- lib/bcoin/bst.js | 6 + lib/bcoin/chaindb.js | 2 +- lib/bcoin/fullnode.js | 6 +- lib/bcoin/peer.js | 227 ++++++++++----------------- lib/bcoin/pool.js | 347 +++++++++++++++++++++++++++++------------- lib/bcoin/timedata.js | 3 +- lib/bcoin/types.js | 8 - lib/bcoin/utils.js | 37 +++++ 9 files changed, 371 insertions(+), 278 deletions(-) diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index e62473b1..2d0cffbe 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -514,10 +514,10 @@ Block.reward = function reward(height, network) { network = bcoin.network.get(network); halvings = height / network.halvingInterval | 0; - // BIP 42 (not technically necessary since we - // don't suffer from the undefined behavior of C++). + // BIP 42 (well, our own version of it, + // since we can only handle 32 bit shifts). // https://github.com/bitcoin/bips/blob/master/bip-0042.mediawiki - if (halvings >= 64) + if (halvings >= 33) return 0; // We need to shift right by `halvings`, @@ -527,13 +527,6 @@ Block.reward = function reward(height, network) { if (halvings === 0) return 5000000000; - // We can't shift right by 32 bits. - // 25m bitcoin is 32 bits, so we - // can safely return zero if the - // shift will be 32. - if (halvings >= 33) - return 0; - return 2500000000 >>> (halvings - 1); }; diff --git a/lib/bcoin/bst.js b/lib/bcoin/bst.js index da6a8a31..5cc947cd 100644 --- a/lib/bcoin/bst.js +++ b/lib/bcoin/bst.js @@ -673,6 +673,7 @@ Iterator.prototype.next = function(callback) { Iterator.prototype.seek = function seek(key) { var self = this; + var item; assert(!this.ended, 'Already ended.'); @@ -682,6 +683,11 @@ Iterator.prototype.seek = function seek(key) { this.index = utils.binarySearch(this.items, key, true, function(a, b) { return self.tree.compare(a.key, b); }); + + item = this.items[this.index]; + + if (!item || this.tree.compare(item.key, key) !== 0) + this.index += 1; }; Iterator.prototype._end = function end(callback) { diff --git a/lib/bcoin/chaindb.js b/lib/bcoin/chaindb.js index 88534398..5750f9bf 100644 --- a/lib/bcoin/chaindb.js +++ b/lib/bcoin/chaindb.js @@ -285,7 +285,7 @@ ChainDB.prototype.getHeight = function getHeight(hash, callback) { this.db.fetch('h/' + hash, function(data) { assert(data.length === 4, 'Database corruption.'); - return utils.readU32(data, 0); + return data.readUInt32LE(0, true); }, function(err, height) { if (err) return callback(err); diff --git a/lib/bcoin/fullnode.js b/lib/bcoin/fullnode.js index 342dbeb4..b203ffd7 100644 --- a/lib/bcoin/fullnode.js +++ b/lib/bcoin/fullnode.js @@ -179,7 +179,7 @@ Fullnode.prototype._init = function _init() { }); this.miner.on('block', function(block) { - self.pool.sendBlock(block); + self.pool.announce(block); }); function load(err) { @@ -262,11 +262,11 @@ Fullnode.prototype.sendTX = function sendTX(item, wait, callback) { return callback(err); if (!wait) { - self.pool.broadcast(item); + self.pool.announce(item); return callback(); } - self.pool.broadcast(item, callback); + self.pool.announce(item, callback); }); }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 683470af..5ee5b27e 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -113,19 +113,13 @@ function Peer(pool, options) { if (!this.socket) throw new Error('No socket'); - this._broadcast = { - timeout: this.options.broadcastTimeout || 30000, - interval: this.options.broadcastInterval || 3000, + this.requests = { + timeout: this.options.requestTimeout || 10000, + skip: {}, map: {} }; - this._request = { - timeout: this.options.requestTimeout || 10000, - skip: {}, - queue: {} - }; - - this._ping = { + this.pings = { timer: null, interval: this.options.pingInterval || 30000 }; @@ -135,9 +129,8 @@ function Peer(pool, options) { tx: [] }; - Peer.uid.iaddn(1); - - this.id = Peer.uid.toString(10); + this.uid = 0; + this.id = Peer.uid++; this.setMaxListeners(10000); @@ -146,7 +139,7 @@ function Peer(pool, options) { utils.inherits(Peer, EventEmitter); -Peer.uid = new bn(0); +Peer.uid = 0; Peer.prototype._init = function init() { var self = this; @@ -193,13 +186,13 @@ Peer.prototype._init = function init() { this.challenge = utils.nonce(); - this._ping.timer = setInterval(function() { + this.pings.timer = setInterval(function() { self.write(self.framer.ping({ nonce: self.challenge })); - }, this._ping.interval); + }, this.pings.interval); - this._req('verack', function(err, payload) { + this.request('verack', function(err, payload) { if (err) { self._error(err); self.destroy(); @@ -275,97 +268,45 @@ Peer.prototype.createSocket = function createSocket(port, host) { /** * Broadcast items to peer (transactions or blocks). - * @param {TX|Block|TX[]|Block[]} items - * @returns {BroadcastPromise[]} + * @param {BroadcastEntry[]} items */ -Peer.prototype.broadcast = function broadcast(items) { - var self = this; - var result = []; - var payload = []; - - if (!this.relay) +Peer.prototype.sendInv = function sendInv(items) { + if (this.destroyed) return; - if (this.destroyed) + if (!this.relay) return; if (!Array.isArray(items)) items = [items]; - items.forEach(function(item) { - var key, old, type, entry, packetType; + if (this.filter) + items = items.map(this.isWatched, this); - if (item.mutable) - item = item.toTX(); + this.write(this.framer.inv(items)); +}; - key = item.hash('hex'); - old = this._broadcast.map[key]; - type = item.type; +/** + * Test whether an is being watched by the peer. + * @param {BroadcastItem|TX} item + * @returns {Boolean} + */ - if (typeof type === 'string') - type = constants.inv[type.toUpperCase()]; +Peer.prototype.isWatched = function isWatched(item) { + if (!this.filter) + return true; - // INV does not set the witness - // mask (only GETDATA does this). - type &= ~constants.WITNESS_MASK; + if (!item) + return true; - if (type === constants.inv.BLOCK) - packetType = 'block'; - else if (type === constants.inv.TX) - packetType = 'tx'; - else - assert(false, 'Bad type.'); + if (item instanceof bcoin.tx) + return item.isWatched(this.filter); - if (self.filter && type === constants.inv.TX) { - if (!item.isWatched(self.filter)) - return; - } + if (!(item.msg instanceof bcoin.tx)) + return true; - if (old) { - clearTimeout(old.timer); - clearInterval(old.interval); - } - - // Auto-cleanup broadcast map after timeout - entry = { - e: new EventEmitter(), - timeout: setTimeout(function() { - entry.e.emit('timeout'); - clearInterval(entry.interval); - delete self._broadcast.map[key]; - }, this._broadcast.timeout), - - // Retransmit - interval: setInterval(function() { - self.write(entry.inv); - }, this._broadcast.interval), - - inv: this.framer.inv([{ - type: type, - hash: item.hash() - }]), - - packetType: packetType, - type: type, - hash: item.hash(), - value: item.renderNormal(), - witnessValue: item.renderWitness() - }; - - this._broadcast.map[key] = entry; - - result.push(entry.e); - - payload.push({ - type: entry.type, - hash: entry.hash - }); - }, this); - - this.write(this.framer.inv(payload)); - - return result; + return item.msg.isWatched(this.filter); }; /** @@ -399,17 +340,11 @@ Peer.prototype.destroy = function destroy() { this.socket = null; this.emit('close'); - // Clean-up timeouts - Object.keys(this._broadcast.map).forEach(function(key) { - clearTimeout(this._broadcast.map[key].timer); - clearInterval(this._broadcast.map[key].interval); - }, this); + clearInterval(this.pings.timer); + this.pings.timer = null; - clearInterval(this._ping.timer); - this._ping.timer = null; - - Object.keys(this._request).forEach(function(cmd) { - var queue = this._request[cmd]; + Object.keys(this.requests.map).forEach(function(cmd) { + var queue = this.requests.map[cmd]; var i; for (i = 0; i < queue.length; i++) @@ -457,52 +392,51 @@ Peer.prototype._error = function error(err) { * Executed on timeout or once packet is received. */ -Peer.prototype._req = function _req(cmd, cb) { +Peer.prototype.request = function request(cmd, callback) { var self = this; var entry; if (this.destroyed) - return utils.asyncify(cb)(new Error('Destroyed, sorry')); + return utils.asyncify(callback)(new Error('Destroyed, sorry')); entry = { cmd: cmd, - cb: cb, + callback: callback, + id: this.uid++, ontimeout: function() { - var queue = self._request.queue[cmd]; - var i; + var queue = self.requests.map[cmd]; if (!queue) return; - i = queue.indexOf(entry); - - if (i !== -1) { - queue.splice(i, 1); - cb(new Error('Timed out: ' + cmd), null); + if (utils.binaryRemove(queue, entry, compare)) { + if (queue.length === 0) + delete self.requests.map[cmd]; + callback(new Error('Timed out: ' + cmd)); } }, timer: null }; - entry.timer = setTimeout(entry.ontimeout, this._request.timeout); + entry.timer = setTimeout(entry.ontimeout, this.requests.timeout); - if (!this._request.queue[cmd]) - this._request.queue[cmd] = []; + if (!this.requests.map[cmd]) + this.requests.map[cmd] = []; - this._request.queue[cmd].push(entry); + this.requests.map[cmd].push(entry); return entry; }; /** - * Fulfill awaiting requests created with {@link Peer#_req}. + * Fulfill awaiting requests created with {@link Peer#request}. * @private * @param {String} cmd - Packet name. * @param {Object} payload */ -Peer.prototype._res = function _res(cmd, payload) { - var queue = this._request.queue[cmd]; +Peer.prototype.response = function response(cmd, payload) { + var queue = this.requests.map[cmd]; var entry, res; if (!queue) @@ -513,12 +447,12 @@ Peer.prototype._res = function _res(cmd, payload) { if (!entry) return false; - res = entry.cb(null, payload, cmd); + res = entry.callback(null, payload, cmd); - if (res !== this._request.skip) { + if (res !== this.requests.skip) { queue.shift(); if (queue.length === 0) - delete this._request.queue[cmd]; + delete this.requests.map[cmd]; clearTimeout(entry.timer); entry.timer = null; return true; @@ -601,11 +535,11 @@ Peer.prototype._onPacket = function onPacket(packet) { break; case 'sendheaders': this.sendHeaders = true; - this._res(cmd, payload); + this.response(cmd, payload); break; case 'havewitness': this.haveWitness = true; - this._res(cmd, payload); + this.response(cmd, payload); break; case 'verack': this._emit(cmd, payload); @@ -621,7 +555,7 @@ Peer.prototype._onPacket = function onPacket(packet) { }; Peer.prototype._emit = function _emit(cmd, payload) { - if (this._res(cmd, payload)) + if (this.response(cmd, payload)) return; this.emit(cmd, payload); @@ -706,7 +640,7 @@ Peer.prototype._getUTXOs = function getUTXOs(utxos, callback) { var index = 0; var i, prevout, coin; - this._req('utxos', function(err, payload) { + this.request('utxos', function(err, payload) { if (err) return callback(err); @@ -904,7 +838,7 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) { function done(err) { if (err) return self.emit('error', err); - self.write(self.framer.inv(blocks)); + self.sendInv(blocks); } this.chain.findLocator(payload.locator, function(err, tip) { @@ -974,7 +908,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) { if (this.options.witness) { if (!(services & constants.services.WITNESS)) { - this._req('havewitness', function(err) { + this.request('havewitness', function(err) { if (err) { self._error('Peer does not support segregated witness.'); self.setMisbehavior(100); @@ -1015,7 +949,7 @@ Peer.prototype._handleMempool = function _handleMempool() { bcoin.debug('Sending mempool snapshot to %s.', self.host); - self.write(self.framer.inv(items)); + self.sendInv(items); }); }; @@ -1032,8 +966,8 @@ Peer.prototype._handleGetData = function handleGetData(items) { item = items[i]; hash = item.hash; - entry = this._broadcast.map[hash]; - isWitness = item.type & constants.WITNESS_MASK; + entry = this.pool.inv.map[hash]; + isWitness = (item.type & constants.WITNESS_MASK) !== 0; value = null; if (!entry) { @@ -1052,15 +986,10 @@ Peer.prototype._handleGetData = function handleGetData(items) { 'Peer %s requested %s:%s as a %s packet.', this.host, entry.packetType, - utils.revHex(entry.hash), + utils.revHex(entry.key), isWitness ? 'witness' : 'normal'); - if (isWitness) - this.write(this.framer.packet(entry.packetType, entry.witnessValue)); - else - this.write(this.framer.packet(entry.packetType, entry.value)); - - entry.e.emit('request'); + entry.sendTo(peer, isWitness); } if (this.pool.options.selfish) @@ -1068,7 +997,7 @@ Peer.prototype._handleGetData = function handleGetData(items) { utils.forEachSerial(check, function(item, next) { var isWitness = item.type & constants.WITNESS_MASK; - var type = item.type & ~constants.WITNESS_MASK; + var type = (item.type & ~constants.WITNESS_MASK) !== 0; var hash = item.hash; var i, tx, data; @@ -1123,10 +1052,10 @@ Peer.prototype._handleGetData = function handleGetData(items) { self.write(self.framer.packet('block', data)); if (hash === self.hashContinue) { - self.write(self.framer.inv([{ + self.sendInv({ type: constants.inv.BLOCK, hash: self.chain.tip.hash - }])); + }); self.hashContinue = null; } @@ -1168,10 +1097,10 @@ Peer.prototype._handleGetData = function handleGetData(items) { } if (hash === self.hashContinue) { - self.write(self.framer.inv([{ + self.sendInv({ type: constants.inv.BLOCK, hash: self.chain.tip.hash - }])); + }); self.hashContinue = null; } @@ -1324,12 +1253,12 @@ Peer.prototype._handleReject = function handleReject(payload) { return; hash = payload.data; - entry = this._broadcast.map[hash]; + entry = this.pool.inv.map[hash]; if (!entry) return; - entry.e.emit('reject', payload); + entry.reject(this); }; Peer.prototype._handleAlert = function handleAlert(details) { @@ -1442,6 +1371,14 @@ Peer.prototype.sendReject = function sendReject(obj, code, reason, score) { return this.pool.reject(this, obj, code, reason, score); }; +/* + * Helpers + */ + +function compare(a, b) { + return a.id - b.id; +} + /* * Expose */ diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 48b82c04..78ec1bd1 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -113,6 +113,7 @@ function Pool(options) { this.loaded = false; this.size = options.size || 8; this.connected = false; + this.uid = 0; this.syncing = false; this.synced = false; @@ -145,7 +146,9 @@ function Pool(options) { // All peers all: [], // Misbehaving hosts - misbehaving: {} + misbehaving: {}, + // Map of hosts + map: {} }; this.block = { @@ -184,7 +187,9 @@ function Pool(options) { // Currently broadcasted objects this.inv = { list: [], - timeout: options.invTimeout || 60000 + map: {}, + timeout: options.invTimeout || 60000, + interval: options.invInterval || 3000 }; function done(err) { @@ -232,7 +237,7 @@ Pool.prototype.connect = function connect() { if (this.options.broadcast) { if (this.mempool) { this.mempool.on('tx', function(tx) { - self.broadcast(tx); + self.announce(tx); }); } @@ -242,11 +247,13 @@ Pool.prototype.connect = function connect() { // miner sends us an invalid competing // chain that we can't connect and // verify yet. - this.chain.on('block', function(block) { - if (!self.synced) - return; - self.broadcast(block); - }); + if (!this.options.spv) { + this.chain.on('block', function(block) { + if (!self.synced) + return; + self.announce(block); + }); + } } if (this.originalSeeds.length > 0) { @@ -524,6 +531,7 @@ Pool.prototype._addLoader = function _addLoader() { this.peers.load = peer; this.peers.all.push(peer); + this.peers.map[peer.host] = peer; peer.once('close', function() { self._stopInterval(); @@ -1010,6 +1018,10 @@ Pool.prototype._createPeer = function _createPeer(options) { self.emit('version', version, peer); }); + peer.on('ack', function() { + peer.sendInv(self.inv.list); + }); + return peer; }; @@ -1070,6 +1082,7 @@ Pool.prototype._addLeech = function _addLeech(socket) { this.peers.leeches.push(peer); this.peers.all.push(peer); + this.peers.map[peer.host] = peer; peer.once('close', function() { self._removePeer(peer); @@ -1080,20 +1093,6 @@ Pool.prototype._addLeech = function _addLeech(socket) { return; peer.updateWatch(); - - self.inv.list.forEach(function(entry) { - var result = peer.broadcast(entry.msg); - if (!result) - return; - - result[0].once('request', function() { - entry.e.emit('ack', peer); - }); - - result[0].once('reject', function(payload) { - entry.e.emit('reject', payload, peer); - }); - }); }); peer.on('merkleblock', function(block) { @@ -1151,6 +1150,7 @@ Pool.prototype._addPeer = function _addPeer() { this.peers.pending.push(peer); this.peers.all.push(peer); + this.peers.map[peer.host] = peer; peer.once('close', function() { self._removePeer(peer); @@ -1165,27 +1165,10 @@ Pool.prototype._addPeer = function _addPeer() { if (self.destroyed) return; - i = self.peers.pending.indexOf(peer); - if (i !== -1) { - self.peers.pending.splice(i, 1); - self.peers.regular.push(peer); - } + if (utils.binaryRemove(self.peers.pending, peer, compare)) + utils.binaryInsert(self.peers.regular, peer, compare); peer.updateWatch(); - - self.inv.list.forEach(function(entry) { - var result = peer.broadcast(entry.msg); - if (!result) - return; - - result[0].once('request', function() { - entry.e.emit('ack', peer); - }); - - result[0].once('reject', function(payload) { - entry.e.emit('reject', payload, peer); - }); - }); }); peer.on('merkleblock', function(block) { @@ -1245,21 +1228,11 @@ Pool.prototype.bestPeer = function bestPeer() { }; Pool.prototype._removePeer = function _removePeer(peer) { - var i = this.peers.pending.indexOf(peer); - if (i !== -1) - this.peers.pending.splice(i, 1); - - i = this.peers.regular.indexOf(peer); - if (i !== -1) - this.peers.regular.splice(i, 1); - - i = this.peers.leeches.indexOf(peer); - if (i !== -1) - this.peers.leeches.splice(i, 1); - - i = this.peers.all.indexOf(peer); - if (i !== -1) - this.peers.all.splice(i, 1); + utils.binaryRemove(this.peers.pending, peer); + utils.binaryRemove(this.peers.regular, peer); + utils.binaryRemove(this.peers.leeches, peer); + utils.binaryRemove(this.peers.all, peer); + delete this.peers.map[peer.host]; if (this.peers.load === peer) { Object.keys(this.request.map).forEach(function(hash) { @@ -1867,51 +1840,44 @@ Pool.prototype.sendTX = function sendTX(tx, callback) { * @param {TX|Block} msg * @param {Function} callback - Returns [Error]. Executes on request, reject, * or timeout. + * @returns {BroadcastItem} */ Pool.prototype.broadcast = function broadcast(msg, callback) { - var self = this; - var e = new EventEmitter(); - var entry; + var hash = msg.hash('hex'); + var item = this.inv.map[hash]; - callback = utils.once(callback); + if (item) { + item.refresh(msg); + item.addCallback(callback); + return item; + } - if (msg.mutable) - msg = msg.toTX(); + item = new BroadcastItem(this, msg, callback); - entry = { - msg: msg, - e: e, - timer: setTimeout(function() { - var i = self.inv.list.indexOf(entry); - if (i !== -1) - self.inv.list.splice(i, 1); - callback(new Error('Timed out.')); - }, this.inv.timeout) + // return item.start(); + return item; +}; + +/** + * Announce an item by sending an inv to all + * peers. This does not add it to the broadcast + * queue. + * @param {TX|Block} tx + */ + +Pool.prototype.announce = function announce(msg) { + var i, msg; + + msg = { + type: (msg instanceof bcoin.tx) + ? constants.inv.TX + : constants.inv.BLOCK, + hash: msg.hash() }; - this.inv.list.push(entry); - - this.peers.all.forEach(function(peer) { - var result = peer.broadcast(msg); - if (!result) - return; - - result[0].once('request', function() { - e.emit('ack', peer); - // Give them a chance to send a reject. - setTimeout(function() { - callback(); - }, 100); - }); - - result[0].once('reject', function(payload) { - e.emit('reject', payload, peer); - callback(new Error('TX was rejected: ' + payload.reason)); - }); - }); - - return e; + for (i = 0; i < this.peers.all.length; i++) + this.peers.all[i].sendInv(msg); }; /** @@ -1965,18 +1931,12 @@ Pool.prototype.destroy = function destroy(callback) { */ Pool.prototype.getPeer = function getPeer(addr) { - var i, peer; - if (!addr) return; addr = utils.parseHost(addr); - for (i = 0; i < this.peers.all.length; i++) { - peer = this.peers.all[i]; - if (peer.host === addr.host) - return peer; - } + return this.peers.map[addr.host]; }; /** @@ -2258,12 +2218,16 @@ Pool.prototype.reject = function reject(peer, obj, code, reason, score) { */ function LoadRequest(pool, peer, type, hash, callback) { + if (!(this instanceof LoadRequest)) + return new LoadRequest(pool, peer, type, hash, callback); + this.pool = pool; this.peer = peer; this.type = type; this.hash = hash; this.callback = []; this.active = false; + this.id = this.pool.uid++; if (callback) this.callback.push(callback); @@ -2312,15 +2276,10 @@ LoadRequest.prototype.finish = function finish() { } } - if (this.type === this.pool.tx.type) { - index = this.peer.queue.tx.indexOf(this); - if (index !== -1) - this.peer.queue.tx.splice(index, 1); - } else { - index = this.peer.queue.block.indexOf(this); - if (index !== -1) - this.peer.queue.block.splice(index, 1); - } + if (this.type === this.pool.tx.type) + utils.binaryRemove(this.peer.queue.tx, this, compare); + else + utils.binaryRemove(this.peer.queue.block, this, compare); this.peer.removeListener('close', this._finish); @@ -2330,6 +2289,176 @@ LoadRequest.prototype.finish = function finish() { } }; +/** + * Represents an item that is broadcasted via an inv/getdata cycle. + * @exports BroadcastItem + * @constructor + * @private + * @param {Pool} pool + * @param {TX|Block} item + * @param {Function?} callback + * @emits BroadcastItem#ack + * @emits BroadcastItem#reject + * @emits BroadcastItem#timeout + */ + +function BroadcastItem(pool, item, callback) { + if (!(this instanceof BroadcastItem)) + return new BroadcastItem(pool, item); + + if (item instanceof bcoin.tx) { + if (item.mutable) + item = item.toTX(); + } + + this.pool = pool; + this.callback = []; + + this.id = this.pool.uid++; + this.key = item.hash('hex'); + this.type = (item instanceof bcoin.tx) + ? constants.inv.TX + : constants.inv.BLOCK; + // this.msg = item; + this.hash = item.hash(); + this.normalValue = item.renderNormal(); + this.witnessValue = item.render(); + + // INV does not set the witness + // mask (only GETDATA does this). + assert((this.type & constants.WITNESS_MASK) === 0); + + this.addCallback(callback); + + this.start(item); +}; + +utils.inherits(BroadcastItem, EventEmitter); + +/** + * Add a callback to be executed on ack, timeout, or reject. + * @param { + */ + +BroadcastItem.prototype.addCallback = function addCallback(callback) { + if (callback) + this.callback.push(callback); +}; + +/** + * Start the broadcast. + * @param {TX|Block} item + */ + +BroadcastItem.prototype.start = function start(item) { + var self = this; + var i; + + assert(!this.timeout, 'Already started.'); + assert(!this.pool.inv.map[this.key], 'Already started.'); + + this.pool.inv.map[this.key] = this; + utils.binaryInsert(this.pool.inv.list, this, compare); + + this.refresh(item); + + return this; +}; + +/** + * Refresh the timeout on the broadcast. + */ + +BroadcastItem.prototype.refresh = function refresh(item) { + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = null; + } + + this.timeout = setTimeout(function() { + self.emit('timeout'); + self.finish(new Error('Timed out.')); + }, this.pool.inv.timeout); + + for (i = 0; i < this.pool.peers.all.length; i++) { + if (this.pool.peers.all[i].isWatched(item)) + this.pool.peers.all[i].sendInv(this); + } +}; + +/** + * Finish the broadcast, potentially with an error. + * @param {Error?} err + */ + +BroadcastItem.prototype.finish = function finish(err) { + var i; + + assert(this.timeout, 'Already finished.'); + assert(this.pool.inv.map[this.key], 'Already finished.'); + + clearInterval(this.timeout); + this.timeout = null; + + delete this.pool.inv.map[this.key]; + utils.binaryRemove(this.pool.inv.list, this, compare); + + for (i = 0; i < this.callback.length; i++) + this.callback[i](err); + + this.callback.length = 0; +}; + +/** + * Send the item to a peer. + * @param {Peer} peer + * @param {Boolean} witness - Whether to use the witness serialization. + */ + +BroadcastItem.prototype.sendTo = function sendTo(peer, witness) { + var self = this; + var value = witness ? this.witnessValue : this.normalValue; + var packetType = this.type === constants.inv.TX ? 'tx' : 'block'; + var i; + + peer.write(peer.framer.packet(packetType, value)); + + setTimeout(function() { + self.emit('ack', peer); + + for (i = 0; i < this.callback.length; i++) + self.callback[i](); + + self.callback.length = 0; + }, 1000); +}; + +/** + * Handle a reject from a peer. + * @param {Peer} peer + */ + +BroadcastItem.prototype.reject = function reject(peer) { + var i, err; + + this.emit('reject', peer); + + err = new Error('Rejected by ' + peer.host); + + for (i = 0; i < this.callback.length; i++) + this.callback[i](err); + + this.callback.length = 0; +}; + +/* + * Helpers + */ + +function compare(a, b) { + return a.id - b.id; +} + /* * Expose */ diff --git a/lib/bcoin/timedata.js b/lib/bcoin/timedata.js index fae2ce13..bbdd78c7 100644 --- a/lib/bcoin/timedata.js +++ b/lib/bcoin/timedata.js @@ -60,8 +60,7 @@ TimeData.prototype.add = function add(host, time) { this.known[host] = sample; - i = utils.binarySearch(this.samples, sample, true, compare); - this.samples.splice(i + 1, 0, sample); + utils.binaryInsert(this.samples, sample, compare); bcoin.debug('Added time data: samples=%d, offset=%d (%d minutes)', this.samples.length, sample, sample / 60 | 0); diff --git a/lib/bcoin/types.js b/lib/bcoin/types.js index 17382deb..7584ec2d 100644 --- a/lib/bcoin/types.js +++ b/lib/bcoin/types.js @@ -47,14 +47,6 @@ * @global */ -/** - * @typedef {EventEmitter} BroadcastPromise - * @emits BroadcastPromise#ack - * @emits BroadcastPromise#timeout - * @emits BroadcastPromise#reject - * @global - */ - /** * @typedef {Object} ParsedURI * @property {Base58Address} address diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index ca281047..1491d329 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -831,6 +831,13 @@ utils.parseHost = function parseHost(addr) { if (typeof addr === 'object') return addr; + if (!/[\[:]/.test(addr)) { + return { + host: addr, + port: 0 + }; + } + if (addr.indexOf(']') !== -1) parts = addr.split(/\]:?/); else @@ -2417,6 +2424,36 @@ utils.binarySearch = function binarySearch(items, key, insert, compare) { return start - 1; }; +/** + * Perform a binary insert on a sorted array. + * @param {Array} items + * @param {Object} item + * @param {Function?} compare + * @returns {Number} Length. + */ + +utils.binaryInsert = function binaryInsert(items, item, compare) { + var i = utils.binarySearch(items, item, true, compare); + items.splice(i + 1, 0, item); + return items.length; +}; + +/** + * Perform a binary removal on a sorted array. + * @param {Array} items + * @param {Object} item + * @param {Function?} compare + * @returns {Number} Length. + */ + +utils.binaryRemove = function binaryRemove(items, item, compare) { + var i = utils.binarySearch(items, item, false, compare); + if (i === -1) + return false; + items.splice(i, 1); + return true; +}; + /** * Reference to the global object. * @const {Object}