This commit is contained in:
Chris Kleeschulte 2017-05-30 15:42:01 -04:00
parent 87df236e35
commit 58223948ac
2 changed files with 127 additions and 10 deletions

View File

@ -27,12 +27,15 @@ var P2P = function(options) {
BaseService.call(this, options);
this.options = options;
this._maxPeers = this.options.maxPeers || 60;
this._minPeers = this.options.minPeers || 1;
this._peers = this.options.peers;
this.subscriptions = {};
this.subscriptions.block = [];
this.subscriptions.transaction = [];
this.messages = new p2p.Messages({ network: this.node.network });
this._peerHeights = [];
this._peers = [];
this._peerIndex = 0;
};
util.inherits(P2P, BaseService);
@ -86,6 +89,31 @@ P2P.prototype._initPool = function() {
this._pool = new p2p.Pool(opts);
};
P2P.prototype._addPeer = function(peer) {
this._peers.push(peer);
};
P2P.prototype._removePeer = function(peer) {
this._peer.splice(peer, 1);
};
P2P.prototype._getPeer = function() {
// TODO: this logic could get complicated depending on the state of the pool
if (this._peers.length === 0) {
return;
}
var index = this._peerIndex++ % this._peers.length;
return this._peers[index];
};
P2P.prototype._getBestHeight = function(peer) {
this._peerHeights.push(peer.bestHeight);
if (this._peerHeights >= this._minPeers) {
return Math.max(this._bestHeights);
}
};
P2P.prototype._setupListeners = function() {
var self = this;
@ -95,11 +123,17 @@ P2P.prototype._setupListeners = function() {
peer.network.alias + ', version: ' + peer.version + ', subversion: ' +
peer.subversion + ', status: ' + peer.status + ', port: ' +
peer.port + ', best height: ' + peer.bestHeight);
peer.sendMessage(self.messages.MemPool());
self._addPeer(peer);
var bestHeight = self._getBestHeight(peer);
if (bestHeight >= 0) {
self.emit('bestHeight', bestHeight);
}
});
self._pool.on('peerdisconnect', function(peer, addr) {
self._removePeer(peer);
log.info('Disconnected from peer: ' + addr.ip.v4);
});
@ -115,7 +149,6 @@ P2P.prototype._setupListeners = function() {
self._inv.set(inv.hash, true);
//self._generateRetrievalMessage()
newDataNeeded.push(inv);
}
@ -132,9 +165,7 @@ P2P.prototype._setupListeners = function() {
self._pool.on('peerblock', self._broadcastBlock.bind(self));
self.node.on('ready', function() {
setTimeout(function() {
self._pool.connect();
}, 1000);
self._pool.connect();
});
};
@ -152,7 +183,12 @@ P2P.prototype._broadcastBlock = function(peer, message) {
};
P2P.prototype.getAPIMethods = function() {
return [];
var methods = [
['createHeaderStream', this, this.getHeaders, 2],
['createMempoolStream', this, this.getMempool, 0],
['createBlockStream', this, this.getBlocks, 2]
];
return methods;
};
P2P.prototype.getPublishEvents = function() {
@ -168,8 +204,54 @@ P2P.prototype.getPublishEvents = function() {
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
},
}
];
};
P2P.prototype.createHeaderStream = function(startBlockHash, endBlockHash) {
var self = this;
if (!endBlockHash) {
endBlockHash = startBlockHash;
}
var message = self.messages.GetHeaders(startBlockHash, endBlockHash);
var peer = self._getPeer();
peer.on('peerheaders', function(peer, message) {
self.emit('headers', message.headers);
});
peer.sendMessage(message);
return self;
};
P2P.prototype.createMempoolStream = function() {
var self = this;
var peer = self._getPeer();
peer.sendMessage(self.messages.MemPool());
peer.on('inv', function(message) {
});
peer.on('tx', function(message) {
});
};
P2P.prototype.createBlockStream = function(startBlockHash, endBlockHash) {
if (!endBlockHash) {
endBlockHash = startBlockHash;
}
var message = this.messages.GetBlocks(startBlockHash, endBlockHash);
var peer = this._getPeer();
peer.sendMessage(message);
};
module.exports = P2P;

View File

@ -26,16 +26,51 @@ TestBusService.prototype.start = function(callback) {
self.bus = self.node.openBus({ remoteAddress: 'localhost' });
self.bus.on('p2p/transaction', function(tx) {
self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]);
var self = this;
self._cache.transaction.push(tx);
if (self._ready) {
for(var i = 0; i < self._cache.transaction.length; i++) {
var transaction = self._cache.transaction.unshift();
self.pubSocket.send([ 'transaction', new Buffer(transaction.uncheckedSerialize(), 'hex') ]);
}
return;
}
});
self.bus.on('p2p/block', function(block) {
self.pubSocket.send([ 'block', block.toBuffer() ]);
var self = this;
self._cache.block.push(block);
if (self._ready) {
for(var i = 0; i < self._cache.block.length; i++) {
var blk = self._cache.block.unshift();
self.pubSocket.send([ 'block', blk.toBuffer() ]);
}
return;
}
});
self.bus.on('p2p/headers', function(headers) {
var self = this;
self._cache.headers.push(headers);
if (self._ready) {
for(var i = 0; i < self._cache.headers.length; i++) {
var hdrs = self._cache.headers.unshift();
self.pubSocket.send([ 'headers', hdrs ]);
}
return;
}
});
self.bus.subscribe('p2p/transaction');
self.bus.subscribe('p2p/block');
self.node.on('ready', function() {
//we could be getting events right away, so until our subscribers get connected, we need to cache
self._cache = { transaction: [], block: [], headers: [] };
setTimeout(function() {
self._ready = true;
}, 2000);
});
callback();
};