net: handle server differently.

This commit is contained in:
Christopher Jeffrey 2017-01-14 07:54:07 -08:00
parent e8cc724488
commit f0e7aa9d82
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 149 additions and 66 deletions

View File

@ -35,12 +35,7 @@ process.on('uncaughtException', function(err) {
co.spawn(function *() {
yield node.open();
if (options.listen)
yield node.listen();
yield node.connect();
node.startSync();
}).catch(function(err) {
throw err;

View File

@ -294,6 +294,8 @@ Pool.prototype._initOptions = function _initOptions() {
Pool.prototype._init = function _init() {
var self = this;
this._initServer();
this.chain.on('block', function(block, entry) {
self.emit('block', block, entry);
});
@ -349,6 +351,39 @@ Pool.prototype._init = function _init() {
}
};
/**
* Initialize server.
* @private
*/
Pool.prototype._initServer = function _initServer() {
var self = this;
assert(!this.server);
if (!this.createServer)
return;
this.server = this.createServer();
this.server.on('error', function(err) {
self.emit('error', err);
});
this.server.on('connection', function(socket) {
self.handleSocket(socket);
self.emit('connection', socket);
});
this.server.on('listening', function() {
var data = self.server.address();
self.logger.info(
'Pool server listening on %s (port=%d).',
data.address, data.port);
self.emit('listening', data);
});
};
/**
* Open the pool, wait for the chain to load.
* @alias Pool#open
@ -379,35 +414,8 @@ Pool.prototype._open = co(function* _open() {
*/
Pool.prototype._close = co(function* close() {
var i, item, hashes, hash;
this.stopSync();
hashes = this.invMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
item = this.invMap.get(hash);
item.resolve();
}
this.peers.destroy();
this.requestMap.reset();
yield this.disconnect();
this.hosts.reset();
if (this.pendingWatch != null) {
clearTimeout(this.pendingWatch);
this.pendingWatch = null;
}
if (this.pendingRefill != null) {
clearTimeout(this.pendingRefill);
this.pendingRefill = null;
}
yield this.unlisten();
});
/**
@ -437,6 +445,8 @@ Pool.prototype._connect = co(function* connect() {
if (this.connected)
return;
yield this.listen();
if (this.address.isNull()) {
try {
ip = yield this.getIP();
@ -462,32 +472,87 @@ Pool.prototype._connect = co(function* connect() {
});
/**
* Start listening on a server socket.
* Disconnect from the network.
* @returns {Promise}
*/
Pool.prototype.listen = function listen() {
var self = this;
Pool.prototype.disconnect = co(function* disconnect() {
var unlock = yield this.locker.lock();
try {
return yield this._disconnect();
} finally {
unlock();
}
});
if (this.server)
return Promise.resolve();
/**
* Disconnect from the network.
* @returns {Promise}
*/
Pool.prototype._disconnect = co(function* disconnect() {
var i, item, hashes, hash;
assert(this.loaded, 'Pool is not loaded.');
if (!this.connected)
return;
hashes = this.invMap.keys();
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
item = this.invMap.get(hash);
item.resolve();
}
this.peers.destroy();
this.requestMap.reset();
if (this.pendingWatch != null) {
clearTimeout(this.pendingWatch);
this.pendingWatch = null;
}
if (this.pendingRefill != null) {
clearTimeout(this.pendingRefill);
this.pendingRefill = null;
}
yield this.unlisten();
this.syncing = false;
this.connected = false;
});
/**
* Start listening on a server socket.
* @private
* @returns {Promise}
*/
Pool.prototype.listen = co(function* listen() {
if (!this.createServer)
return;
this.server = this.createServer();
assert(this.server);
assert(!this.connected, 'Already listening.');
this.server.on('connection', function(socket) {
self.handleSocket(socket);
});
if (!this.options.listen)
return;
this.server.on('listening', function() {
var data = self.server.address();
self.logger.info(
'Pool server listening on %s (port=%d).',
data.address, data.port);
});
yield this._listen();
});
/**
* Listen on server socket.
* @private
* @returns {Promise}
*/
Pool.prototype._listen = function _listen() {
var self = this;
return new Promise(function(resolve, reject) {
self.server.listen(self.address.port, '0.0.0.0', co.wrap(resolve, reject));
});
@ -495,21 +560,33 @@ Pool.prototype.listen = function listen() {
/**
* Stop listening on server socket.
* @private
* @returns {Promise}
*/
Pool.prototype.unlisten = function unlisten() {
Pool.prototype.unlisten = co(function* unlisten() {
if (!this.createServer)
return;
assert(this.server);
assert(this.connected, 'Not listening.');
if (!this.options.listen)
return;
yield this._unlisten();
});
/**
* Unlisten on server socket.
* @private
* @returns {Promise}
*/
Pool.prototype._unlisten = function unlisten() {
var self = this;
if (util.isBrowser)
return Promise.resolve();
if (!this.server)
return Promise.resolve();
return new Promise(function(resolve, reject) {
self.server.close(co.wrap(resolve, reject));
self.server = null;
});
};

View File

@ -119,6 +119,7 @@ function FullNode(options) {
preferredSeed: this.options.preferredSeed,
ignoreDiscovery: this.options.ignoreDiscovery,
port: this.options.port,
listen: this.options.listen,
spv: false
});
@ -346,15 +347,6 @@ FullNode.prototype.sendTX = co(function* sendTX(tx) {
this.pool.announceTX(tx);
});
/**
* Listen on a server socket on
* the p2p network (accepts leech peers).
*/
FullNode.prototype.listen = function listen() {
return this.pool.listen();
};
/**
* Connect to the network.
* @returns {Promise}
@ -364,6 +356,15 @@ FullNode.prototype.connect = function connect() {
return this.pool.connect();
};
/**
* Disconnect from the network.
* @returns {Promise}
*/
FullNode.prototype.disconnect = function disconnect() {
return this.pool.disconnect();
};
/**
* Start the blockchain sync.
*/

View File

@ -74,6 +74,7 @@ function SPVNode(options) {
ignoreDiscovery: this.options.ignoreDiscovery,
headers: this.options.headers,
selfish: true,
listen: false,
spv: true
});
@ -314,6 +315,15 @@ SPVNode.prototype.connect = function connect() {
return this.pool.connect();
};
/**
* Disconnect from the network.
* @returns {Promise}
*/
SPVNode.prototype.disconnect = function disconnect() {
return this.pool.disconnect();
};
/**
* Start the blockchain sync.
*/