listen on server socket for leech peers.

This commit is contained in:
Christopher Jeffrey 2016-02-08 18:46:09 -08:00
parent 72b7adc800
commit 6cab97b265
2 changed files with 197 additions and 77 deletions

View File

@ -18,17 +18,24 @@ var network = bcoin.protocol.network;
* Peer
*/
function Peer(pool, createConnection, options) {
function Peer(pool, options) {
var self = this;
if (!(this instanceof Peer))
return new Peer(pool, createConnection, options);
return new Peer(pool, options);
EventEmitter.call(this);
this.options = options || {};
if (!options)
options = {};
this.options = options;
this.pool = pool;
this.socket = null;
this.host = null;
this.port = 0;
this._createSocket = this.options.createSocket;
this.priority = this.options.priority;
this.parser = new bcoin.protocol.parser();
this.framer = new bcoin.protocol.framer();
this.chain = this.pool.chain;
@ -39,9 +46,6 @@ function Peer(pool, createConnection, options) {
this.connected = false;
this.ts = this.options.ts || 0;
this.host = null;
this.port = 0;
this.challenge = null;
this.lastPong = 0;
@ -49,7 +53,17 @@ function Peer(pool, createConnection, options) {
this.orphans = 0;
this.orphanTime = 0;
this.socket = createConnection.call(pool, this, options);
if (options.socket) {
this.socket = options.socket;
this.host = this.socket.remoteAddress;
this.port = this.socket.remotePort;
assert(this.host);
assert(this.port != null);
} else if (options.seed) {
options.seed = utils.parseHost(options.seed);
this.socket = this.createSocket(options.seed.port, options.seed.host);
}
if (!this.socket)
throw new Error('No socket');
@ -168,6 +182,36 @@ Peer.prototype._init = function init() {
}));
};
Peer.prototype.createSocket = function createSocket(port, host) {
var net, socket;
assert(port != null);
assert(host);
this.host = host;
this.port = port;
if (this._createSocket) {
socket = this._createSocket(port, host);
} else {
if (!bcoin.net)
throw new Error('Please include a `createSocket` callback.');
socket = bcoin.net.connect(port, host);
}
utils.debug(
'Connecting to %s:%d (priority=%s)',
host, port, this.priority);
socket.on('connect', function() {
utils.debug(
'Connected to %s:%d (priority=%s)',
host, port, this.priority);
});
return socket;
};
Peer.prototype.broadcast = function broadcast(items) {
var self = this;
var result;

View File

@ -54,11 +54,10 @@ function Pool(options) {
this.originalSeeds = (options.seeds || network.seeds).map(utils.parseHost);
this.setSeeds([]);
this.server = null;
this.destroyed = false;
this.size = options.size || 32;
this._createSocket = options.createSocket;
if (!this.options.fullNode) {
if (this.options.headers == null)
this.options.headers = true;
@ -99,9 +98,11 @@ function Pool(options) {
this.peers = {
// Peers that are loading blocks themselves
block: [],
regular: [],
// Peers that are still connecting
pending: [],
// Peers that connected to us
leeches: [],
// Peers that are loading block ids
load: null,
// All peers
@ -234,11 +235,46 @@ Pool.prototype._init = function _init() {
this.emit('full');
utils.debug('Chain is fully synced (height=%d).', this.chain.height());
}
this.startServer();
};
Pool.prototype.startServer = function startServer() {
bcoin.net.createServer(function(socket) {
var self = this;
if (!bcoin.net)
return;
if (!this.options.listen)
return;
assert(!this.server);
this.server = new bcoin.net.Server();
this.server.on('connection', function(socket) {
self._addLeech(socket);
})
this.server.on('listening', function() {
var data = self.server.address();
utils.debug(
'Bitcoin server listening on %s (port=%d)',
data.address, data.port);
});
this.server.listen(network.port, '0.0.0.0');
};
Pool.prototype.stopServer = function stopServer() {
if (!bcoin.net)
return;
if (!this.server)
return;
this.server.close();
delete this.server;
};
Pool.prototype._startTimer = function _startTimer() {
@ -300,38 +336,6 @@ Pool.prototype._stopInterval = function _stopInterval() {
delete this._interval;
};
Pool.prototype.createConnection = function createConnection(peer, options) {
var addr, net, socket;
addr = this.getSeed(options.priority);
assert(addr);
assert(addr.host);
peer.host = addr.host;
peer.port = addr.port;
if (this._createSocket) {
socket = this._createSocket(addr.port, addr.host);
} else {
if (!bcoin.net)
throw new Error('Please include a `createSocket` callback.');
socket = bcoin.net.connect(addr.port, addr.host);
}
utils.debug(
'Connecting to %s:%d (priority=%s)',
addr.host, addr.port, options.priority);
socket.on('connect', function() {
utils.debug(
'Connected to %s:%d (priority=%s)',
addr.host, addr.port, options.priority);
});
return socket;
};
Pool.prototype._addLoader = function _addLoader() {
var self = this;
var peer;
@ -342,7 +346,12 @@ Pool.prototype._addLoader = function _addLoader() {
if (this.peers.load != null)
return;
peer = this._createPeer(true);
peer = this._createPeer({
seed: this.getSeed(true),
priority: true
});
assert(peer);
utils.debug('Added loader peer: %s', peer.host);
@ -464,8 +473,8 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
blockPeer = peer;
// if (this.options.multiplePeers) {
// if (this.peers.block.length) {
// blockPeer = this.peers.block[i % (this.peers.block.length + 1)];
// if (this.peers.regular.length) {
// blockPeer = this.peers.regular[i % (this.peers.regular.length + 1)];
// if (!blockPeer)
// blockPeer = this.peers.load;
// }
@ -720,19 +729,20 @@ Pool.prototype.loadMempool = function loadMempool() {
if (this.peers.load)
this.peers.load.loadMempool();
this.peers.block.forEach(function(peer) {
this.peers.regular.forEach(function(peer) {
peer.loadMempool();
});
};
Pool.prototype._createPeer = function _createPeer(priority, socket) {
Pool.prototype._createPeer = function _createPeer(options) {
var self = this;
var peer = new bcoin.peer(this, this.createConnection, {
startHeight: this.options.startHeight,
var peer = new bcoin.peer(this, {
seed: options.seed,
createSocket: this.options.createSocket,
relay: this.options.relay,
priority: priority,
socket: socket
priority: options.priority,
socket: options.socket
});
peer.on('error', function(err) {
@ -808,30 +818,90 @@ Pool.prototype._handleTX = function _handleTX(tx, peer) {
this.emit('watched', tx, peer);
};
Pool.prototype._addPeer = function _addPeer(socket) {
var self = this;
Pool.prototype._addLeech = function _addLeech(socket) {
var peer;
if (!socket) {
if (this.destroyed)
if (this.destroyed)
return socket.destroy();
peer = this._createPeer({
socket: socket,
priority: false
});
assert(peer);
this.peers.leeches.push(peer);
this.peers.all.push(peer);
peer.once('close', function() {
self._removePeer(peer);
});
peer.once('ack', function() {
var i;
if (self.destroyed)
return;
if (this.peers.block.length + this.peers.pending.length >= this.size)
return;
peer.updateWatch();
if (!this.getSeed()) {
setTimeout(this._addPeer.bind(this), 5000);
return;
}
self.inv.list.forEach(function(entry) {
var result = peer.broadcast(entry.msg);
if (!result)
return;
peer = this._createPeer(false);
} else {
if (this.destroyed)
return socket.destroy();
result[0].once('request', function() {
entry.e.emit('ack', peer);
});
peer = this._createPeer(false, socket);
result[0].once('reject', function(payload) {
entry.e.emit('reject', payload, peer);
});
});
});
peer.on('merkleblock', function(block) {
self._handleBlock(block, peer);
});
peer.on('block', function(block) {
self._handleBlock(block, peer);
});
peer.on('blocks', function(hashes) {
self._handleInv(hashes, peer);
});
utils.nextTick(function() {
self.emit('leech', peer);
});
return peer;
};
Pool.prototype._addPeer = function _addPeer() {
var self = this;
var peer, seed;
if (this.destroyed)
return;
if (this.peers.regular.length + this.peers.pending.length >= this.size)
return;
seed = this.getSeed(false);
if (!seed) {
setTimeout(this._addPeer.bind(this), 5000);
return;
}
peer = this._createPeer({
seed: seed,
priority: false
});
this.peers.pending.push(peer);
this.peers.all.push(peer);
@ -851,7 +921,7 @@ Pool.prototype._addPeer = function _addPeer(socket) {
i = self.peers.pending.indexOf(peer);
if (i !== -1) {
self.peers.pending.splice(i, 1);
self.peers.block.push(peer);
self.peers.regular.push(peer);
}
peer.updateWatch();
@ -914,7 +984,7 @@ Pool.prototype._addTX = function(hash, state) {
};
Pool.prototype.bestPeer = function bestPeer() {
return this.peers.block.reduce(function(best, peer) {
return this.peers.regular.reduce(function(best, peer) {
if (!peer.version || !peer.socket)
return;
@ -930,9 +1000,13 @@ Pool.prototype._removePeer = function _removePeer(peer) {
if (i !== -1)
this.peers.pending.splice(i, 1);
i = this.peers.block.indexOf(peer);
i = this.peers.regular.indexOf(peer);
if (i !== -1)
this.peers.block.splice(i, 1);
this.peers.regular.splice(i, 1);
i = this.peers.leeches.indexOf(peer);
if (i !== -1)
this.peers.leeches.splice(i, 1);
i = this.peers.all.indexOf(peer);
if (i !== -1)
@ -978,8 +1052,8 @@ Pool.prototype.watch = function watch(id) {
if (self.peers.load)
self.peers.load.updateWatch();
for (i = 0; i < self.peers.block.length; i++)
self.peers.block[i].updateWatch();
for (i = 0; i < self.peers.regular.length; i++)
self.peers.regular[i].updateWatch();
});
};
@ -1015,8 +1089,8 @@ Pool.prototype.unwatch = function unwatch(id) {
if (self.peers.load)
self.peers.load.updateWatch();
for (i = 0; i < self.peers.block.length; i++)
self.peers.block[i].updateWatch();
for (i = 0; i < self.peers.regular.length; i++)
self.peers.regular[i].updateWatch();
});
};
@ -1532,7 +1606,7 @@ Pool.prototype.broadcast = function broadcast(msg) {
this.inv.list.push(entry);
this.peers.block.forEach(function(peer) {
this.peers.regular.forEach(function(peer) {
var result = peer.broadcast(msg);
if (!result) return;
result[0].once('request', function() {
@ -1565,9 +1639,11 @@ Pool.prototype.destroy = function destroy() {
peer.destroy();
});
this.peers.block.slice().forEach(function(peer) {
this.peers.regular.slice().forEach(function(peer) {
peer.destroy();
});
this.stopServer();
};
Pool.prototype.getPeer = function getPeer(addr) {