wip
This commit is contained in:
parent
76be902463
commit
84f4dbf7aa
@ -3,12 +3,10 @@
|
|||||||
var p2p = require('bitcore-p2p');
|
var p2p = require('bitcore-p2p');
|
||||||
var LRU = require('lru-cache');
|
var LRU = require('lru-cache');
|
||||||
var util = require('util');
|
var util = require('util');
|
||||||
var _ = require('lodash');
|
|
||||||
var bitcore = require('bitcore-lib');
|
|
||||||
var index = require('../../');
|
var index = require('../../');
|
||||||
var log = index.log;
|
var log = index.log;
|
||||||
var BaseService = require('../../service');
|
var BaseService = require('../../service');
|
||||||
var constants = require('../../constants');
|
var assert = require('assert');
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Purpose:
|
Purpose:
|
||||||
@ -18,6 +16,7 @@ var constants = require('../../constants');
|
|||||||
4. publish new blocks (pub/sub)
|
4. publish new blocks (pub/sub)
|
||||||
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
|
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
var P2P = function(options) {
|
var P2P = function(options) {
|
||||||
|
|
||||||
if (!(this instanceof P2P)) {
|
if (!(this instanceof P2P)) {
|
||||||
@ -36,7 +35,7 @@ var P2P = function(options) {
|
|||||||
this._peerHeights = [];
|
this._peerHeights = [];
|
||||||
this._peers = [];
|
this._peers = [];
|
||||||
this._peerIndex = 0;
|
this._peerIndex = 0;
|
||||||
this._filters = {};
|
this._mempoolFilter = [];
|
||||||
};
|
};
|
||||||
|
|
||||||
util.inherits(P2P, BaseService);
|
util.inherits(P2P, BaseService);
|
||||||
@ -69,24 +68,12 @@ P2P.prototype.getPublishEvents = function() {
|
|||||||
subscribe: this.subscribe.bind(this, 'transaction'),
|
subscribe: this.subscribe.bind(this, 'transaction'),
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'transaction')
|
unsubscribe: this.unsubscribe.bind(this, 'transaction')
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: 'p2p/mempool',
|
|
||||||
scope: this,
|
|
||||||
subscribe: this.subscribe.bind(this, 'mempool'),
|
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'mempool')
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: 'p2p/block',
|
name: 'p2p/block',
|
||||||
scope: this,
|
scope: this,
|
||||||
subscribe: this.subscribe.bind(this, 'block'),
|
subscribe: this.subscribe.bind(this, 'block'),
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'block')
|
unsubscribe: this.unsubscribe.bind(this, 'block')
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: 'p2p/historicalBlock',
|
|
||||||
scope: this,
|
|
||||||
subscribe: this.subscribe.bind(this, 'historicalBlock'),
|
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'block')
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: 'p2p/header',
|
name: 'p2p/header',
|
||||||
scope: this,
|
scope: this,
|
||||||
@ -159,28 +146,27 @@ P2P.prototype._onPeerReady = function(peer, addr) {
|
|||||||
peer.subversion + ', status: ' + peer.status + ', port: ' +
|
peer.subversion + ', status: ' + peer.status + ', port: ' +
|
||||||
peer.port + ', best height: ' + peer.bestHeight);
|
peer.port + ', best height: ' + peer.bestHeight);
|
||||||
|
|
||||||
self._addPeer(peer);
|
this._addPeer(peer);
|
||||||
var bestHeight = self._getBestHeight(peer);
|
var bestHeight = this._getBestHeight(peer);
|
||||||
|
|
||||||
if (bestHeight >= 0) {
|
if (bestHeight >= 0) {
|
||||||
self.emit('bestHeight', bestHeight);
|
this.emit('bestHeight', bestHeight);
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._onPeerDisconnect = function(peer, addr) {
|
P2P.prototype._onPeerDisconnect = function(peer, addr) {
|
||||||
|
|
||||||
self._removePeer(peer);
|
this._removePeer(peer);
|
||||||
log.info('Disconnected from peer: ' + addr.ip.v4);
|
log.info('Disconnected from peer: ' + addr.ip.v4);
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._onPeerInventory = function(peer, message) {
|
P2P.prototype._onPeerInventory = function(peer, message) {
|
||||||
|
|
||||||
|
var self = this;
|
||||||
var newDataNeeded = [];
|
var newDataNeeded = [];
|
||||||
|
|
||||||
self._setFilterScalar(peer, message.inventory.length);
|
|
||||||
|
|
||||||
message.inventory.forEach(function(inv) {
|
message.inventory.forEach(function(inv) {
|
||||||
|
|
||||||
if (!self._inv.get(inv.hash)) {
|
if (!self._inv.get(inv.hash)) {
|
||||||
@ -198,9 +184,14 @@ P2P.prototype._onPeerInventory = function(peer, message) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._onPeerTx = function(peer, message) {
|
P2P.prototype._onPeerTx = function(peer, message) {
|
||||||
|
var filteredMessage = this._applyMempoolFilter(message);
|
||||||
|
if (filteredMessage) {
|
||||||
|
this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
p2p.prototype._onPeerBlock = function(peer, message) {
|
P2P.prototype._onPeerBlock = function(peer, message) {
|
||||||
|
this._broadcast(this.subscriptions.block, 'p2p/block', message.block);
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._setupListeners = function() {
|
P2P.prototype._setupListeners = function() {
|
||||||
@ -223,6 +214,38 @@ P2P.prototype._broadcast = function(subscribers, name, entity) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
P2P.prototype._applyMempoolFilter = function(message) {
|
||||||
|
if (!this._mempoolFilter) {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
var txIndex = this._mempoolFilter.indexOf(message.transaction.hash);
|
||||||
|
if (txIndex >= 0) {
|
||||||
|
this._mempoolFilter.splice(txIndex, 1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return message;
|
||||||
|
};
|
||||||
|
|
||||||
|
P2P.prototype._getResourceFilter = function(filter, resource) {
|
||||||
|
|
||||||
|
// this function knows about mempool, block and header filters
|
||||||
|
// mempool filters are considered after the tx is delivered to us
|
||||||
|
// because we can't match a tx to the query params.
|
||||||
|
if (resource === 'mempool') {
|
||||||
|
this._mempoolFilter = filter;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resource === 'blocks' || resource === 'headers') {
|
||||||
|
assert(filter.newestHash, 'A "newestHash" field is required to retrieve blocks or headers');
|
||||||
|
if (!filter.oldestHash) {
|
||||||
|
filter.oldestHash = filter.newestHash;
|
||||||
|
}
|
||||||
|
return { starts: [filter.newestHash], stop: filter.oldestHash };
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
P2P.prototype.getAPIMethods = function() {
|
P2P.prototype.getAPIMethods = function() {
|
||||||
var methods = [
|
var methods = [
|
||||||
['getHeaders', this, this.getHeaders, 2],
|
['getHeaders', this, this.getHeaders, 2],
|
||||||
@ -232,61 +255,34 @@ P2P.prototype.getAPIMethods = function() {
|
|||||||
return methods;
|
return methods;
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._getResourceFilter = function(filter, resource) {
|
|
||||||
|
|
||||||
// this function knows about mempool, block and header filters
|
|
||||||
// mempool filters are considered after the tx is delivered to us
|
|
||||||
// because we can't match a tx to the query params.
|
|
||||||
if (resource === 'mempool') {
|
|
||||||
if (resource instanceof LRU) {
|
|
||||||
return filter;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
P2P.prototype._getResourceMessage = function(filter, resource) {
|
|
||||||
|
|
||||||
// filter should be a list of block hashes representing the
|
|
||||||
// resource that are -not- needed, all headers outside this list will be
|
|
||||||
// broadcast on the p2p/headers bus, meaning all subscribers to
|
|
||||||
// this event will get the results whether they asked for them
|
|
||||||
// or not.
|
|
||||||
|
|
||||||
// _getPeer can throw
|
|
||||||
var peer = this._getPeer();
|
|
||||||
|
|
||||||
return this._getResourceFilter(filter, resource);
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
P2P.prototype.getHeaders = function(filter) {
|
P2P.prototype.getHeaders = function(filter) {
|
||||||
|
|
||||||
var headerFilter = this._getResourceMessage(filter, 'headers');
|
var peer = this._getPeer();
|
||||||
peer.sendMessage(this.messages.GetHeader(headerFilter));
|
var headerFilter = this._getResourceFilter(filter, 'headers');
|
||||||
|
peer.sendMessage(this.messages.GetHeaders(headerFilter));
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getMempool = function(filter) {
|
P2P.prototype.getMempool = function(filter) {
|
||||||
|
|
||||||
|
var peer = this._getPeer();
|
||||||
|
|
||||||
// mempools can grow quite large, especially if subscribers are liberally accepting
|
// mempools can grow quite large, especially if subscribers are liberally accepting
|
||||||
// all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can
|
// all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can
|
||||||
// grow quite large over time. Care should be taken to limit the size of this filter.
|
// grow quite large over time. Care should be taken to limit the size of this filter.
|
||||||
// Even still, when a tx is broadcasted, the reference to it is dropped from the filter.
|
// Even still, when a tx is broadcasted, the reference to it is dropped from the filter.
|
||||||
this._mempoolFilter = this._getResourceMessage(filter, 'mempool');
|
this._mempoolFilter = this._getResourceFilter(filter, 'mempool');
|
||||||
|
|
||||||
peer.sendMessage(self.messages.MemPool());
|
|
||||||
|
peer.sendMessage(this.messages.MemPool());
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getBlocks = function(filter) {
|
P2P.prototype.getBlocks = function(filter) {
|
||||||
|
|
||||||
// it is on the caller to work out what block hashes are needed from the network,
|
var peer = this._getPeer();
|
||||||
// so a list of block hashes should be given. If this is an initial sync from the
|
var blockFilter = this._getResourceFilter(filter, 'blocks');
|
||||||
// genesis block, then this list will be quite large (32 bytes * current block height).
|
peer.sendMessage(this.messages.GetBlocks(blockFilter));
|
||||||
var blockFilter = this._getResourceMessage(filter, 'blocks');
|
|
||||||
peer.sendMessage(this.messages.GetBlock(blockFilter));
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "bitcore-node",
|
"name": "bitcore-node",
|
||||||
"description": "Full node with extended capabilities using Bitcore and Bitcoin Core",
|
"description": "Full node with extended capabilities using Bitcore and Bitcoin Core",
|
||||||
|
"engines": { "node": ">=6.10.3" },
|
||||||
"author": "BitPay <dev@bitpay.com>",
|
"author": "BitPay <dev@bitpay.com>",
|
||||||
"version": "4.0.0",
|
"version": "4.0.0",
|
||||||
"main": "./index.js",
|
"main": "./index.js",
|
||||||
|
|||||||
@ -51,7 +51,7 @@ var bitcore = {
|
|||||||
network: 'regtest',
|
network: 'regtest',
|
||||||
port: 53001,
|
port: 53001,
|
||||||
datadir: bitcoreDataDir,
|
datadir: bitcoreDataDir,
|
||||||
services: ['p2p', 'test-p2p'],
|
services: ['p2p', 'test-p2p', 'web'],
|
||||||
servicesConfig: {
|
servicesConfig: {
|
||||||
p2p: {
|
p2p: {
|
||||||
peers: [
|
peers: [
|
||||||
@ -101,7 +101,19 @@ var opts = {
|
|||||||
satoshisSent: 0,
|
satoshisSent: 0,
|
||||||
walletId: crypto.createHash('sha256').update('test').digest('hex'),
|
walletId: crypto.createHash('sha256').update('test').digest('hex'),
|
||||||
satoshisReceived: 0,
|
satoshisReceived: 0,
|
||||||
initialHeight: 150
|
initialHeight: 150,
|
||||||
|
path: '/test/info',
|
||||||
|
errorFilter: function(err, res) {
|
||||||
|
console.log(err, res);
|
||||||
|
try {
|
||||||
|
var info = JSON.parse(res);
|
||||||
|
if (res.result) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch(e) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
var utils = new Utils(opts);
|
var utils = new Utils(opts);
|
||||||
@ -142,12 +154,6 @@ function setupZmqSubscriber(callback) {
|
|||||||
callback();
|
callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
function waitForZmqConnection(callback) {
|
|
||||||
async.retry({ interval: 500, times: 50 }, function(next) {
|
|
||||||
return next(txs.length < utils.opts.initialTxs.length);
|
|
||||||
}, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('P2P Operations', function() {
|
describe('P2P Operations', function() {
|
||||||
|
|
||||||
this.timeout(60000);
|
this.timeout(60000);
|
||||||
@ -163,15 +169,6 @@ describe('P2P Operations', function() {
|
|||||||
], done);
|
], done);
|
||||||
});
|
});
|
||||||
|
|
||||||
//it('should get blocks from peer that we do not have on startup', function(done) {
|
|
||||||
|
|
||||||
// setTimeout(function() {
|
|
||||||
// expect(blocks.length).to.equal(10);
|
|
||||||
// done();
|
|
||||||
// }, 1000);
|
|
||||||
|
|
||||||
// done();
|
|
||||||
//});
|
|
||||||
|
|
||||||
it('should connect to the p2p network and stream the mempool to clients', function(done) {
|
it('should connect to the p2p network and stream the mempool to clients', function(done) {
|
||||||
async.series([
|
async.series([
|
||||||
@ -186,7 +183,7 @@ describe('P2P Operations', function() {
|
|||||||
|
|
||||||
utils.startBitcoreNode.bind(utils),
|
utils.startBitcoreNode.bind(utils),
|
||||||
setupZmqSubscriber,
|
setupZmqSubscriber,
|
||||||
waitForZmqConnection
|
utils.waitForBitcoreNode.bind(utils)
|
||||||
|
|
||||||
], function(err) {
|
], function(err) {
|
||||||
|
|
||||||
@ -254,12 +251,24 @@ describe('P2P Operations', function() {
|
|||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should send new blocks as they are broadcasted by our trusted peer', function(done) {
|
it('should get blocks from peer that we do not have on startup', function(done) {
|
||||||
//expect(blocks.length).to.equal(1);
|
expect(blocks.length).to.equal(151);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) {
|
||||||
|
var newTx = txs.shift();
|
||||||
|
utils.queryBitcoreNode(Object.assign({
|
||||||
|
path: '/mempool?filter=' + txs
|
||||||
|
}), bitcore.httpOpts, function(err, data) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
newTxs = JSON.parse(data).transactions;
|
||||||
|
expect(newTxs.length).to.equal(2);
|
||||||
|
expect(newTx).to.equal(newTxs[0].hash);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -15,7 +15,7 @@ var TestBusService = function(options) {
|
|||||||
|
|
||||||
inherits(TestBusService, BaseService);
|
inherits(TestBusService, BaseService);
|
||||||
|
|
||||||
TestBusService.dependencies = ['p2p'];
|
TestBusService.dependencies = ['p2p', 'web'];
|
||||||
|
|
||||||
TestBusService.prototype.start = function(callback) {
|
TestBusService.prototype.start = function(callback) {
|
||||||
|
|
||||||
@ -59,41 +59,39 @@ TestBusService.prototype.start = function(callback) {
|
|||||||
|
|
||||||
self.node.on('ready', function() {
|
self.node.on('ready', function() {
|
||||||
|
|
||||||
setTimeout(function() {
|
self._ready = true;
|
||||||
self._ready = true;
|
self.node.services.p2p.getMempool();
|
||||||
self.node.services.p2p.getMempool(function(err, mempool) {
|
self.node.services.p2p.getBlocks({ newestHash: constants.BITCOIN_GENESIS_HASH.regtest });
|
||||||
|
|
||||||
if(err) {
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
|
|
||||||
mempool.forEach(function(tx) {
|
|
||||||
self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
assert(self._bestHeight, 'best height not set on a time after ready');
|
|
||||||
self.node.services.p2p.getBlocks(
|
|
||||||
constants.BITCOIN_GENESIS_HASH.regtest,
|
|
||||||
self._bestHeight,
|
|
||||||
function(err, blocks) {
|
|
||||||
|
|
||||||
if(err) {
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
|
|
||||||
blocks.forEach(function(block) {
|
|
||||||
self.pubSocket.send([ 'block', block.toBuffer() ]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
}, 2000);
|
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
callback();
|
callback();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TestBusService.prototype.setupRoutes = function(app) {
|
||||||
|
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
app.get('/mempool', function(req, res) {
|
||||||
|
self.node.services.p2p.getMempool(req.params.filter);
|
||||||
|
res.status(200);
|
||||||
|
});
|
||||||
|
|
||||||
|
app.get('/blocks', function(req, res) {
|
||||||
|
self.node.services.p2p.getBlocks(req.params.filter);
|
||||||
|
res.status(200);
|
||||||
|
});
|
||||||
|
|
||||||
|
app.get('/info', function(req, res) {
|
||||||
|
console.log('test test test test');
|
||||||
|
res.status(200).jsonp({ result: (self._ready && (self._bestHeight >= 0))});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
TestBusService.prototype.getRoutePrefix = function() {
|
||||||
|
return 'test';
|
||||||
|
};
|
||||||
|
|
||||||
TestBusService.prototype.stop = function(callback) {
|
TestBusService.prototype.stop = function(callback) {
|
||||||
callback();
|
callback();
|
||||||
};
|
};
|
||||||
|
|||||||
@ -78,21 +78,23 @@ Utils.prototype.waitForBitcoreNode = function(callback) {
|
|||||||
|
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
var errorFilter = function(err, res) {
|
var errorFilter = self.opts.errorFilter;
|
||||||
try {
|
if (!errorFilter) {
|
||||||
var info = JSON.parse(res);
|
errorFilter = function(err, res) {
|
||||||
if (info.dbheight === self.opts.blockHeight &&
|
try {
|
||||||
info.dbheight === info.bitcoindheight &&
|
var info = JSON.parse(res);
|
||||||
info.bitcoindhash === info.dbhash) {
|
if (info.dbheight === self.opts.blockHeight &&
|
||||||
return;
|
info.dbheight === info.bitcoindheight &&
|
||||||
|
info.bitcoindhash === info.dbhash) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
} catch(e) {
|
||||||
|
return e;
|
||||||
}
|
}
|
||||||
return res;
|
};
|
||||||
} catch(e) {
|
}
|
||||||
return e;
|
var httpOpts = self.getHttpOpts({ path: opts.path || '/info', errorFilter: errorFilter });
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
var httpOpts = self.getHttpOpts({ path: '/info', errorFilter: errorFilter });
|
|
||||||
|
|
||||||
self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback);
|
self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback);
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user