diff --git a/README.md b/README.md index 4ea447a..1f8db26 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ Description of options: "internal": { //enabled this options for share payments to be processed and sent locally "enabled": true, /* This daemon is used to send out payments. It MUST be for the daemon that owns the - 'pool.address' field below, otherwise the daemon will not be able to confirm blocks + 'address' field above, otherwise the daemon will not be able to confirm blocks or sent out payments. */ "daemon": { "host": "localhost", diff --git a/init.js b/init.js index f5dfc53..7dd1f25 100644 --- a/init.js +++ b/init.js @@ -6,7 +6,7 @@ var cluster = require('cluster'); var posix = require('posix'); var PoolLogger = require('./libs/logutils.js'); var BlocknotifyListener = require('./libs/blocknotifyListener.js'); -var ShareProcessor = require('./libs/shareProcessor.js'); +var WorkerListener = require('./libs/workerListener.js'); var PoolWorker = require('./libs/poolWorker.js'); JSON.minify = JSON.minify || require("node-json-minify"); @@ -97,8 +97,8 @@ if (cluster.isMaster){ - var shareProcessor = new ShareProcessor(loggerInstance, poolConfigs); - shareProcessor.init(); + var workerListener = new WorkerListener(loggerInstance, poolConfigs); + workerListener.init(); diff --git a/libs/mposCompatibility.js b/libs/mposCompatibility.js new file mode 100644 index 0000000..02dafff --- /dev/null +++ b/libs/mposCompatibility.js @@ -0,0 +1,92 @@ +var mysql = require('mysql'); + +module.exports = function(logger, poolConfigs){ + + var dbConnections = (function(){ + var connections = {}; + Object.keys(poolConfigs).forEach(function(coin) { + + var config = poolConfigs[coin]; + + if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled) + return; + + var mposConfig = config.shareProcessing.mpos; + + function connect(){ + var connection = connections[coin] = mysql.createConnection({ + host: mposConfig.host, + port: mposConfig.port, + user: mposConfig.user, + password: mposConfig.password, + database: mposConfig.database + }); + connection.connect(function(err){ + if (err) + logger.logError('shareProcessor', 'mysql', config.coin.name + + ' - could not connect to mysql database: ' + JSON.stringify(err)) + else{ + logger.logDebug('shareProcessor', 'mysql', config.coin.name + + ' - successful connection to MySQL database'); + } + }); + connection.on('error', function(err){ + if(err.code === 'PROTOCOL_CONNECTION_LOST') { + logger.logWarning('shareProcessor', 'mysql', config.coin.name + + ' - lost connection to MySQL database, attempting reconnection...'); + connect(); + } + else{ + logger.logError('shareProcessor', 'mysql', config.coin.name + + ' - mysql database error: ' + JSON.stringify(err)) + } + }); + } + connect(); + }); + return connections; + }); + + + this.handleAuth = function(allowCallback){ + + }; + + this.handleShare = function(isValidShare, isValidBlock, data){ + + if ((!data.coin in dbConnections)) return; + + var connection = dbConnections[data.coin]; + connection.query( + 'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?', + [data.ip, data.worker, isValidShare ? 'Y' : 'N', isValidBlock ? 'Y' : 'N', data.difficulty, data.error, data.solution], + function(err, result) { + if (err) + logger.logError('shareProcessor', 'mysql', 'MySQL insert error when adding share: ' + + JSON.stringify(err)); + } + ); + }; + + this.handleDifficultyUpdate = function(workerName, diff){ + + if ((!data.coin in dbConnections)) return; + + var connection = dbConnections[data.coin]; + connection.query( + 'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName), + function(err, result){ + if (err) + logger.logError('shareProcessor', 'mysql', 'MySQL error when updating worker diff: ' + + JSON.stringify(err)); + else if (result.affectedRows === 0){ + connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff}); + } + else + console.log('Updated difficulty successfully', result); + } + ); + }; + + +}; \ No newline at end of file diff --git a/libs/poolWorker.js b/libs/poolWorker.js index 4d91268..7fee87e 100644 --- a/libs/poolWorker.js +++ b/libs/poolWorker.js @@ -73,15 +73,24 @@ module.exports = function(logger){ }); } + }).on('difficultyUpdate', function(workerName, diff){ + if (poolOptions.shareProcessing.mpos.enabled){ + process.send({ + type: 'difficultyUpdate', + workerName: workerName, + diff: diff, + coin: poolOptions.coin.name + }); + } }).on('log', function(severity, logKey, logText) { - if (severity == 'debug') { - logDebug(logIdentify, logKey, logText); - } else if (severity == 'warning') { - logWarning(logIdentify, logKey, logText); - } else if (severity == 'error') { - logError(logIdentify, logKey, logText); - } - }); + if (severity == 'debug') { + logDebug(logIdentify, logKey, logText); + } else if (severity == 'warning') { + logWarning(logIdentify, logKey, logText); + } else if (severity == 'error') { + logError(logIdentify, logKey, logText); + } + }); pool.start(); pools.push(pool); }); diff --git a/libs/shareProcessor.js b/libs/shareProcessor.js index f033b41..a96dbcf 100644 --- a/libs/shareProcessor.js +++ b/libs/shareProcessor.js @@ -1,76 +1,27 @@ -var events = require('events'); -var cluster = require('cluster'); - var redis = require('redis'); -var mysql = require('mysql'); -var processor = module.exports = function processor(logger, poolConfigs){ +module.exports = function(logger, poolConfigs){ - var _this = this; + + //TODO: need to add redis config to json. probably do one redis client per pool? var client; - var poolMposHandlers = (function(){ - var handlers = {}; + client = redis.createClient(); - Object.keys(poolConfigs).forEach(function(coin) { + client.on("error", function (err) { + logger.logError('shareProcessor', 'redis', 'Redis client had an error: ' + err); + }); - var config = poolConfigs[coin]; + this.handleDifficultyUpdate = function(data){ + var coin = data.coin; + var poolConfig = poolConfigs[coin]; + if (poolConfig.shareProcessing.mpos && poolConfig.shareProcessing.mpos.enabled){ + poolMposHandlers[coin].updateDifficulty(data.workerName, data.diff); + } + }; - if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled) - return; - var mposConfig = config.shareProcessing.mpos; - var connection = mysql.createConnection({ - host: mposConfig.host, - port: mposConfig.port, - user: mposConfig.user, - password: mposConfig.password, - database: mposConfig.database - }); - connection.connect(function(err){ - logger.logError('shareProcessor', 'database', config.coin.name + - ' - could not connect to mysql database: ' + JSON.stringify(err)) - }); - connection.on('error', function(err){ - logger.logError('shareProcessor', 'database', config.coin.name + - ' - mysql database error: ' + JSON.stringify(err)) - }); - - var insertShare = function(isValidShare, isValidBlock, data){ - connection.query( - 'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?', - [data.ip, data.worker, isValidShare ? 'Y' : 'N', isValidBlock ? 'Y' : 'N', data.difficulty, data.error, data.solution], - function(err, result) { - if (err) - logger.logError('shareProcessor', 'database', 'MySQL insert error when adding share: ' + - JSON.stringify(err)); - } - ); - }; - - var updateDifficulty = function(workerName, diff){ - connection.query( - 'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName), - function(err, result){ - if (err) - logger.logError('shareProcessor', 'database', 'MySQL error when updating worker diff: ' + - JSON.stringify(err)); - else if (result.affectedRows === 0){ - connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff}); - } - else - console.log('Updated difficulty successfully', result); - } - ); - }; - - handlers[config.coin.name] = {insertShare: insertShare, updateDifficulty: updateDifficulty}; - }); - return handlers; - })(); - - - function handleShare(data){ + this.handleShare = function(data){ var shareData = data.share; var coin = data.coin; var poolConfig = poolConfigs[coin]; @@ -82,39 +33,12 @@ var processor = module.exports = function processor(logger, poolConfigs){ if (poolConfig.shareProcessing.internal && poolConfig.shareProcessing.internal.enable && data.isValidShare){ client.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){ if (error) - logger.logError('shareProcessor', 'database', 'could not store worker share') + logger.logError('shareProcessor', 'redis', 'could not store worker share') }); } - } + }; - function handleBlock(data){ - var requiredConfirmations = data.confirmations; - //setInterval where we check for block confirmations - //probably create our own rpc interface for each pool - } - - this.init = function(){ - - client = redis.createClient(); - - client.on("error", function (err) { - logger.logError('shareProcessor', 'database', 'Redis client had an error: ' + err); - }); - - Object.keys(cluster.workers).forEach(function(id) { - cluster.workers[id].on('message', function(data){ - switch(data.type){ - case 'share': - handleShare(data); - break; - case 'block': - handleBlock(data) - break; - } - }); - }); - } -}; - - -processor.prototype.__proto__ = events.EventEmitter.prototype; + this.handleBlock = function(data){ + // + }; +}; \ No newline at end of file diff --git a/libs/workerListener.js b/libs/workerListener.js new file mode 100644 index 0000000..634b4d3 --- /dev/null +++ b/libs/workerListener.js @@ -0,0 +1,46 @@ +var events = require('events'); +var cluster = require('cluster'); + +var MposCompatibility = require('./mposCompatibility.js'); +var ShareProcessor = require('./shareProcessor.js'); + + +var processor = module.exports = function processor(logger, poolConfigs){ + + var _this = this; + + + var mposCompat = new MposCompatibility(logger, poolConfigs); + var shareProcessor = new ShareProcessor(logger, poolConfigs); + + + this.init = function(){ + + Object.keys(cluster.workers).forEach(function(id) { + cluster.workers[id].on('message', function(data){ + + var shareProcessing = poolConfigs[data.coin].shareProcessing; + + switch(data.type){ + case 'share': + if (shareProcessing.internal.enabled) + shareProcessor.handleShare(data); + if (shareProcessing.mpos.enabled) + mposCompat.handleShare(data); + break; + case 'difficultyUpdate': + if (shareProcessing.mpos.enabled) + mposCompat.handleDifficultyUpdate(data); + break; + case 'block': + if (shareProcessing.internal.enabled) + shareProcessor.handleBlock(data); + break; + } + }); + }); + } +}; + + +processor.prototype.__proto__ = events.EventEmitter.prototype; diff --git a/pool_configs/litecoin_testnet_example.json b/pool_configs/litecoin_testnet_example.json index 1475730..e5d5304 100644 --- a/pool_configs/litecoin_testnet_example.json +++ b/pool_configs/litecoin_testnet_example.json @@ -56,7 +56,7 @@ ], "p2p": { - "enabled": true, + "enabled": false, "host": "localhost", "port": 19333, "protocolVersion": 70002, diff --git a/test.js b/test.js deleted file mode 100644 index 0cbdd66..0000000 --- a/test.js +++ /dev/null @@ -1,11 +0,0 @@ -var net = require('net'); - -var socketServer = net.createServer({allowHalfOpen: true}, function(socket){ - console.log(socket); -}); -socketServer.listen(1111, function(){ - console.log('started 0'); -}); -socketServer.listen(1112, function(){ - console.log('started 1'); -}) \ No newline at end of file