Added official haproxy support/configuration. Optimized/fixed banning.

This commit is contained in:
Matt 2014-04-15 16:37:04 -06:00
parent 42178dd5ad
commit fefa6986ea
4 changed files with 176 additions and 104 deletions

View File

@ -160,8 +160,15 @@ var pool = Stratum.createPool({
due to mining apps using incorrect max diffs and this pool using correct max diffs. */
"shareVariancePercent": 10,
/* If a worker is submitting a good deal of invalid shares we can temporarily ban them to
reduce system/network load. Also useful to fight against flooding attacks. */
/* Enable for client IP addresses to be detected when using a load balancer with TCP proxy
protocol enabled, such as HAProxy with 'send-proxy' param:
http://haproxy.1wt.eu/download/1.5/doc/configuration.txt */
"tcpProxyProtocol": false,
/* If a worker is submitting a high threshold of invalid shares we can temporarily ban their IP
to reduce system/network load. Also useful to fight against flooding attacks. The worker's
If running behind something like HAProxy be sure to enable the TCP Proxy Protocol config,
otherwise you'll end up banning your own IP address (and therefore all workers). */
"banning": {
"enabled": true,
"time": 600, //How many seconds to ban worker for
@ -202,13 +209,13 @@ var pool = Stratum.createPool({
- https://en.bitcoin.it/wiki/Running_bitcoind */
"daemons": [
{ //Main daemon instance
"host": "localhost",
"host": "127.0.0.1",
"port": 19332,
"user": "litecoinrpc",
"password": "testnet"
},
{ //Backup daemon instance
"host": "localhost",
"host": "127.0.0.1",
"port": 19344,
"user": "litecoinrpc",
"password": "testnet"
@ -221,7 +228,7 @@ var pool = Stratum.createPool({
intensive than blocknotify script). However its still under development (not yet working). */
"p2p": {
"enabled": false,
"host": "localhost",
"host": "127.0.0.1",
"port": 19333,
/* Magic value is different for main/testnet and for each coin. It is found in the daemon

View File

@ -47,7 +47,7 @@ function DaemonInterface(options){
function performHttpRequest(instance, jsonData, callback){
var options = {
hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host),
hostname: (typeof(instance.host) === 'undefined' ? '127.0.0.1' : instance.host),
port : instance.port,
method : 'POST',
auth : instance.user + ':' + instance.password,
@ -104,7 +104,7 @@ function DaemonInterface(options){
});
req.end(jsonData);
};
}

View File

@ -62,6 +62,7 @@ var pool = module.exports = function pool(options, authorizeFn){
SetupPeer();
StartStratumServer(function(){
OutputPoolInfo();
_this.emit('started');
});
});
});
@ -361,11 +362,21 @@ var pool = module.exports = function pool(options, authorizeFn){
return r.response.ismine;
});
options.coin.addressByte = util.getVersionByte(options.address);
callback(null, ownedInfo.length > 0 ? ownedInfo[0].response : results[0].response);
});
},
function(addressInfo, callback){
var examplePubKey = new Buffer([options.coin.addressByte, new Buffer('010966776006953D5567439E5E39F86A0D273BEE', 'hex')]);
var dHashed = util.sha256d(examplePubKey);
var binaryAddress =
callback(null, addressInfo);
},
function(addressInfo, callback){
_this.daemon.cmd('getdifficulty', [], function(results){
@ -520,7 +531,7 @@ var pool = module.exports = function pool(options, authorizeFn){
function StartStratumServer(finishedCallback){
_this.stratumServer = new stratum.Server(options.ports, options.connectionTimeout, options.jobRebroadcastTimeout, options.banning, authorizeFn);
_this.stratumServer = new stratum.Server(options, authorizeFn);
_this.stratumServer.on('started', function(){
options.initStats.stratumPorts = Object.keys(options.ports);
@ -533,7 +544,6 @@ var pool = module.exports = function pool(options, authorizeFn){
}
client.on('difficultyChanged', function(diff){
_this.emit('difficultyUpdate', client.workerName, diff);
}).on('subscription', function(params, resultCallback){
@ -569,24 +579,38 @@ var pool = module.exports = function pool(options, authorizeFn){
resultCallback(result.error, result.result ? true : null);
}).on('malformedMessage', function (message) {
emitWarningLog((client.workerName || ('Unauthorized miner [' + client.remoteAddress + ']')) + " has sent us a malformed message: " + message);
emitWarningLog('Malformed message from ' + client.getLabel() + ': ' + message);
}).on('socketError', function(err) {
emitWarningLog(client.workerName + " has somehow had a socket error: " + JSON.stringify(err));
emitWarningLog('Socket error from ' + client.getLabel() + ': ' + JSON.stringify(err));
}).on('socketDisconnect', function(){
var clientTitle = client.workerName ? ("'" + client.workerName + '"') : '(unauthorized)';
emitLog("Client " + clientTitle + ' [' + client.remoteAddress + "] disconnected");
}).on('socketTimeout', function(reason){
emitWarningLog('Connected timed out for ' + client.getLabel() + ': ' + reason)
}).on('socketDisconnect', function() {
//emitLog('Socket disconnected from ' + client.getLabel());
}).on('kickedBannedIP', function(remainingBanTime){
emitLog('Rejected incoming connection from ' + client.remoteAddress + ' banned for ' + remainingBanTime + ' more seconds');
}).on('forgaveBannedIP', function(){
emitLog('Forgave banned IP ' + client.remoteAddress);
}).on('unknownStratumMethod', function(fullMessage) {
emitLog("Client '" + client.workerName + "' has sent us an unknown stratum method: " + fullMessage.method);
emitLog('Unknown stratum method from ' + client.getLabel() + ': ' + fullMessage.method);
}).on('socketFlooded', function(){
emitWarningLog('Detected socket flooding and purged buffer');
}).on('socketFlooded', function() {
emitWarningLog('Detected socket flooding from ' + client.getLabel());
}).on('ban', function(ipAddress){
_this.emit('banIP', ipAddress);
emitWarningLog('banned IP ' + ipAddress);
}).on('tcpProxyError', function(data) {
emitErrorLog('Client IP detection failed, tcpProxyProtocol is enabled yet did not receive proxy protocol message, instead got data: ' + data);
}).on('bootedBannedWorker', function(){
emitWarningLog('Booted worker ' + client.getLabel() + ' who was connected from an IP address that was just banned');
}).on('triggerBan', function(reason){
emitWarningLog('Banned triggered for ' + client.getLabel() + ': ' + reason);
_this.emit('banIP', client.remoteAddress, client.workerName);
});
});
}

View File

@ -28,7 +28,7 @@ var StratumClient = function(options){
//private members
this.socket = options.socket;
this.remoteAddress = options.remoteAddress;
this.remoteAddress = options.socket.remoteAddress;
var banning = options.banning;
@ -38,24 +38,26 @@ var StratumClient = function(options){
this.shares = {valid: 0, invalid: 0};
var considerBan = (!banning || !banning.enabled) ? function(){} : function(shareValid){
var considerBan = (!banning || !banning.enabled) ? function(){ return false } : function(shareValid){
if (shareValid === true) _this.shares.valid++;
else _this.shares.invalid++;
var totalShares = _this.shares.valid + _this.shares.invalid;
if (totalShares >= banning.checkThreshold){
var percentBad = (_this.shares.invalid / totalShares) * 100;
if (percentBad >= banning.invalidPercent){
_this.emit('ban', _this.remoteAddress);
_this.socket.end();
}
else //reset shares
if (percentBad < banning.invalidPercent) //reset shares
this.shares = {valid: 0, invalid: 0};
else {
_this.emit('triggerBan', _this.shares.invalid + ' out of the last ' + totalShares + ' shares were invalid');
_this.socket.destroy();
return true;
}
}
return false;
};
(function init(){
this.init = function init(){
setupSocket();
})();
};
function handleMessage(message){
switch(message.method){
@ -115,10 +117,9 @@ var StratumClient = function(options){
}
function handleAuthorize(message, replyToSocket){
_this.workerIP = options.socket.address().address;
_this.workerName = message.params[0].toLowerCase();
_this.workerName = message.params[0];
_this.workerPass = message.params[1];
options.authorizeFn(_this.workerIP, _this.workerName, _this.workerPass, function(result) {
options.authorizeFn(_this.remoteAddress, _this.workerName, _this.workerPass, function(result) {
_this.authorized = (!result.error && result.authorized);
if (replyToSocket) {
@ -131,7 +132,7 @@ var StratumClient = function(options){
// If the authorizer wants us to close the socket lets do it.
if (result.disconnect === true) {
options.socket.end();
options.socket.destroy();
}
});
}
@ -164,12 +165,13 @@ var StratumClient = function(options){
nonce : message.params[4]
},
function(error, result){
considerBan(result);
sendJson({
id : message.id,
result : result,
error : error
});
if (!considerBan(result)){
sendJson({
id: message.id,
result: result,
error: error
});
}
}
);
@ -187,19 +189,27 @@ var StratumClient = function(options){
var socket = options.socket;
var dataBuffer = '';
socket.setEncoding('utf8');
socket.once('data', function(d){
if (d.indexOf('PROXY') === 0){
_this.remoteAddress = d.split(' ')[2];
console.log('detected proxy source IP address of ' + _this.remoteAddress);
}
if (options.tcpProxyProtocol === true) {
socket.once('data', function (d) {
if (d.indexOf('PROXY') === 0) {
_this.remoteAddress = d.split(' ')[2];
}
else{
_this.emit('tcpProxyError', d);
}
_this.emit('checkBan');
});
}
else{
_this.emit('checkBan');
});
}
socket.on('data', function(d){
dataBuffer += d;
if (Buffer.byteLength(dataBuffer, 'utf8') > 1024){
dataBuffer = '';
_this.emit('socketFlooded');
socket.end();
socket.destroy();
return;
}
if (dataBuffer.slice(-1) === '\n'){
@ -210,9 +220,9 @@ var StratumClient = function(options){
try {
messageJson = JSON.parse(message);
} catch(e) {
if (d.indexOf('PROXY') !== 0){
if (options.tcpProxyProtocol !== true || d.indexOf('PROXY') !== 0){
_this.emit('malformedMessage', message);
socket.end();
socket.destroy();
}
return;
}
@ -224,17 +234,20 @@ var StratumClient = function(options){
dataBuffer = '';
}
});
socket.on('end', function() {
_this.emit('socketDisconnect')
socket.on('close', function() {
_this.emit('socketDisconnect');
});
socket.on('error', function(err){
if (err.code === 'ECONNRESET')
_this.emit('socketDisconnect');
else
if (err.code !== 'ECONNRESET')
_this.emit('socketError', err);
});
}
this.getLabel = function(){
return (_this.workerName || '(unauthorized)') + ' [' + _this.remoteAddress + ']';
};
this.enqueueNextDifficulty = function(requestedNewDifficulty) {
pendingDifficulty = requestedNewDifficulty;
return true;
@ -262,8 +275,10 @@ var StratumClient = function(options){
this.sendMiningJob = function(jobParams){
if (Date.now() - _this.lastActivity > options.socketTimeout){
_this.socket.end();
var lastActivityAgo = Date.now() - _this.lastActivity;
if (lastActivityAgo > options.connectionTimeout * 1000){
_this.emit('socketTimeout', 'last submitted a share was ' + (lastActivityAgo / 1000 | 0) + ' seconds ago');
_this.socket.destroy();
return;
}
@ -304,12 +319,13 @@ StratumClient.prototype.__proto__ = events.EventEmitter.prototype;
* - 'client.disconnected'(StratumClientInstance) - when a miner disconnects. Be aware that the socket cannot be used anymore.
* - 'started' - when the server is up and running
**/
var StratumServer = exports.Server = function StratumServer(ports, connectionTimeout, jobRebroadcastTimeout, banning, authorizeFn){
var StratumServer = exports.Server = function StratumServer(options, authorizeFn){
//private members
var socketTimeout = connectionTimeout * 1000;
var bannedMS = banning ? banning.time * 1000 : null;
//ports, connectionTimeout, jobRebroadcastTimeout, banning, haproxy, authorizeFn
var bannedMS = options.banning ? options.banning.time * 1000 : null;
var _this = this;
var stratumClients = {};
@ -317,27 +333,22 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
var rebroadcastTimeout;
var bannedIPs = {};
//Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak
var purgeOldBans = (!banning || !banning.enabled) ? null : setInterval(function(){
for (ip in bannedIPs){
var banTime = bannedIPs[ip];
if (Date.now() - banTime > banning.time)
delete bannedIPs[ip];
}
}, 1000 * banning.purgeInterval);
var checkBan = function(client){
if (banning && banning.enabled && client.remoteAddress in bannedIPs){
function checkBan(client){
if (options.banning && options.banning.enabled && client.remoteAddress in bannedIPs){
var bannedTime = bannedIPs[client.remoteAddress];
if ((Date.now() - bannedTime) < bannedMS){
client.socket.end();
return null;
var bannedTimeAgo = Date.now() - bannedTime;
var timeLeft = bannedMS - bannedTimeAgo;
if (timeLeft > 0){
client.socket.destroy();
client.emit('kickedBannedIP', timeLeft / 1000 | 0);
}
else {
delete bannedIPs[client.remoteAddress];
client.emit('forgaveBannedIP');
}
}
};
}
this.handleNewClient = function (socket){
@ -346,11 +357,11 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
var client = new StratumClient(
{
subscriptionId: subscriptionId,
socket: socket,
authorizeFn: authorizeFn,
banning: banning,
socketTimeout: socketTimeout,
remoteAddress: socket.remoteAddress
socket: socket,
banning: options.banning,
connectionTimeout: options.connectionTimeout,
tcpProxyProtocol: options.tcpProxyProtocol
}
);
@ -359,55 +370,85 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
client.on('socketDisconnect', function() {
_this.removeStratumClientBySubId(subscriptionId);
_this.emit('client.disconnected', client);
}).on('ban', function(ipAddress){
_this.banIP(ipAddress);
}).on('checkBan', function(){
checkBan(client);
});
}).on('triggerBan', function(){
_this.addBannedIP(client.remoteAddress);
}).init();
return subscriptionId;
};
function SetupBroadcasting(){
var broadcastJobs = function(jobParams){
for (var clientId in stratumClients) {
var client = stratumClients[clientId];
client.sendMiningJob(jobParams);
}
};
if (isNaN(options.jobRebroadcastTimeout) || options.jobRebroadcastTimeout <= 0){
_this.broadcastMiningJobs = broadcastJobs;
}
else{
/* Some miners will consider the pool dead if it doesn't receive a job for around a minute.
So every time we broadcast jobs, set a timeout to rebroadcast in X seconds unless cleared. */
_this.broadcastMiningJobs = function(jobParams){
broadcastJobs(jobParams);
clearTimeout(rebroadcastTimeout);
rebroadcastTimeout = setTimeout(function(){
var resendParams = jobParams;
resendParams[8] = false;
_this.broadcastMiningJobs(resendParams);
}, options.jobRebroadcastTimeout * 1000);
}
}
}
(function init(){
//Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak
if (options.banning && options.banning.enabled){
setInterval(function(){
for (ip in bannedIPs){
var banTime = bannedIPs[ip];
if (Date.now() - banTime > options.banning.time)
delete bannedIPs[ip];
}
}, 1000 * options.banning.purgeInterval);
}
SetupBroadcasting();
var serversStarted = 0;
Object.keys(ports).forEach(function(port){
Object.keys(options.ports).forEach(function(port){
net.createServer({allowHalfOpen: false}, function(socket) {
_this.handleNewClient(socket);
}).listen(parseInt(port), function() {
serversStarted++;
if (serversStarted == Object.keys(ports).length)
if (serversStarted == Object.keys(options.ports).length)
_this.emit('started');
});
});
})();
//public members
this.banIP = function(ipAddress){
this.addBannedIP = function(ipAddress){
bannedIPs[ipAddress] = Date.now();
/*for (var c in stratumClients){
var client = stratumClients[c];
if (client.remoteAddress === ipAddress){
_this.emit('bootedBannedWorker');
}
}*/
};
this.broadcastMiningJobs = function(jobParams) {
for (var clientId in stratumClients) {
// if a client gets disconnected WHILE doing this loop a crash might happen.
// 'm not sure if that can ever happen but an if here doesn't hurt!
var client = stratumClients[clientId];
if (typeof(client) !== 'undefined') {
client.sendMiningJob(jobParams);
}
}
/* Some miners will consider the pool dead if it doesn't receive a job for around a minute.
So every time we broadcast jobs, set a timeout to rebroadcast in X seconds unless cleared. */
if (isNaN(jobRebroadcastTimeout) || jobRebroadcastTimeout <= 0) return;
clearTimeout(rebroadcastTimeout);
rebroadcastTimeout = setTimeout(function(){
var resendParams = jobParams;
resendParams[8] = false;
_this.broadcastMiningJobs(resendParams);
}, jobRebroadcastTimeout * 1000);
};
this.getStratumClients = function () {
return stratumClients;