diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index d4e0517d..32e47b9f 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -27,12 +27,15 @@ var P2P = function(options) { BaseService.call(this, options); this.options = options; this._maxPeers = this.options.maxPeers || 60; + this._minPeers = this.options.minPeers || 1; this._peers = this.options.peers; this.subscriptions = {}; this.subscriptions.block = []; this.subscriptions.transaction = []; this.messages = new p2p.Messages({ network: this.node.network }); - + this._peerHeights = []; + this._peers = []; + this._peerIndex = 0; }; util.inherits(P2P, BaseService); @@ -86,6 +89,31 @@ P2P.prototype._initPool = function() { this._pool = new p2p.Pool(opts); }; +P2P.prototype._addPeer = function(peer) { + this._peers.push(peer); +}; + +P2P.prototype._removePeer = function(peer) { + this._peer.splice(peer, 1); +}; + +P2P.prototype._getPeer = function() { + // TODO: this logic could get complicated depending on the state of the pool + if (this._peers.length === 0) { + return; + } + var index = this._peerIndex++ % this._peers.length; + return this._peers[index]; +}; + +P2P.prototype._getBestHeight = function(peer) { + this._peerHeights.push(peer.bestHeight); + if (this._peerHeights >= this._minPeers) { + return Math.max(this._bestHeights); + } +}; + + P2P.prototype._setupListeners = function() { var self = this; @@ -95,11 +123,17 @@ P2P.prototype._setupListeners = function() { peer.network.alias + ', version: ' + peer.version + ', subversion: ' + peer.subversion + ', status: ' + peer.status + ', port: ' + peer.port + ', best height: ' + peer.bestHeight); - peer.sendMessage(self.messages.MemPool()); + + self._addPeer(peer); + var bestHeight = self._getBestHeight(peer); + if (bestHeight >= 0) { + self.emit('bestHeight', bestHeight); + } }); self._pool.on('peerdisconnect', function(peer, addr) { + self._removePeer(peer); log.info('Disconnected from peer: ' + addr.ip.v4); }); @@ -115,7 +149,6 @@ P2P.prototype._setupListeners = function() { self._inv.set(inv.hash, true); - //self._generateRetrievalMessage() newDataNeeded.push(inv); } @@ -132,9 +165,7 @@ P2P.prototype._setupListeners = function() { self._pool.on('peerblock', self._broadcastBlock.bind(self)); self.node.on('ready', function() { - setTimeout(function() { - self._pool.connect(); - }, 1000); + self._pool.connect(); }); }; @@ -152,7 +183,12 @@ P2P.prototype._broadcastBlock = function(peer, message) { }; P2P.prototype.getAPIMethods = function() { - return []; + var methods = [ + ['createHeaderStream', this, this.getHeaders, 2], + ['createMempoolStream', this, this.getMempool, 0], + ['createBlockStream', this, this.getBlocks, 2] + ]; + return methods; }; P2P.prototype.getPublishEvents = function() { @@ -168,8 +204,54 @@ P2P.prototype.getPublishEvents = function() { scope: this, subscribe: this.subscribe.bind(this, 'block'), unsubscribe: this.unsubscribe.bind(this, 'block') - }, + } ]; }; + +P2P.prototype.createHeaderStream = function(startBlockHash, endBlockHash) { + + var self = this; + if (!endBlockHash) { + endBlockHash = startBlockHash; + } + + var message = self.messages.GetHeaders(startBlockHash, endBlockHash); + var peer = self._getPeer(); + + peer.on('peerheaders', function(peer, message) { + self.emit('headers', message.headers); + }); + + peer.sendMessage(message); + return self; + +}; + +P2P.prototype.createMempoolStream = function() { + + var self = this; + var peer = self._getPeer(); + peer.sendMessage(self.messages.MemPool()); + + peer.on('inv', function(message) { + }); + peer.on('tx', function(message) { + + }); + +}; + +P2P.prototype.createBlockStream = function(startBlockHash, endBlockHash) { + + if (!endBlockHash) { + endBlockHash = startBlockHash; + } + + var message = this.messages.GetBlocks(startBlockHash, endBlockHash); + var peer = this._getPeer(); + peer.sendMessage(message); + +}; + module.exports = P2P; diff --git a/regtest/test_bus.js b/regtest/test_bus.js index 2a9d3472..874274c4 100644 --- a/regtest/test_bus.js +++ b/regtest/test_bus.js @@ -26,16 +26,51 @@ TestBusService.prototype.start = function(callback) { self.bus = self.node.openBus({ remoteAddress: 'localhost' }); self.bus.on('p2p/transaction', function(tx) { - self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]); + var self = this; + self._cache.transaction.push(tx); + if (self._ready) { + for(var i = 0; i < self._cache.transaction.length; i++) { + var transaction = self._cache.transaction.unshift(); + self.pubSocket.send([ 'transaction', new Buffer(transaction.uncheckedSerialize(), 'hex') ]); + } + return; + } }); self.bus.on('p2p/block', function(block) { - self.pubSocket.send([ 'block', block.toBuffer() ]); + var self = this; + self._cache.block.push(block); + if (self._ready) { + for(var i = 0; i < self._cache.block.length; i++) { + var blk = self._cache.block.unshift(); + self.pubSocket.send([ 'block', blk.toBuffer() ]); + } + return; + } + }); + + self.bus.on('p2p/headers', function(headers) { + var self = this; + self._cache.headers.push(headers); + if (self._ready) { + for(var i = 0; i < self._cache.headers.length; i++) { + var hdrs = self._cache.headers.unshift(); + self.pubSocket.send([ 'headers', hdrs ]); + } + return; + } }); self.bus.subscribe('p2p/transaction'); self.bus.subscribe('p2p/block'); + self.node.on('ready', function() { + //we could be getting events right away, so until our subscribers get connected, we need to cache + self._cache = { transaction: [], block: [], headers: [] }; + setTimeout(function() { + self._ready = true; + }, 2000); + }); callback(); };