diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index e738af36..52cba33b 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -12,11 +12,11 @@ var constants = require('../../constants'); /* Purpose: - 1. join a P2P - 2. relay messages - 3. publish new transactions (pub/sub) - 4. publish new blocks (pub/sub) - 5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters) + 1. join a P2P + 2. relay messages + 3. publish new transactions (pub/sub) + 4. publish new blocks (pub/sub) + 5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters) */ var P2P = function(options) { @@ -61,6 +61,41 @@ P2P.prototype.stop = function(callback) { }; +P2P.prototype.getPublishEvents = function() { + return [ + { + name: 'p2p/transaction', + scope: this, + subscribe: this.subscribe.bind(this, 'transaction'), + unsubscribe: this.unsubscribe.bind(this, 'transaction') + }, + { + name: 'p2p/mempool', + scope: this, + subscribe: this.subscribe.bind(this, 'mempool'), + unsubscribe: this.unsubscribe.bind(this, 'mempool') + }, + { + name: 'p2p/block', + scope: this, + subscribe: this.subscribe.bind(this, 'block'), + unsubscribe: this.unsubscribe.bind(this, 'block') + }, + { + name: 'p2p/historicalBlock', + scope: this, + subscribe: this.subscribe.bind(this, 'historicalBlock'), + unsubscribe: this.unsubscribe.bind(this, 'block') + }, + { + name: 'p2p/header', + scope: this, + subscribe: this.subscribe.bind(this, 'header'), + unsubscribe: this.unsubscribe.bind(this, 'header') + } + ]; +}; + P2P.prototype.subscribe = function(name, emitter) { this.subscriptions[name].push(emitter); log.info(emitter.remoteAddress, 'subscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length); @@ -112,226 +147,146 @@ P2P.prototype._getBestHeight = function(peer) { this._peerHeights.push(peer.bestHeight); if (this._peerHeights.length >= this._minPeers) { - // spread operator '...', fancy way of converting an array arg to discrete args return Math.max(...this._peerHeights); } }; -P2P.prototype._setFilterScalar = function(peer, scalar) { +P2P.prototype._onPeerReady = function(peer, addr) { - if (!this._filters[peer]) { - return; + log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' + + peer.network.alias + ', version: ' + peer.version + ', subversion: ' + + peer.subversion + ', status: ' + peer.status + ', port: ' + + peer.port + ', best height: ' + peer.bestHeight); + + self._addPeer(peer); + var bestHeight = self._getBestHeight(peer); + + if (bestHeight >= 0) { + self.emit('bestHeight', bestHeight); } - this._filters[peer] = scalar; +}; +P2P.prototype._onPeerDisconnect = function(peer, addr) { + + self._removePeer(peer); + log.info('Disconnected from peer: ' + addr.ip.v4); + +}; + +P2P.prototype._onPeerInventory = function(peer, message) { + + var newDataNeeded = []; + + self._setFilterScalar(peer, message.inventory.length); + + message.inventory.forEach(function(inv) { + + if (!self._inv.get(inv.hash)) { + + self._inv.set(inv.hash, true); + + newDataNeeded.push(inv); + + } + }); + + if (newDataNeeded.length > 0) { + peer.sendMessage(self.messages.GetData(newDataNeeded)); + } +}; + +P2P.prototype._onPeerTx = function(peer, message) { +}; + +p2p.prototype._onPeerBlock = function(peer, message) { }; P2P.prototype._setupListeners = function() { var self = this; - - self._pool.on('peerready', function(peer, addr) { - - log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' + - peer.network.alias + ', version: ' + peer.version + ', subversion: ' + - peer.subversion + ', status: ' + peer.status + ', port: ' + - peer.port + ', best height: ' + peer.bestHeight); - - self._addPeer(peer); - var bestHeight = self._getBestHeight(peer); - - if (bestHeight >= 0) { - self.emit('bestHeight', bestHeight); - } - }); - - self._pool.on('peerdisconnect', function(peer, addr) { - - self._removePeer(peer); - log.info('Disconnected from peer: ' + addr.ip.v4); - - }); - - self._pool.on('peerinv', function(peer, message) { - - var newDataNeeded = []; - - // The bitcoin p2p network can send unsolicited inventory messages for - // entities such as transactions and blocks. This behavior is configurable - // via the 'relay' field in the version message that we send when making - // outbound connections. The relay option is important to keep enabled so - // that we can reduce the chatter on the network, overall. - // The problem is that when we do ask for inventory synchronuously, - // such as when our indexes are syncing or catching up, we can't tell the - // the difference between unsolicited new messages and actual responses. - // There does not seem to be identifying information to link requests and - // responses. - - // What we will do is set a filter scalar on the peer being queried. We - // will have to assume that the next message from that peer will be our - // response. The length of the inventory vector will then become our filter - // scalar. As we respond to the inbound inventory (getdata), we will decrement - // this filter scalar. This is how we will match request anf response. - self._setFilterScalar(peer, message.inventory.length); - - message.inventory.forEach(function(inv) { - - if (!self._inv.get(inv.hash)) { - - self._inv.set(inv.hash, true); - - newDataNeeded.push(inv); - - } - - }); - - if (newDataNeeded.length > 0) { - peer.sendMessage(self.messages.GetData(newDataNeeded)); - } - - }); - - self._pool.on('peertx', self._filterTx.bind(self)); - - self._pool.on('peerblock', self._broadcastBlock.bind(self)); - + self._pool.on('peerready', self._onPeerReady.bind(self)); + self._pool.on('peerdisconnect', self._onPeerDisconnect.bind(self)); + self._pool.on('peerinv', self._onPeerInventory.bind(self)); + self._pool.on('peertx', self._onPeerTx.bind(self)); + self._pool.on('peerblock', self._onPeerBlock.bind(self)); self.node.on('ready', function() { self._pool.connect(); }); }; -P2P.prototype._filterTx = function(peer, message) { - var self = this; - - // if this tx matches any of this peer's filters, then emit the tx internally only - // and not to the external bus - var filterExists = self._filters[peer]; - - if (!filterExists) { - self._broadcastTx(peer, message); - return; - } - - self.emit('tx', message.transaction); - - if (--self._filters[peer] === 0) { - self._filters[peer] = false; - self.emit('end'); - } - -}; - -P2P.prototype._broadcastTx = function(peer, message) { - for (var i = 0; i < this.subscriptions.transaction.length; i++) { - this.subscriptions.transaction[i].emit('p2p/transaction', message.transaction); - } -}; - -P2P.prototype._broadcastBlock = function(peer, message) { - for (var i = 0; i < this.subscriptions.block.length; i++) { - this.subscriptions.block[i].emit('p2p/block', message.block); +P2P.prototype._broadcast = function(subscribers, name, entity) { + for (var i = 0; i < subscribers.length; i++) { + subscribers[i].emit(name, entity); } }; P2P.prototype.getAPIMethods = function() { var methods = [ - ['createHeaderStream', this, this.getHeaders, 2], + ['getHeaders', this, this.getHeaders, 2], ['getMempool', this, this.getMempool, 1], ['getBlocks', this, this.getBlocks, 2] ]; return methods; }; -P2P.prototype.getPublishEvents = function() { - return [ - { - name: 'p2p/transaction', - scope: this, - subscribe: this.subscribe.bind(this, 'transaction'), - unsubscribe: this.unsubscribe.bind(this, 'transaction') - }, - { - name: 'p2p/block', - scope: this, - subscribe: this.subscribe.bind(this, 'block'), - unsubscribe: this.unsubscribe.bind(this, 'block') +P2P.prototype._getResourceFilter = function(filter, resource) { + + // this function knows about mempool, block and header filters + // mempool filters are considered after the tx is delivered to us + // because we can't match a tx to the query params. + if (resource === 'mempool') { + if (resource instanceof LRU) { + return filter; } - ]; -}; - -P2P.prototype.createHeaderStream = function(startBlockHash, endBlockHash) { - - var self = this; - if (!endBlockHash) { - endBlockHash = startBlockHash; } - var message = self.messages.GetHeaders(startBlockHash, endBlockHash); - var peer = self._getPeer(); - - peer.on('peerheaders', function(peer, message) { - self.emit('headers', message.headers); - }); - - peer.sendMessage(message); - return self; - }; -P2P.prototype.getMempool = function(callback) { +P2P.prototype._getResourceMessage = function(filter, resource) { - var self = this; - var peer = self._getPeer(); + // filter should be a list of block hashes representing the + // resource that are -not- needed, all headers outside this list will be + // broadcast on the p2p/headers bus, meaning all subscribers to + // this event will get the results whether they asked for them + // or not. - if (!peer) { - return callback(new Error('Could not get a peer to retrieve mempool.')); - } + // _getPeer can throw + var peer = this._getPeer(); - self._filters[peer] = true; + return this._getResourceFilter(filter, resource); + +}; + +P2P.prototype.getHeaders = function(filter) { + + var headerFilter = this._getResourceMessage(filter, 'headers'); + peer.sendMessage(this.messages.GetHeader(headerFilter)); + +}; + +P2P.prototype.getMempool = function(filter) { + + // mempools can grow quite large, especially if subscribers are liberally accepting + // all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can + // grow quite large over time. Care should be taken to limit the size of this filter. + // Even still, when a tx is broadcasted, the reference to it is dropped from the filter. + this._mempoolFilter = this._getResourceMessage(filter, 'mempool'); peer.sendMessage(self.messages.MemPool()); - var mempool = []; - self.on('tx', function(tx) { - mempool.push(tx); - }); - - self.on('end', function() { - callback(null, mempool); - }); }; -P2P.prototype.getBlocks = function(startBlockHash, endBlockHash) { +P2P.prototype.getBlocks = function(filter) { - var self = this; - if (!endBlockHash) { - endBlockHash = startBlockHash; - } - - var peer = self._getPeer(); - - if (!peer) { - return callback(new Error('Could not get a peer to retrieve blocks.')); - } - - self._filters[peer] = true; - - var message = this.messages.GetBlocks({ starts: [startBlockHash] }); - peer.sendMessage(message); - var blocks = []; - - self.on('block', function(block) { - blocks.push(block); - }); - - self.on('end', function() { - callback(null, blocks); - }); + // it is on the caller to work out what block hashes are needed from the network, + // so a list of block hashes should be given. If this is an initial sync from the + // genesis block, then this list will be quite large (32 bytes * current block height). + var blockFilter = this._getResourceMessage(filter, 'blocks'); + peer.sendMessage(this.messages.GetBlock(blockFilter)); };