wip
This commit is contained in:
parent
019851e48f
commit
76be902463
@ -12,11 +12,11 @@ var constants = require('../../constants');
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
Purpose:
|
Purpose:
|
||||||
1. join a P2P
|
1. join a P2P
|
||||||
2. relay messages
|
2. relay messages
|
||||||
3. publish new transactions (pub/sub)
|
3. publish new transactions (pub/sub)
|
||||||
4. publish new blocks (pub/sub)
|
4. publish new blocks (pub/sub)
|
||||||
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
|
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
|
||||||
*/
|
*/
|
||||||
var P2P = function(options) {
|
var P2P = function(options) {
|
||||||
|
|
||||||
@ -61,6 +61,41 @@ P2P.prototype.stop = function(callback) {
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
P2P.prototype.getPublishEvents = function() {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
name: 'p2p/transaction',
|
||||||
|
scope: this,
|
||||||
|
subscribe: this.subscribe.bind(this, 'transaction'),
|
||||||
|
unsubscribe: this.unsubscribe.bind(this, 'transaction')
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'p2p/mempool',
|
||||||
|
scope: this,
|
||||||
|
subscribe: this.subscribe.bind(this, 'mempool'),
|
||||||
|
unsubscribe: this.unsubscribe.bind(this, 'mempool')
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'p2p/block',
|
||||||
|
scope: this,
|
||||||
|
subscribe: this.subscribe.bind(this, 'block'),
|
||||||
|
unsubscribe: this.unsubscribe.bind(this, 'block')
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'p2p/historicalBlock',
|
||||||
|
scope: this,
|
||||||
|
subscribe: this.subscribe.bind(this, 'historicalBlock'),
|
||||||
|
unsubscribe: this.unsubscribe.bind(this, 'block')
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'p2p/header',
|
||||||
|
scope: this,
|
||||||
|
subscribe: this.subscribe.bind(this, 'header'),
|
||||||
|
unsubscribe: this.unsubscribe.bind(this, 'header')
|
||||||
|
}
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
P2P.prototype.subscribe = function(name, emitter) {
|
P2P.prototype.subscribe = function(name, emitter) {
|
||||||
this.subscriptions[name].push(emitter);
|
this.subscriptions[name].push(emitter);
|
||||||
log.info(emitter.remoteAddress, 'subscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length);
|
log.info(emitter.remoteAddress, 'subscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length);
|
||||||
@ -112,226 +147,146 @@ P2P.prototype._getBestHeight = function(peer) {
|
|||||||
this._peerHeights.push(peer.bestHeight);
|
this._peerHeights.push(peer.bestHeight);
|
||||||
|
|
||||||
if (this._peerHeights.length >= this._minPeers) {
|
if (this._peerHeights.length >= this._minPeers) {
|
||||||
// spread operator '...', fancy way of converting an array arg to discrete args
|
|
||||||
return Math.max(...this._peerHeights);
|
return Math.max(...this._peerHeights);
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._setFilterScalar = function(peer, scalar) {
|
P2P.prototype._onPeerReady = function(peer, addr) {
|
||||||
|
|
||||||
if (!this._filters[peer]) {
|
log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' +
|
||||||
return;
|
peer.network.alias + ', version: ' + peer.version + ', subversion: ' +
|
||||||
|
peer.subversion + ', status: ' + peer.status + ', port: ' +
|
||||||
|
peer.port + ', best height: ' + peer.bestHeight);
|
||||||
|
|
||||||
|
self._addPeer(peer);
|
||||||
|
var bestHeight = self._getBestHeight(peer);
|
||||||
|
|
||||||
|
if (bestHeight >= 0) {
|
||||||
|
self.emit('bestHeight', bestHeight);
|
||||||
}
|
}
|
||||||
|
|
||||||
this._filters[peer] = scalar;
|
};
|
||||||
|
|
||||||
|
P2P.prototype._onPeerDisconnect = function(peer, addr) {
|
||||||
|
|
||||||
|
self._removePeer(peer);
|
||||||
|
log.info('Disconnected from peer: ' + addr.ip.v4);
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
P2P.prototype._onPeerInventory = function(peer, message) {
|
||||||
|
|
||||||
|
var newDataNeeded = [];
|
||||||
|
|
||||||
|
self._setFilterScalar(peer, message.inventory.length);
|
||||||
|
|
||||||
|
message.inventory.forEach(function(inv) {
|
||||||
|
|
||||||
|
if (!self._inv.get(inv.hash)) {
|
||||||
|
|
||||||
|
self._inv.set(inv.hash, true);
|
||||||
|
|
||||||
|
newDataNeeded.push(inv);
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (newDataNeeded.length > 0) {
|
||||||
|
peer.sendMessage(self.messages.GetData(newDataNeeded));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
P2P.prototype._onPeerTx = function(peer, message) {
|
||||||
|
};
|
||||||
|
|
||||||
|
p2p.prototype._onPeerBlock = function(peer, message) {
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._setupListeners = function() {
|
P2P.prototype._setupListeners = function() {
|
||||||
|
|
||||||
var self = this;
|
var self = this;
|
||||||
|
self._pool.on('peerready', self._onPeerReady.bind(self));
|
||||||
self._pool.on('peerready', function(peer, addr) {
|
self._pool.on('peerdisconnect', self._onPeerDisconnect.bind(self));
|
||||||
|
self._pool.on('peerinv', self._onPeerInventory.bind(self));
|
||||||
log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' +
|
self._pool.on('peertx', self._onPeerTx.bind(self));
|
||||||
peer.network.alias + ', version: ' + peer.version + ', subversion: ' +
|
self._pool.on('peerblock', self._onPeerBlock.bind(self));
|
||||||
peer.subversion + ', status: ' + peer.status + ', port: ' +
|
|
||||||
peer.port + ', best height: ' + peer.bestHeight);
|
|
||||||
|
|
||||||
self._addPeer(peer);
|
|
||||||
var bestHeight = self._getBestHeight(peer);
|
|
||||||
|
|
||||||
if (bestHeight >= 0) {
|
|
||||||
self.emit('bestHeight', bestHeight);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
self._pool.on('peerdisconnect', function(peer, addr) {
|
|
||||||
|
|
||||||
self._removePeer(peer);
|
|
||||||
log.info('Disconnected from peer: ' + addr.ip.v4);
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
self._pool.on('peerinv', function(peer, message) {
|
|
||||||
|
|
||||||
var newDataNeeded = [];
|
|
||||||
|
|
||||||
// The bitcoin p2p network can send unsolicited inventory messages for
|
|
||||||
// entities such as transactions and blocks. This behavior is configurable
|
|
||||||
// via the 'relay' field in the version message that we send when making
|
|
||||||
// outbound connections. The relay option is important to keep enabled so
|
|
||||||
// that we can reduce the chatter on the network, overall.
|
|
||||||
// The problem is that when we do ask for inventory synchronuously,
|
|
||||||
// such as when our indexes are syncing or catching up, we can't tell the
|
|
||||||
// the difference between unsolicited new messages and actual responses.
|
|
||||||
// There does not seem to be identifying information to link requests and
|
|
||||||
// responses.
|
|
||||||
|
|
||||||
// What we will do is set a filter scalar on the peer being queried. We
|
|
||||||
// will have to assume that the next message from that peer will be our
|
|
||||||
// response. The length of the inventory vector will then become our filter
|
|
||||||
// scalar. As we respond to the inbound inventory (getdata), we will decrement
|
|
||||||
// this filter scalar. This is how we will match request anf response.
|
|
||||||
self._setFilterScalar(peer, message.inventory.length);
|
|
||||||
|
|
||||||
message.inventory.forEach(function(inv) {
|
|
||||||
|
|
||||||
if (!self._inv.get(inv.hash)) {
|
|
||||||
|
|
||||||
self._inv.set(inv.hash, true);
|
|
||||||
|
|
||||||
newDataNeeded.push(inv);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
if (newDataNeeded.length > 0) {
|
|
||||||
peer.sendMessage(self.messages.GetData(newDataNeeded));
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
self._pool.on('peertx', self._filterTx.bind(self));
|
|
||||||
|
|
||||||
self._pool.on('peerblock', self._broadcastBlock.bind(self));
|
|
||||||
|
|
||||||
self.node.on('ready', function() {
|
self.node.on('ready', function() {
|
||||||
self._pool.connect();
|
self._pool.connect();
|
||||||
});
|
});
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype._filterTx = function(peer, message) {
|
P2P.prototype._broadcast = function(subscribers, name, entity) {
|
||||||
var self = this;
|
for (var i = 0; i < subscribers.length; i++) {
|
||||||
|
subscribers[i].emit(name, entity);
|
||||||
// if this tx matches any of this peer's filters, then emit the tx internally only
|
|
||||||
// and not to the external bus
|
|
||||||
var filterExists = self._filters[peer];
|
|
||||||
|
|
||||||
if (!filterExists) {
|
|
||||||
self._broadcastTx(peer, message);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.emit('tx', message.transaction);
|
|
||||||
|
|
||||||
if (--self._filters[peer] === 0) {
|
|
||||||
self._filters[peer] = false;
|
|
||||||
self.emit('end');
|
|
||||||
}
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
P2P.prototype._broadcastTx = function(peer, message) {
|
|
||||||
for (var i = 0; i < this.subscriptions.transaction.length; i++) {
|
|
||||||
this.subscriptions.transaction[i].emit('p2p/transaction', message.transaction);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
P2P.prototype._broadcastBlock = function(peer, message) {
|
|
||||||
for (var i = 0; i < this.subscriptions.block.length; i++) {
|
|
||||||
this.subscriptions.block[i].emit('p2p/block', message.block);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getAPIMethods = function() {
|
P2P.prototype.getAPIMethods = function() {
|
||||||
var methods = [
|
var methods = [
|
||||||
['createHeaderStream', this, this.getHeaders, 2],
|
['getHeaders', this, this.getHeaders, 2],
|
||||||
['getMempool', this, this.getMempool, 1],
|
['getMempool', this, this.getMempool, 1],
|
||||||
['getBlocks', this, this.getBlocks, 2]
|
['getBlocks', this, this.getBlocks, 2]
|
||||||
];
|
];
|
||||||
return methods;
|
return methods;
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getPublishEvents = function() {
|
P2P.prototype._getResourceFilter = function(filter, resource) {
|
||||||
return [
|
|
||||||
{
|
// this function knows about mempool, block and header filters
|
||||||
name: 'p2p/transaction',
|
// mempool filters are considered after the tx is delivered to us
|
||||||
scope: this,
|
// because we can't match a tx to the query params.
|
||||||
subscribe: this.subscribe.bind(this, 'transaction'),
|
if (resource === 'mempool') {
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'transaction')
|
if (resource instanceof LRU) {
|
||||||
},
|
return filter;
|
||||||
{
|
|
||||||
name: 'p2p/block',
|
|
||||||
scope: this,
|
|
||||||
subscribe: this.subscribe.bind(this, 'block'),
|
|
||||||
unsubscribe: this.unsubscribe.bind(this, 'block')
|
|
||||||
}
|
}
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
P2P.prototype.createHeaderStream = function(startBlockHash, endBlockHash) {
|
|
||||||
|
|
||||||
var self = this;
|
|
||||||
if (!endBlockHash) {
|
|
||||||
endBlockHash = startBlockHash;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var message = self.messages.GetHeaders(startBlockHash, endBlockHash);
|
|
||||||
var peer = self._getPeer();
|
|
||||||
|
|
||||||
peer.on('peerheaders', function(peer, message) {
|
|
||||||
self.emit('headers', message.headers);
|
|
||||||
});
|
|
||||||
|
|
||||||
peer.sendMessage(message);
|
|
||||||
return self;
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getMempool = function(callback) {
|
P2P.prototype._getResourceMessage = function(filter, resource) {
|
||||||
|
|
||||||
var self = this;
|
// filter should be a list of block hashes representing the
|
||||||
var peer = self._getPeer();
|
// resource that are -not- needed, all headers outside this list will be
|
||||||
|
// broadcast on the p2p/headers bus, meaning all subscribers to
|
||||||
|
// this event will get the results whether they asked for them
|
||||||
|
// or not.
|
||||||
|
|
||||||
if (!peer) {
|
// _getPeer can throw
|
||||||
return callback(new Error('Could not get a peer to retrieve mempool.'));
|
var peer = this._getPeer();
|
||||||
}
|
|
||||||
|
|
||||||
self._filters[peer] = true;
|
return this._getResourceFilter(filter, resource);
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
P2P.prototype.getHeaders = function(filter) {
|
||||||
|
|
||||||
|
var headerFilter = this._getResourceMessage(filter, 'headers');
|
||||||
|
peer.sendMessage(this.messages.GetHeader(headerFilter));
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
P2P.prototype.getMempool = function(filter) {
|
||||||
|
|
||||||
|
// mempools can grow quite large, especially if subscribers are liberally accepting
|
||||||
|
// all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can
|
||||||
|
// grow quite large over time. Care should be taken to limit the size of this filter.
|
||||||
|
// Even still, when a tx is broadcasted, the reference to it is dropped from the filter.
|
||||||
|
this._mempoolFilter = this._getResourceMessage(filter, 'mempool');
|
||||||
|
|
||||||
peer.sendMessage(self.messages.MemPool());
|
peer.sendMessage(self.messages.MemPool());
|
||||||
var mempool = [];
|
|
||||||
|
|
||||||
self.on('tx', function(tx) {
|
|
||||||
mempool.push(tx);
|
|
||||||
});
|
|
||||||
|
|
||||||
self.on('end', function() {
|
|
||||||
callback(null, mempool);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
P2P.prototype.getBlocks = function(startBlockHash, endBlockHash) {
|
P2P.prototype.getBlocks = function(filter) {
|
||||||
|
|
||||||
var self = this;
|
// it is on the caller to work out what block hashes are needed from the network,
|
||||||
if (!endBlockHash) {
|
// so a list of block hashes should be given. If this is an initial sync from the
|
||||||
endBlockHash = startBlockHash;
|
// genesis block, then this list will be quite large (32 bytes * current block height).
|
||||||
}
|
var blockFilter = this._getResourceMessage(filter, 'blocks');
|
||||||
|
peer.sendMessage(this.messages.GetBlock(blockFilter));
|
||||||
var peer = self._getPeer();
|
|
||||||
|
|
||||||
if (!peer) {
|
|
||||||
return callback(new Error('Could not get a peer to retrieve blocks.'));
|
|
||||||
}
|
|
||||||
|
|
||||||
self._filters[peer] = true;
|
|
||||||
|
|
||||||
var message = this.messages.GetBlocks({ starts: [startBlockHash] });
|
|
||||||
peer.sendMessage(message);
|
|
||||||
var blocks = [];
|
|
||||||
|
|
||||||
self.on('block', function(block) {
|
|
||||||
blocks.push(block);
|
|
||||||
});
|
|
||||||
|
|
||||||
self.on('end', function() {
|
|
||||||
callback(null, blocks);
|
|
||||||
});
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user