From 019851e48f6db5a6051de04cfade3275fe7edf47 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 1 Jun 2017 08:50:22 -0400 Subject: [PATCH] wip --- lib/services/p2p/index.js | 107 +++++++++++++++++++++----------------- regtest/p2p.js | 21 +++++--- regtest/test_bus.js | 34 ++++++++++-- 3 files changed, 102 insertions(+), 60 deletions(-) diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index e5bcaa61..e738af36 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -36,7 +36,7 @@ var P2P = function(options) { this._peerHeights = []; this._peers = []; this._peerIndex = 0; - this._filters = []; + this._filters = {}; }; util.inherits(P2P, BaseService); @@ -108,26 +108,28 @@ P2P.prototype._getPeer = function() { }; P2P.prototype._getBestHeight = function(peer) { + this._peerHeights.push(peer.bestHeight); - if (this._peerHeights >= this._minPeers) { - return Math.max(this._bestHeights); + + if (this._peerHeights.length >= this._minPeers) { + // spread operator '...', fancy way of converting an array arg to discrete args + return Math.max(...this._peerHeights); } + }; +P2P.prototype._setFilterScalar = function(peer, scalar) { -P2P.prototype._appendFilters = function(peer, filter) { - var filterList = this._filters[peer]; - - if (!filterList || filterList.length !== 0) { + if (!this._filters[peer]) { return; } -console.log('here', filter); - if (filter.length >= 10) { - this._filters[peer] = filter; - } + + this._filters[peer] = scalar; + }; P2P.prototype._setupListeners = function() { + var self = this; self._pool.on('peerready', function(peer, addr) { @@ -139,6 +141,7 @@ P2P.prototype._setupListeners = function() { self._addPeer(peer); var bestHeight = self._getBestHeight(peer); + if (bestHeight >= 0) { self.emit('bestHeight', bestHeight); } @@ -155,22 +158,23 @@ P2P.prototype._setupListeners = function() { var newDataNeeded = []; - // the bitcoin p2p network can send unsolicited inventory messages - // because we've asked it to when connecting. We want to know about new - // blocks and transactions asynchronoously. The problem is that when we - // do ask for inventory synchronuously, we can't tell messages arriving - // after we've asked if they are a response to our request or unsolicited. - // There is no request id that we can match against the response. - // The safest way to get around this is to disconnect the peer and reconnect - // without relaying turned om, so the peer will not spontaneously send us inv - // messages. This way we know for sure that the inv message is a response to - // our request. + // The bitcoin p2p network can send unsolicited inventory messages for + // entities such as transactions and blocks. This behavior is configurable + // via the 'relay' field in the version message that we send when making + // outbound connections. The relay option is important to keep enabled so + // that we can reduce the chatter on the network, overall. + // The problem is that when we do ask for inventory synchronuously, + // such as when our indexes are syncing or catching up, we can't tell the + // the difference between unsolicited new messages and actual responses. + // There does not seem to be identifying information to link requests and + // responses. - // have we asked for inventory from a particular peer? Check our filters list - // based on filters that may have been previously set by a request from this service, - // we need to set the inventory list on the peer's filter so that when the inventory - // is retrieved, it can be returned to the caller and not sent out on the main bux - self._appendFilters(peer, message.inventory); + // What we will do is set a filter scalar on the peer being queried. We + // will have to assume that the next message from that peer will be our + // response. The length of the inventory vector will then become our filter + // scalar. As we respond to the inbound inventory (getdata), we will decrement + // this filter scalar. This is how we will match request anf response. + self._setFilterScalar(peer, message.inventory.length); message.inventory.forEach(function(inv) { @@ -200,35 +204,25 @@ P2P.prototype._setupListeners = function() { }; -P2P.prototype._processFilter = function(filter, message) { - var existsInFilter = false; - if (!filter) { - return existsInFilter; - } - filter.forEach(function(inv, index) { -console.log(inv.hash.toString('hex'), message.transaction.hash); - if (message.transaction.hash === inv.hash) { - existsInFilter = true; - filter.splice(index, 1); - } - }); - return existsInFilter; -}; - P2P.prototype._filterTx = function(peer, message) { var self = this; // if this tx matches any of this peer's filters, then emit the tx internally only // and not to the external bus - var existsInFilter = self._processFilter(self._filters[peer], message); + var filterExists = self._filters[peer]; - if (!existsInFilter) { + if (!filterExists) { self._broadcastTx(peer, message); return; } self.emit('tx', message.transaction); + if (--self._filters[peer] === 0) { + self._filters[peer] = false; + self.emit('end'); + } + }; P2P.prototype._broadcastTx = function(peer, message) { @@ -247,7 +241,7 @@ P2P.prototype.getAPIMethods = function() { var methods = [ ['createHeaderStream', this, this.getHeaders, 2], ['getMempool', this, this.getMempool, 1], - ['createBlockStream', this, this.getBlocks, 2] + ['getBlocks', this, this.getBlocks, 2] ]; return methods; }; @@ -298,7 +292,7 @@ P2P.prototype.getMempool = function(callback) { return callback(new Error('Could not get a peer to retrieve mempool.')); } - self._filters[peer] = []; + self._filters[peer] = true; peer.sendMessage(self.messages.MemPool()); var mempool = []; @@ -312,15 +306,32 @@ P2P.prototype.getMempool = function(callback) { }); }; -P2P.prototype.createBlockStream = function(startBlockHash, endBlockHash) { +P2P.prototype.getBlocks = function(startBlockHash, endBlockHash) { + var self = this; if (!endBlockHash) { endBlockHash = startBlockHash; } - var message = this.messages.GetBlocks(startBlockHash, endBlockHash); - var peer = this._getPeer(); + var peer = self._getPeer(); + + if (!peer) { + return callback(new Error('Could not get a peer to retrieve blocks.')); + } + + self._filters[peer] = true; + + var message = this.messages.GetBlocks({ starts: [startBlockHash] }); peer.sendMessage(message); + var blocks = []; + + self.on('block', function(block) { + blocks.push(block); + }); + + self.on('end', function() { + callback(null, blocks); + }); }; diff --git a/regtest/p2p.js b/regtest/p2p.js index c2959ea3..4472e370 100644 --- a/regtest/p2p.js +++ b/regtest/p2p.js @@ -163,7 +163,17 @@ describe('P2P Operations', function() { ], done); }); - it.only('should connect to the p2p network and stream the mempool to clients', function(done) { + //it('should get blocks from peer that we do not have on startup', function(done) { + + // setTimeout(function() { + // expect(blocks.length).to.equal(10); + // done(); + // }, 1000); + + // done(); + //}); + + it('should connect to the p2p network and stream the mempool to clients', function(done) { async.series([ utils.unlockWallet.bind(utils), utils.setupInitialTxs.bind(utils), @@ -203,7 +213,7 @@ describe('P2P Operations', function() { }); }); - it('should send new transactions as they are broadcasted by our trusted peer', function(done) { + it('should send new transactions as they are broadcasted by our trusted peer (unsoliticted)', function(done) { var tx; async.series([ @@ -244,13 +254,8 @@ describe('P2P Operations', function() { }); - it('should get blocks from peer that we do not have on startup', function(done) { - - done(); - }); - it('should send new blocks as they are broadcasted by our trusted peer', function(done) { - expect(blocks.length).to.equal(1); + //expect(blocks.length).to.equal(1); done(); }); diff --git a/regtest/test_bus.js b/regtest/test_bus.js index 13911250..41d326ae 100644 --- a/regtest/test_bus.js +++ b/regtest/test_bus.js @@ -5,6 +5,8 @@ var inherits = require('util').inherits; var zmq = require('zmq'); var index = require('../lib'); var log = index.log; +var constants = require('../lib/constants'); +var assert = require('assert'); var TestBusService = function(options) { BaseService.call(this, options); @@ -37,12 +39,15 @@ TestBusService.prototype.start = function(callback) { } }); + self.node.services.p2p.on('bestHeight', function(height) { + self._bestHeight = height; + }); + self.bus.on('p2p/block', function(block) { - var self = this; self._cache.block.push(block); if (self._ready) { for(var i = 0; i < self._cache.block.length; i++) { - var blk = self._cache.block.unshift(); + var blk = self._cache.block.shift(); self.pubSocket.send([ 'block', blk.toBuffer() ]); } return; @@ -54,13 +59,34 @@ TestBusService.prototype.start = function(callback) { self.node.on('ready', function() { - setTimeout(function() { self._ready = true; self.node.services.p2p.getMempool(function(err, mempool) { -console.log(mempool); + if(err) { + throw err; + } + + mempool.forEach(function(tx) { + self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]); + }); }); + + assert(self._bestHeight, 'best height not set on a time after ready'); + self.node.services.p2p.getBlocks( + constants.BITCOIN_GENESIS_HASH.regtest, + self._bestHeight, + function(err, blocks) { + + if(err) { + throw err; + } + + blocks.forEach(function(block) { + self.pubSocket.send([ 'block', block.toBuffer() ]); + }); + }); + }, 2000); });