diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 32e47b9f..e5bcaa61 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -28,7 +28,7 @@ var P2P = function(options) { this.options = options; this._maxPeers = this.options.maxPeers || 60; this._minPeers = this.options.minPeers || 1; - this._peers = this.options.peers; + this._configPeers = this.options.peers; this.subscriptions = {}; this.subscriptions.block = []; this.subscriptions.transaction = []; @@ -36,6 +36,7 @@ var P2P = function(options) { this._peerHeights = []; this._peers = []; this._peerIndex = 0; + this._filters = []; }; util.inherits(P2P, BaseService); @@ -80,8 +81,8 @@ P2P.prototype._initCache = function() { P2P.prototype._initPool = function() { var opts = {}; - if (this._peers) { - opts.addrs = this._peers; + if (this._configPeers) { + opts.addrs = this._configPeers; opts.dnsSeed = false; } opts.maxPeers = this._maxPeers; @@ -94,11 +95,11 @@ P2P.prototype._addPeer = function(peer) { }; P2P.prototype._removePeer = function(peer) { - this._peer.splice(peer, 1); + this._peers.splice(this._peers.indexOf(peer), 1); }; P2P.prototype._getPeer = function() { - // TODO: this logic could get complicated depending on the state of the pool + if (this._peers.length === 0) { return; } @@ -114,6 +115,18 @@ P2P.prototype._getBestHeight = function(peer) { }; +P2P.prototype._appendFilters = function(peer, filter) { + var filterList = this._filters[peer]; + + if (!filterList || filterList.length !== 0) { + return; + } +console.log('here', filter); + if (filter.length >= 10) { + this._filters[peer] = filter; + } +}; + P2P.prototype._setupListeners = function() { var self = this; @@ -142,6 +155,22 @@ P2P.prototype._setupListeners = function() { var newDataNeeded = []; + // the bitcoin p2p network can send unsolicited inventory messages + // because we've asked it to when connecting. We want to know about new + // blocks and transactions asynchronoously. The problem is that when we + // do ask for inventory synchronuously, we can't tell messages arriving + // after we've asked if they are a response to our request or unsolicited. + // There is no request id that we can match against the response. + // The safest way to get around this is to disconnect the peer and reconnect + // without relaying turned om, so the peer will not spontaneously send us inv + // messages. This way we know for sure that the inv message is a response to + // our request. + + // have we asked for inventory from a particular peer? Check our filters list + // based on filters that may have been previously set by a request from this service, + // we need to set the inventory list on the peer's filter so that when the inventory + // is retrieved, it can be returned to the caller and not sent out on the main bux + self._appendFilters(peer, message.inventory); message.inventory.forEach(function(inv) { @@ -161,7 +190,8 @@ P2P.prototype._setupListeners = function() { }); - self._pool.on('peertx', self._broadcastTx.bind(self)); + self._pool.on('peertx', self._filterTx.bind(self)); + self._pool.on('peerblock', self._broadcastBlock.bind(self)); self.node.on('ready', function() { @@ -170,6 +200,37 @@ P2P.prototype._setupListeners = function() { }; +P2P.prototype._processFilter = function(filter, message) { + var existsInFilter = false; + if (!filter) { + return existsInFilter; + } + filter.forEach(function(inv, index) { +console.log(inv.hash.toString('hex'), message.transaction.hash); + if (message.transaction.hash === inv.hash) { + existsInFilter = true; + filter.splice(index, 1); + } + }); + return existsInFilter; +}; + +P2P.prototype._filterTx = function(peer, message) { + var self = this; + + // if this tx matches any of this peer's filters, then emit the tx internally only + // and not to the external bus + var existsInFilter = self._processFilter(self._filters[peer], message); + + if (!existsInFilter) { + self._broadcastTx(peer, message); + return; + } + + self.emit('tx', message.transaction); + +}; + P2P.prototype._broadcastTx = function(peer, message) { for (var i = 0; i < this.subscriptions.transaction.length; i++) { this.subscriptions.transaction[i].emit('p2p/transaction', message.transaction); @@ -185,7 +246,7 @@ P2P.prototype._broadcastBlock = function(peer, message) { P2P.prototype.getAPIMethods = function() { var methods = [ ['createHeaderStream', this, this.getHeaders, 2], - ['createMempoolStream', this, this.getMempool, 0], + ['getMempool', this, this.getMempool, 1], ['createBlockStream', this, this.getBlocks, 2] ]; return methods; @@ -228,18 +289,27 @@ P2P.prototype.createHeaderStream = function(startBlockHash, endBlockHash) { }; -P2P.prototype.createMempoolStream = function() { +P2P.prototype.getMempool = function(callback) { var self = this; var peer = self._getPeer(); + + if (!peer) { + return callback(new Error('Could not get a peer to retrieve mempool.')); + } + + self._filters[peer] = []; + peer.sendMessage(self.messages.MemPool()); + var mempool = []; - peer.on('inv', function(message) { - }); - peer.on('tx', function(message) { - + self.on('tx', function(tx) { + mempool.push(tx); }); + self.on('end', function() { + callback(null, mempool); + }); }; P2P.prototype.createBlockStream = function(startBlockHash, endBlockHash) { diff --git a/regtest/p2p.js b/regtest/p2p.js index e570bce7..c2959ea3 100644 --- a/regtest/p2p.js +++ b/regtest/p2p.js @@ -13,7 +13,7 @@ var Transaction = bitcore.Transaction; var PrivateKey = bitcore.PrivateKey; var Unit = bitcore.Unit; -var debug = false; +var debug = true; var extraDebug = true; var bitcoreDataDir = '/tmp/bitcore'; var bitcoinDataDir = '/tmp/bitcoin'; @@ -79,9 +79,9 @@ var bitcore = { }; if (debug && extraDebug) { - bitcoin.args.printtoconsole = 1, - bitcoin.args.debug = 1, - bitcoin.args.logips = 1 + bitcoin.args.printtoconsole = 1; + bitcoin.args.debug = 1; + bitcoin.args.logips = 1; } var opts = { @@ -163,7 +163,7 @@ describe('P2P Operations', function() { ], done); }); - it('should connect to the p2p network and stream the mempool to clients', function(done) { + it.only('should connect to the p2p network and stream the mempool to clients', function(done) { async.series([ utils.unlockWallet.bind(utils), utils.setupInitialTxs.bind(utils), diff --git a/regtest/test_bus.js b/regtest/test_bus.js index 874274c4..13911250 100644 --- a/regtest/test_bus.js +++ b/regtest/test_bus.js @@ -8,6 +8,7 @@ var log = index.log; var TestBusService = function(options) { BaseService.call(this, options); + this._cache = { transaction: [], block: [], headers: [] }; }; inherits(TestBusService, BaseService); @@ -26,11 +27,10 @@ TestBusService.prototype.start = function(callback) { self.bus = self.node.openBus({ remoteAddress: 'localhost' }); self.bus.on('p2p/transaction', function(tx) { - var self = this; self._cache.transaction.push(tx); if (self._ready) { for(var i = 0; i < self._cache.transaction.length; i++) { - var transaction = self._cache.transaction.unshift(); + var transaction = self._cache.transaction.shift(); self.pubSocket.send([ 'transaction', new Buffer(transaction.uncheckedSerialize(), 'hex') ]); } return; @@ -49,28 +49,22 @@ TestBusService.prototype.start = function(callback) { } }); - self.bus.on('p2p/headers', function(headers) { - var self = this; - self._cache.headers.push(headers); - if (self._ready) { - for(var i = 0; i < self._cache.headers.length; i++) { - var hdrs = self._cache.headers.unshift(); - self.pubSocket.send([ 'headers', hdrs ]); - } - return; - } - }); - self.bus.subscribe('p2p/transaction'); self.bus.subscribe('p2p/block'); self.node.on('ready', function() { - //we could be getting events right away, so until our subscribers get connected, we need to cache - self._cache = { transaction: [], block: [], headers: [] }; + + setTimeout(function() { self._ready = true; + self.node.services.p2p.getMempool(function(err, mempool) { + +console.log(mempool); + }); }, 2000); + }); + callback(); };