From 84f4dbf7aac1ead54c22a4ff5106e267832e871d Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 2 Jun 2017 15:03:39 -0400 Subject: [PATCH] wip --- lib/services/p2p/index.js | 118 ++++++++++++++++++-------------------- package.json | 1 + regtest/p2p.js | 51 +++++++++------- regtest/test_bus.js | 58 +++++++++---------- regtest/utils.js | 30 +++++----- 5 files changed, 132 insertions(+), 126 deletions(-) diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 52cba33b..4ceb79ab 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -3,12 +3,10 @@ var p2p = require('bitcore-p2p'); var LRU = require('lru-cache'); var util = require('util'); -var _ = require('lodash'); -var bitcore = require('bitcore-lib'); var index = require('../../'); var log = index.log; var BaseService = require('../../service'); -var constants = require('../../constants'); +var assert = require('assert'); /* Purpose: @@ -18,6 +16,7 @@ var constants = require('../../constants'); 4. publish new blocks (pub/sub) 5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters) */ + var P2P = function(options) { if (!(this instanceof P2P)) { @@ -36,7 +35,7 @@ var P2P = function(options) { this._peerHeights = []; this._peers = []; this._peerIndex = 0; - this._filters = {}; + this._mempoolFilter = []; }; util.inherits(P2P, BaseService); @@ -69,24 +68,12 @@ P2P.prototype.getPublishEvents = function() { subscribe: this.subscribe.bind(this, 'transaction'), unsubscribe: this.unsubscribe.bind(this, 'transaction') }, - { - name: 'p2p/mempool', - scope: this, - subscribe: this.subscribe.bind(this, 'mempool'), - unsubscribe: this.unsubscribe.bind(this, 'mempool') - }, { name: 'p2p/block', scope: this, subscribe: this.subscribe.bind(this, 'block'), unsubscribe: this.unsubscribe.bind(this, 'block') }, - { - name: 'p2p/historicalBlock', - scope: this, - subscribe: this.subscribe.bind(this, 'historicalBlock'), - unsubscribe: this.unsubscribe.bind(this, 'block') - }, { name: 'p2p/header', scope: this, @@ -159,28 +146,27 @@ P2P.prototype._onPeerReady = function(peer, addr) { peer.subversion + ', status: ' + peer.status + ', port: ' + peer.port + ', best height: ' + peer.bestHeight); - self._addPeer(peer); - var bestHeight = self._getBestHeight(peer); + this._addPeer(peer); + var bestHeight = this._getBestHeight(peer); if (bestHeight >= 0) { - self.emit('bestHeight', bestHeight); + this.emit('bestHeight', bestHeight); } }; P2P.prototype._onPeerDisconnect = function(peer, addr) { - self._removePeer(peer); + this._removePeer(peer); log.info('Disconnected from peer: ' + addr.ip.v4); }; P2P.prototype._onPeerInventory = function(peer, message) { + var self = this; var newDataNeeded = []; - self._setFilterScalar(peer, message.inventory.length); - message.inventory.forEach(function(inv) { if (!self._inv.get(inv.hash)) { @@ -198,9 +184,14 @@ P2P.prototype._onPeerInventory = function(peer, message) { }; P2P.prototype._onPeerTx = function(peer, message) { + var filteredMessage = this._applyMempoolFilter(message); + if (filteredMessage) { + this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction); + } }; -p2p.prototype._onPeerBlock = function(peer, message) { +P2P.prototype._onPeerBlock = function(peer, message) { + this._broadcast(this.subscriptions.block, 'p2p/block', message.block); }; P2P.prototype._setupListeners = function() { @@ -223,6 +214,38 @@ P2P.prototype._broadcast = function(subscribers, name, entity) { } }; +P2P.prototype._applyMempoolFilter = function(message) { + if (!this._mempoolFilter) { + return message; + } + var txIndex = this._mempoolFilter.indexOf(message.transaction.hash); + if (txIndex >= 0) { + this._mempoolFilter.splice(txIndex, 1); + return; + } + return message; +}; + +P2P.prototype._getResourceFilter = function(filter, resource) { + + // this function knows about mempool, block and header filters + // mempool filters are considered after the tx is delivered to us + // because we can't match a tx to the query params. + if (resource === 'mempool') { + this._mempoolFilter = filter; + return; + } + + if (resource === 'blocks' || resource === 'headers') { + assert(filter.newestHash, 'A "newestHash" field is required to retrieve blocks or headers'); + if (!filter.oldestHash) { + filter.oldestHash = filter.newestHash; + } + return { starts: [filter.newestHash], stop: filter.oldestHash }; + } + +}; + P2P.prototype.getAPIMethods = function() { var methods = [ ['getHeaders', this, this.getHeaders, 2], @@ -232,61 +255,34 @@ P2P.prototype.getAPIMethods = function() { return methods; }; -P2P.prototype._getResourceFilter = function(filter, resource) { - - // this function knows about mempool, block and header filters - // mempool filters are considered after the tx is delivered to us - // because we can't match a tx to the query params. - if (resource === 'mempool') { - if (resource instanceof LRU) { - return filter; - } - - } - -}; - -P2P.prototype._getResourceMessage = function(filter, resource) { - - // filter should be a list of block hashes representing the - // resource that are -not- needed, all headers outside this list will be - // broadcast on the p2p/headers bus, meaning all subscribers to - // this event will get the results whether they asked for them - // or not. - - // _getPeer can throw - var peer = this._getPeer(); - - return this._getResourceFilter(filter, resource); - -}; - P2P.prototype.getHeaders = function(filter) { - var headerFilter = this._getResourceMessage(filter, 'headers'); - peer.sendMessage(this.messages.GetHeader(headerFilter)); + var peer = this._getPeer(); + var headerFilter = this._getResourceFilter(filter, 'headers'); + peer.sendMessage(this.messages.GetHeaders(headerFilter)); }; P2P.prototype.getMempool = function(filter) { + var peer = this._getPeer(); + // mempools can grow quite large, especially if subscribers are liberally accepting // all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can // grow quite large over time. Care should be taken to limit the size of this filter. // Even still, when a tx is broadcasted, the reference to it is dropped from the filter. - this._mempoolFilter = this._getResourceMessage(filter, 'mempool'); + this._mempoolFilter = this._getResourceFilter(filter, 'mempool'); - peer.sendMessage(self.messages.MemPool()); + + peer.sendMessage(this.messages.MemPool()); }; P2P.prototype.getBlocks = function(filter) { - // it is on the caller to work out what block hashes are needed from the network, - // so a list of block hashes should be given. If this is an initial sync from the - // genesis block, then this list will be quite large (32 bytes * current block height). - var blockFilter = this._getResourceMessage(filter, 'blocks'); - peer.sendMessage(this.messages.GetBlock(blockFilter)); + var peer = this._getPeer(); + var blockFilter = this._getResourceFilter(filter, 'blocks'); + peer.sendMessage(this.messages.GetBlocks(blockFilter)); }; diff --git a/package.json b/package.json index b1dda387..6f8b3825 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,7 @@ { "name": "bitcore-node", "description": "Full node with extended capabilities using Bitcore and Bitcoin Core", + "engines": { "node": ">=6.10.3" }, "author": "BitPay ", "version": "4.0.0", "main": "./index.js", diff --git a/regtest/p2p.js b/regtest/p2p.js index 4472e370..27da1c6c 100644 --- a/regtest/p2p.js +++ b/regtest/p2p.js @@ -51,7 +51,7 @@ var bitcore = { network: 'regtest', port: 53001, datadir: bitcoreDataDir, - services: ['p2p', 'test-p2p'], + services: ['p2p', 'test-p2p', 'web'], servicesConfig: { p2p: { peers: [ @@ -101,7 +101,19 @@ var opts = { satoshisSent: 0, walletId: crypto.createHash('sha256').update('test').digest('hex'), satoshisReceived: 0, - initialHeight: 150 + initialHeight: 150, + path: '/test/info', + errorFilter: function(err, res) { +console.log(err, res); + try { + var info = JSON.parse(res); + if (res.result) { + return; + } + } catch(e) { + return e; + } + } }; var utils = new Utils(opts); @@ -142,12 +154,6 @@ function setupZmqSubscriber(callback) { callback(); } -function waitForZmqConnection(callback) { - async.retry({ interval: 500, times: 50 }, function(next) { - return next(txs.length < utils.opts.initialTxs.length); - }, callback); -} - describe('P2P Operations', function() { this.timeout(60000); @@ -163,15 +169,6 @@ describe('P2P Operations', function() { ], done); }); - //it('should get blocks from peer that we do not have on startup', function(done) { - - // setTimeout(function() { - // expect(blocks.length).to.equal(10); - // done(); - // }, 1000); - - // done(); - //}); it('should connect to the p2p network and stream the mempool to clients', function(done) { async.series([ @@ -186,7 +183,7 @@ describe('P2P Operations', function() { utils.startBitcoreNode.bind(utils), setupZmqSubscriber, - waitForZmqConnection + utils.waitForBitcoreNode.bind(utils) ], function(err) { @@ -254,12 +251,24 @@ describe('P2P Operations', function() { }); - it('should send new blocks as they are broadcasted by our trusted peer', function(done) { - //expect(blocks.length).to.equal(1); + it('should get blocks from peer that we do not have on startup', function(done) { + expect(blocks.length).to.equal(151); done(); }); - + it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) { + var newTx = txs.shift(); + utils.queryBitcoreNode(Object.assign({ + path: '/mempool?filter=' + txs + }), bitcore.httpOpts, function(err, data) { + if (err) { + return done(err); + } + newTxs = JSON.parse(data).transactions; + expect(newTxs.length).to.equal(2); + expect(newTx).to.equal(newTxs[0].hash); + }); + }); }); diff --git a/regtest/test_bus.js b/regtest/test_bus.js index 41d326ae..17b1437a 100644 --- a/regtest/test_bus.js +++ b/regtest/test_bus.js @@ -15,7 +15,7 @@ var TestBusService = function(options) { inherits(TestBusService, BaseService); -TestBusService.dependencies = ['p2p']; +TestBusService.dependencies = ['p2p', 'web']; TestBusService.prototype.start = function(callback) { @@ -59,41 +59,39 @@ TestBusService.prototype.start = function(callback) { self.node.on('ready', function() { - setTimeout(function() { - self._ready = true; - self.node.services.p2p.getMempool(function(err, mempool) { - - if(err) { - throw err; - } - - mempool.forEach(function(tx) { - self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]); - }); - }); - - assert(self._bestHeight, 'best height not set on a time after ready'); - self.node.services.p2p.getBlocks( - constants.BITCOIN_GENESIS_HASH.regtest, - self._bestHeight, - function(err, blocks) { - - if(err) { - throw err; - } - - blocks.forEach(function(block) { - self.pubSocket.send([ 'block', block.toBuffer() ]); - }); - }); - - }, 2000); + self._ready = true; + self.node.services.p2p.getMempool(); + self.node.services.p2p.getBlocks({ newestHash: constants.BITCOIN_GENESIS_HASH.regtest }); }); callback(); }; +TestBusService.prototype.setupRoutes = function(app) { + + var self = this; + + app.get('/mempool', function(req, res) { + self.node.services.p2p.getMempool(req.params.filter); + res.status(200); + }); + + app.get('/blocks', function(req, res) { + self.node.services.p2p.getBlocks(req.params.filter); + res.status(200); + }); + + app.get('/info', function(req, res) { +console.log('test test test test'); + res.status(200).jsonp({ result: (self._ready && (self._bestHeight >= 0))}); + }); +}; + +TestBusService.prototype.getRoutePrefix = function() { + return 'test'; +}; + TestBusService.prototype.stop = function(callback) { callback(); }; diff --git a/regtest/utils.js b/regtest/utils.js index fcc6d337..6f440851 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -78,21 +78,23 @@ Utils.prototype.waitForBitcoreNode = function(callback) { var self = this; - var errorFilter = function(err, res) { - try { - var info = JSON.parse(res); - if (info.dbheight === self.opts.blockHeight && - info.dbheight === info.bitcoindheight && - info.bitcoindhash === info.dbhash) { - return; + var errorFilter = self.opts.errorFilter; + if (!errorFilter) { + errorFilter = function(err, res) { + try { + var info = JSON.parse(res); + if (info.dbheight === self.opts.blockHeight && + info.dbheight === info.bitcoindheight && + info.bitcoindhash === info.dbhash) { + return; + } + return res; + } catch(e) { + return e; } - return res; - } catch(e) { - return e; - } - }; - - var httpOpts = self.getHttpOpts({ path: '/info', errorFilter: errorFilter }); + }; + } + var httpOpts = self.getHttpOpts({ path: opts.path || '/info', errorFilter: errorFilter }); self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback); };