diff --git a/README.md b/README.md index 86e4e6d..1d05bc6 100644 --- a/README.md +++ b/README.md @@ -160,8 +160,15 @@ var pool = Stratum.createPool({ due to mining apps using incorrect max diffs and this pool using correct max diffs. */ "shareVariancePercent": 10, - /* If a worker is submitting a good deal of invalid shares we can temporarily ban them to - reduce system/network load. Also useful to fight against flooding attacks. */ + /* Enable for client IP addresses to be detected when using a load balancer with TCP proxy + protocol enabled, such as HAProxy with 'send-proxy' param: + http://haproxy.1wt.eu/download/1.5/doc/configuration.txt */ + "tcpProxyProtocol": false, + + /* If a worker is submitting a high threshold of invalid shares we can temporarily ban their IP + to reduce system/network load. Also useful to fight against flooding attacks. The worker's + If running behind something like HAProxy be sure to enable the TCP Proxy Protocol config, + otherwise you'll end up banning your own IP address (and therefore all workers). */ "banning": { "enabled": true, "time": 600, //How many seconds to ban worker for @@ -202,13 +209,13 @@ var pool = Stratum.createPool({ - https://en.bitcoin.it/wiki/Running_bitcoind */ "daemons": [ { //Main daemon instance - "host": "localhost", + "host": "127.0.0.1", "port": 19332, "user": "litecoinrpc", "password": "testnet" }, { //Backup daemon instance - "host": "localhost", + "host": "127.0.0.1", "port": 19344, "user": "litecoinrpc", "password": "testnet" @@ -221,7 +228,7 @@ var pool = Stratum.createPool({ intensive than blocknotify script). However its still under development (not yet working). */ "p2p": { "enabled": false, - "host": "localhost", + "host": "127.0.0.1", "port": 19333, /* Magic value is different for main/testnet and for each coin. It is found in the daemon diff --git a/lib/daemon.js b/lib/daemon.js index 01f20f3..24ef5b3 100644 --- a/lib/daemon.js +++ b/lib/daemon.js @@ -47,7 +47,7 @@ function DaemonInterface(options){ function performHttpRequest(instance, jsonData, callback){ var options = { - hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host), + hostname: (typeof(instance.host) === 'undefined' ? '127.0.0.1' : instance.host), port : instance.port, method : 'POST', auth : instance.user + ':' + instance.password, @@ -104,7 +104,7 @@ function DaemonInterface(options){ }); req.end(jsonData); - }; + } diff --git a/lib/pool.js b/lib/pool.js index ca3398e..29f0c86 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -62,6 +62,7 @@ var pool = module.exports = function pool(options, authorizeFn){ SetupPeer(); StartStratumServer(function(){ OutputPoolInfo(); + _this.emit('started'); }); }); }); @@ -361,11 +362,21 @@ var pool = module.exports = function pool(options, authorizeFn){ return r.response.ismine; }); + options.coin.addressByte = util.getVersionByte(options.address); + callback(null, ownedInfo.length > 0 ? ownedInfo[0].response : results[0].response); }); }, + function(addressInfo, callback){ + var examplePubKey = new Buffer([options.coin.addressByte, new Buffer('010966776006953D5567439E5E39F86A0D273BEE', 'hex')]); + var dHashed = util.sha256d(examplePubKey); + var binaryAddress = + + callback(null, addressInfo); + }, + function(addressInfo, callback){ _this.daemon.cmd('getdifficulty', [], function(results){ @@ -520,7 +531,7 @@ var pool = module.exports = function pool(options, authorizeFn){ function StartStratumServer(finishedCallback){ - _this.stratumServer = new stratum.Server(options.ports, options.connectionTimeout, options.jobRebroadcastTimeout, options.banning, authorizeFn); + _this.stratumServer = new stratum.Server(options, authorizeFn); _this.stratumServer.on('started', function(){ options.initStats.stratumPorts = Object.keys(options.ports); @@ -533,7 +544,6 @@ var pool = module.exports = function pool(options, authorizeFn){ } client.on('difficultyChanged', function(diff){ - _this.emit('difficultyUpdate', client.workerName, diff); }).on('subscription', function(params, resultCallback){ @@ -569,24 +579,38 @@ var pool = module.exports = function pool(options, authorizeFn){ resultCallback(result.error, result.result ? true : null); }).on('malformedMessage', function (message) { - emitWarningLog((client.workerName || ('Unauthorized miner [' + client.remoteAddress + ']')) + " has sent us a malformed message: " + message); + emitWarningLog('Malformed message from ' + client.getLabel() + ': ' + message); }).on('socketError', function(err) { - emitWarningLog(client.workerName + " has somehow had a socket error: " + JSON.stringify(err)); + emitWarningLog('Socket error from ' + client.getLabel() + ': ' + JSON.stringify(err)); - }).on('socketDisconnect', function(){ - var clientTitle = client.workerName ? ("'" + client.workerName + '"') : '(unauthorized)'; - emitLog("Client " + clientTitle + ' [' + client.remoteAddress + "] disconnected"); + }).on('socketTimeout', function(reason){ + emitWarningLog('Connected timed out for ' + client.getLabel() + ': ' + reason) + + }).on('socketDisconnect', function() { + //emitLog('Socket disconnected from ' + client.getLabel()); + + }).on('kickedBannedIP', function(remainingBanTime){ + emitLog('Rejected incoming connection from ' + client.remoteAddress + ' banned for ' + remainingBanTime + ' more seconds'); + + }).on('forgaveBannedIP', function(){ + emitLog('Forgave banned IP ' + client.remoteAddress); }).on('unknownStratumMethod', function(fullMessage) { - emitLog("Client '" + client.workerName + "' has sent us an unknown stratum method: " + fullMessage.method); + emitLog('Unknown stratum method from ' + client.getLabel() + ': ' + fullMessage.method); - }).on('socketFlooded', function(){ - emitWarningLog('Detected socket flooding and purged buffer'); + }).on('socketFlooded', function() { + emitWarningLog('Detected socket flooding from ' + client.getLabel()); - }).on('ban', function(ipAddress){ - _this.emit('banIP', ipAddress); - emitWarningLog('banned IP ' + ipAddress); + }).on('tcpProxyError', function(data) { + emitErrorLog('Client IP detection failed, tcpProxyProtocol is enabled yet did not receive proxy protocol message, instead got data: ' + data); + + }).on('bootedBannedWorker', function(){ + emitWarningLog('Booted worker ' + client.getLabel() + ' who was connected from an IP address that was just banned'); + + }).on('triggerBan', function(reason){ + emitWarningLog('Banned triggered for ' + client.getLabel() + ': ' + reason); + _this.emit('banIP', client.remoteAddress, client.workerName); }); }); } diff --git a/lib/stratum.js b/lib/stratum.js index e7d0f8d..c197436 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -28,7 +28,7 @@ var StratumClient = function(options){ //private members this.socket = options.socket; - this.remoteAddress = options.remoteAddress; + this.remoteAddress = options.socket.remoteAddress; var banning = options.banning; @@ -38,24 +38,26 @@ var StratumClient = function(options){ this.shares = {valid: 0, invalid: 0}; - var considerBan = (!banning || !banning.enabled) ? function(){} : function(shareValid){ + var considerBan = (!banning || !banning.enabled) ? function(){ return false } : function(shareValid){ if (shareValid === true) _this.shares.valid++; else _this.shares.invalid++; var totalShares = _this.shares.valid + _this.shares.invalid; if (totalShares >= banning.checkThreshold){ var percentBad = (_this.shares.invalid / totalShares) * 100; - if (percentBad >= banning.invalidPercent){ - _this.emit('ban', _this.remoteAddress); - _this.socket.end(); - } - else //reset shares + if (percentBad < banning.invalidPercent) //reset shares this.shares = {valid: 0, invalid: 0}; + else { + _this.emit('triggerBan', _this.shares.invalid + ' out of the last ' + totalShares + ' shares were invalid'); + _this.socket.destroy(); + return true; + } } + return false; }; - (function init(){ + this.init = function init(){ setupSocket(); - })(); + }; function handleMessage(message){ switch(message.method){ @@ -115,10 +117,9 @@ var StratumClient = function(options){ } function handleAuthorize(message, replyToSocket){ - _this.workerIP = options.socket.address().address; - _this.workerName = message.params[0].toLowerCase(); + _this.workerName = message.params[0]; _this.workerPass = message.params[1]; - options.authorizeFn(_this.workerIP, _this.workerName, _this.workerPass, function(result) { + options.authorizeFn(_this.remoteAddress, _this.workerName, _this.workerPass, function(result) { _this.authorized = (!result.error && result.authorized); if (replyToSocket) { @@ -131,7 +132,7 @@ var StratumClient = function(options){ // If the authorizer wants us to close the socket lets do it. if (result.disconnect === true) { - options.socket.end(); + options.socket.destroy(); } }); } @@ -164,12 +165,13 @@ var StratumClient = function(options){ nonce : message.params[4] }, function(error, result){ - considerBan(result); - sendJson({ - id : message.id, - result : result, - error : error - }); + if (!considerBan(result)){ + sendJson({ + id: message.id, + result: result, + error: error + }); + } } ); @@ -187,19 +189,27 @@ var StratumClient = function(options){ var socket = options.socket; var dataBuffer = ''; socket.setEncoding('utf8'); - socket.once('data', function(d){ - if (d.indexOf('PROXY') === 0){ - _this.remoteAddress = d.split(' ')[2]; - console.log('detected proxy source IP address of ' + _this.remoteAddress); - } + + if (options.tcpProxyProtocol === true) { + socket.once('data', function (d) { + if (d.indexOf('PROXY') === 0) { + _this.remoteAddress = d.split(' ')[2]; + } + else{ + _this.emit('tcpProxyError', d); + } + _this.emit('checkBan'); + }); + } + else{ _this.emit('checkBan'); - }); + } socket.on('data', function(d){ dataBuffer += d; if (Buffer.byteLength(dataBuffer, 'utf8') > 1024){ dataBuffer = ''; _this.emit('socketFlooded'); - socket.end(); + socket.destroy(); return; } if (dataBuffer.slice(-1) === '\n'){ @@ -210,9 +220,9 @@ var StratumClient = function(options){ try { messageJson = JSON.parse(message); } catch(e) { - if (d.indexOf('PROXY') !== 0){ + if (options.tcpProxyProtocol !== true || d.indexOf('PROXY') !== 0){ _this.emit('malformedMessage', message); - socket.end(); + socket.destroy(); } return; } @@ -224,17 +234,20 @@ var StratumClient = function(options){ dataBuffer = ''; } }); - socket.on('end', function() { - _this.emit('socketDisconnect') + socket.on('close', function() { + _this.emit('socketDisconnect'); }); socket.on('error', function(err){ - if (err.code === 'ECONNRESET') - _this.emit('socketDisconnect'); - else + if (err.code !== 'ECONNRESET') _this.emit('socketError', err); }); } + + this.getLabel = function(){ + return (_this.workerName || '(unauthorized)') + ' [' + _this.remoteAddress + ']'; + }; + this.enqueueNextDifficulty = function(requestedNewDifficulty) { pendingDifficulty = requestedNewDifficulty; return true; @@ -262,8 +275,10 @@ var StratumClient = function(options){ this.sendMiningJob = function(jobParams){ - if (Date.now() - _this.lastActivity > options.socketTimeout){ - _this.socket.end(); + var lastActivityAgo = Date.now() - _this.lastActivity; + if (lastActivityAgo > options.connectionTimeout * 1000){ + _this.emit('socketTimeout', 'last submitted a share was ' + (lastActivityAgo / 1000 | 0) + ' seconds ago'); + _this.socket.destroy(); return; } @@ -304,12 +319,13 @@ StratumClient.prototype.__proto__ = events.EventEmitter.prototype; * - 'client.disconnected'(StratumClientInstance) - when a miner disconnects. Be aware that the socket cannot be used anymore. * - 'started' - when the server is up and running **/ -var StratumServer = exports.Server = function StratumServer(ports, connectionTimeout, jobRebroadcastTimeout, banning, authorizeFn){ +var StratumServer = exports.Server = function StratumServer(options, authorizeFn){ //private members - var socketTimeout = connectionTimeout * 1000; - var bannedMS = banning ? banning.time * 1000 : null; + //ports, connectionTimeout, jobRebroadcastTimeout, banning, haproxy, authorizeFn + + var bannedMS = options.banning ? options.banning.time * 1000 : null; var _this = this; var stratumClients = {}; @@ -317,27 +333,22 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim var rebroadcastTimeout; var bannedIPs = {}; - //Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak - var purgeOldBans = (!banning || !banning.enabled) ? null : setInterval(function(){ - for (ip in bannedIPs){ - var banTime = bannedIPs[ip]; - if (Date.now() - banTime > banning.time) - delete bannedIPs[ip]; - } - }, 1000 * banning.purgeInterval); - var checkBan = function(client){ - if (banning && banning.enabled && client.remoteAddress in bannedIPs){ + function checkBan(client){ + if (options.banning && options.banning.enabled && client.remoteAddress in bannedIPs){ var bannedTime = bannedIPs[client.remoteAddress]; - if ((Date.now() - bannedTime) < bannedMS){ - client.socket.end(); - return null; + var bannedTimeAgo = Date.now() - bannedTime; + var timeLeft = bannedMS - bannedTimeAgo; + if (timeLeft > 0){ + client.socket.destroy(); + client.emit('kickedBannedIP', timeLeft / 1000 | 0); } else { delete bannedIPs[client.remoteAddress]; + client.emit('forgaveBannedIP'); } } - }; + } this.handleNewClient = function (socket){ @@ -346,11 +357,11 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim var client = new StratumClient( { subscriptionId: subscriptionId, - socket: socket, authorizeFn: authorizeFn, - banning: banning, - socketTimeout: socketTimeout, - remoteAddress: socket.remoteAddress + socket: socket, + banning: options.banning, + connectionTimeout: options.connectionTimeout, + tcpProxyProtocol: options.tcpProxyProtocol } ); @@ -359,55 +370,85 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim client.on('socketDisconnect', function() { _this.removeStratumClientBySubId(subscriptionId); _this.emit('client.disconnected', client); - }).on('ban', function(ipAddress){ - _this.banIP(ipAddress); }).on('checkBan', function(){ checkBan(client); - }); + }).on('triggerBan', function(){ + _this.addBannedIP(client.remoteAddress); + }).init(); return subscriptionId; }; - + + + + function SetupBroadcasting(){ + + var broadcastJobs = function(jobParams){ + for (var clientId in stratumClients) { + var client = stratumClients[clientId]; + client.sendMiningJob(jobParams); + } + }; + + if (isNaN(options.jobRebroadcastTimeout) || options.jobRebroadcastTimeout <= 0){ + _this.broadcastMiningJobs = broadcastJobs; + } + else{ + /* Some miners will consider the pool dead if it doesn't receive a job for around a minute. + So every time we broadcast jobs, set a timeout to rebroadcast in X seconds unless cleared. */ + _this.broadcastMiningJobs = function(jobParams){ + broadcastJobs(jobParams); + clearTimeout(rebroadcastTimeout); + rebroadcastTimeout = setTimeout(function(){ + var resendParams = jobParams; + resendParams[8] = false; + _this.broadcastMiningJobs(resendParams); + }, options.jobRebroadcastTimeout * 1000); + } + } + } + (function init(){ + + //Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak + if (options.banning && options.banning.enabled){ + setInterval(function(){ + for (ip in bannedIPs){ + var banTime = bannedIPs[ip]; + if (Date.now() - banTime > options.banning.time) + delete bannedIPs[ip]; + } + }, 1000 * options.banning.purgeInterval); + } + + + SetupBroadcasting(); + + var serversStarted = 0; - Object.keys(ports).forEach(function(port){ + Object.keys(options.ports).forEach(function(port){ net.createServer({allowHalfOpen: false}, function(socket) { _this.handleNewClient(socket); }).listen(parseInt(port), function() { serversStarted++; - if (serversStarted == Object.keys(ports).length) + if (serversStarted == Object.keys(options.ports).length) _this.emit('started'); }); }); - })(); //public members - this.banIP = function(ipAddress){ + this.addBannedIP = function(ipAddress){ bannedIPs[ipAddress] = Date.now(); + /*for (var c in stratumClients){ + var client = stratumClients[c]; + if (client.remoteAddress === ipAddress){ + _this.emit('bootedBannedWorker'); + } + }*/ }; - this.broadcastMiningJobs = function(jobParams) { - for (var clientId in stratumClients) { - // if a client gets disconnected WHILE doing this loop a crash might happen. - // 'm not sure if that can ever happen but an if here doesn't hurt! - var client = stratumClients[clientId]; - if (typeof(client) !== 'undefined') { - client.sendMiningJob(jobParams); - } - } - - /* Some miners will consider the pool dead if it doesn't receive a job for around a minute. - So every time we broadcast jobs, set a timeout to rebroadcast in X seconds unless cleared. */ - if (isNaN(jobRebroadcastTimeout) || jobRebroadcastTimeout <= 0) return; - clearTimeout(rebroadcastTimeout); - rebroadcastTimeout = setTimeout(function(){ - var resendParams = jobParams; - resendParams[8] = false; - _this.broadcastMiningJobs(resendParams); - }, jobRebroadcastTimeout * 1000); - }; this.getStratumClients = function () { return stratumClients;