From 5c19ec9abcedf80fdcffba1d7f07fa177051f0c8 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 5 Mar 2014 15:10:50 -0700 Subject: [PATCH] Moved database interactions out of master process into worker processes to avoid IPC overhead --- init.js | 2 +- libs/mposCompatibility.js | 135 ++++++++++----------------- libs/poolWorker.js | 187 ++++++++++++++++++-------------------- libs/shareProcessor.js | 56 ++++-------- libs/workerListener.js | 25 +---- 5 files changed, 152 insertions(+), 253 deletions(-) diff --git a/init.js b/init.js index 3023015..4da2b19 100644 --- a/init.js +++ b/init.js @@ -86,7 +86,7 @@ if (cluster.isMaster){ for (var i = 0; i < numForks; i++) { cluster.fork({ - fork: i, + forkId: i, pools: serializedConfigs }); } diff --git a/libs/mposCompatibility.js b/libs/mposCompatibility.js index 51d9834..972f7e6 100644 --- a/libs/mposCompatibility.js +++ b/libs/mposCompatibility.js @@ -1,118 +1,80 @@ var mysql = require('mysql'); var cluster = require('cluster'); -module.exports = function(logger, poolConfigs){ +module.exports = function(logger, poolConfig){ - var dbConnections = (function(){ - var connections = {}; + var mposConfig = poolConfig.shareProcessing.mpos; + var coin = poolConfig.coin.name; + var connection; - Object.keys(poolConfigs).forEach(function(coin) { - - var config = poolConfigs[coin]; - - if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled) - return; - - var mposConfig = config.shareProcessing.mpos; - - function connect(){ - var connection = connections[coin] = mysql.createConnection({ - host: mposConfig.host, - port: mposConfig.port, - user: mposConfig.user, - password: mposConfig.password, - database: mposConfig.database - }); - connection.connect(function(err){ - if (err) - logger.logError('shareProcessor', 'mysql', coin + - ' - could not connect to mysql database: ' + JSON.stringify(err)) - else{ - logger.logDebug('shareProcessor', 'mysql', coin + - ' - successful connection to MySQL database'); - } - }); - connection.on('error', function(err){ - if(err.code === 'PROTOCOL_CONNECTION_LOST') { - logger.logWarning('shareProcessor', 'mysql', coin + - ' - lost connection to MySQL database, attempting reconnection...'); - connect(); - } - else{ - logger.logError('shareProcessor', 'mysql', coin + - ' - mysql database error: ' + JSON.stringify(err)) - } - }); - } - connect(); + function connect(){ + connection = mysql.createConnection({ + host: mposConfig.host, + port: mposConfig.port, + user: mposConfig.user, + password: mposConfig.password, + database: mposConfig.database }); - return connections; - })(); + connection.connect(function(err){ + if (err) + logger.error('mysql', 'Could not connect to mysql database: ' + JSON.stringify(err)) + else{ + logger.debug('mysql', 'Successful connection to MySQL database'); + } + }); + connection.on('error', function(err){ + if(err.code === 'PROTOCOL_CONNECTION_LOST') { + logger.warning('mysql', 'Lost connection to MySQL database, attempting reconnection...'); + connect(); + } + else{ + logger.error('mysql', 'Database error: ' + JSON.stringify(err)) + } + }); + } + connect(); + this.handleAuth = function(workerName, password, authCallback){ - this.handleAuth = function(data){ - /* - type: 'mposAuth', - coin: poolOptions.coin.name, - callbackId: callbackId, - workerId: cluster.worker.id, - workerName: workerName, - password: password, - authLevel: authLevel - */ - - var sendResult = function(authorized){ - cluster.workers[data.workerId].send({ - type : 'mposAuth', - callbackId : data.callbackId, - authorized : authorized - }); - }; - - var connection = dbConnections[data.coin]; connection.query( 'SELECT password FROM pool_worker WHERE username = LOWER(?)', - [data.workerName], + [workerName], function(err, result){ if (err){ - logger.logError('shareProcessor', 'mysql', 'MySQL error when authenticating worker: ' + + logger.error('mysql', 'Database error when authenticating worker: ' + JSON.stringify(err)); - sendResult(false); + authCallback(false); } else if (!result[0]) - sendResult(false); + authCallback(false); else if (data.authLevel === 'worker') - sendResult(true); - else if (result[0].password === data.password) - sendResult(true) + authCallback(true); + else if (result[0].password === password) + authCallback(true) else - sendResult(false); + authCallback(false); } ); }; - this.handleShare = function(data){ - var isValidShare = data.isValidShare; - var isValidBlock = data.isValidBlock; - if ((!data.coin in dbConnections)) return; + this.handleShare = function(isValidShare, isValidBlock, shareData){ - var connection = dbConnections[data.coin]; var dbData = [ - data.share.ip, - data.share.worker, + shareData.ip, + shareData.worker, isValidShare ? 'Y' : 'N', - isValidBlock ? 'Y' : 'N', - data.share.difficulty, - typeof(data.share.error)==='undefined'?null:data.share.error, - typeof(data.solution)==='undefined'?'':data.solution // solution? + isValidBlock ? 'Y' : 'N', + shareData.difficulty, + typeof(shareData.error) === 'undefined' ? null : shareData.error, + typeof(shareData.solution) === 'undefined' ? null : shareData.solution ]; connection.query( 'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?', dbData, function(err, result) { if (err) - logger.logError('shareProcessor', 'mysql', 'MySQL insert error when adding share: ' + + logger.error('mysql', 'Insert error when adding share: ' + JSON.stringify(err)); } ); @@ -120,14 +82,11 @@ module.exports = function(logger, poolConfigs){ this.handleDifficultyUpdate = function(workerName, diff){ - if ((!data.coin in dbConnections)) return; - - var connection = dbConnections[data.coin]; connection.query( 'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName), function(err, result){ if (err) - logger.logError('shareProcessor', 'mysql', 'MySQL error when updating worker diff: ' + + logger.error('mysql', 'Error when updating worker diff: ' + JSON.stringify(err)); else if (result.affectedRows === 0){ connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff}); diff --git a/libs/poolWorker.js b/libs/poolWorker.js index 1469ed4..067438d 100644 --- a/libs/poolWorker.js +++ b/libs/poolWorker.js @@ -2,148 +2,135 @@ var cluster = require('cluster'); var Stratum = require('stratum-pool'); +var MposCompatibility = require('./mposCompatibility.js'); +var ShareProcessor = require('./shareProcessor.js'); + module.exports = function(logger){ - var logDebug = logger.logDebug; - var logWarning = logger.logWarning; - var logError = logger.logError; - - var poolConfigs = JSON.parse(process.env.pools); - var fork = process.env.fork; + var forkId = process.env.forkId; - var pools = []; + var pools = {}; //Handle messages from master process sent via IPC process.on('message', function(message) { switch(message.type){ case 'blocknotify': - for (var i = 0; i < pools.length; i++){ - if (pools[i].options.coin.name.toLowerCase() === message.coin.toLowerCase()){ - pools[i].processBlockNotify(message.hash) - return; - } - } - break; - case 'mposAuth': - var callbackId = message.callbackId; - if (callbackId in mposAuthCallbacks) { - mposAuthCallbacks[callbackId](message.authorized); - } + var pool = pools[message.coin.toLowerCase()] + if (pool) pool.processBlockNotify(message.hash) break; } }); - var mposAuthCallbacks = {}; - Object.keys(poolConfigs).forEach(function(coin) { var poolOptions = poolConfigs[coin]; - var logIdentify = coin + ' (Fork ' + fork + ')'; + var logIdentify = coin + ' (Fork ' + forkId + ')'; - var authorizeFN = function (ip, workerName, password, callback) { - // Default implementation just returns true - logDebug(logIdentify, 'client', "Authorize [" + ip + "] " + workerName + ":" + password); - - var mposAuthLevel; - if (poolOptions.shareProcessing.mpos.enabled && ( - (mposAuthLevel = poolOptions.shareProcessing.mpos.stratumAuth) === 'worker' || - mposAuthLevel === 'password' - )){ - var callbackId = coin + workerName + password + Date.now(); - var authTimeout = setTimeout(function(){ - if (!(callbackId in mposAuthCallbacks)) - return; - callback({ - error: null, - authorized: false, - disconnect: false - }); - delete mposAuthCallbacks[callbackId]; - }, 30000); - mposAuthCallbacks[callbackId] = function(authorized){ - callback({ - error: null, - authorized: authorized, - disconnect: false - }); - delete mposAuthCallbacks[callbackId]; - clearTimeout(authTimeout); - }; - process.send({ - type : 'mposAuth', - coin : poolOptions.coin.name, - callbackId : callbackId, - workerId : cluster.worker.id, - workerName : workerName, - password : password, - authLevel : mposAuthLevel - }); + var poolLogger = { + debug: function(key, text){ + logger.logDebug(logIdentify, key, text); + }, + warning: function(key, text){ + logger.logWarning(logIdentify, key, text); + }, + error: function(key, text){ + logger.logError(logIdentify, key, text); } - else{ - // if we're here than it means we're on stratumAuth: "none" or something unrecognized by the system! - callback({ + }; + + var handlers = { + auth: function(){}, + share: function(){}, + diff: function(){} + }; + + var shareProcessing = poolOptions.shareProcessing; + + //Functions required for MPOS compatibility + if (shareProcessing.mpos && shareProcessing.mpos.enabled){ + var mposCompat = new MposCompatibility(poolLogger, poolOptions) + + handlers.auth = function(workerName, password, authCallback){ + mposCompat.handleAuth(workerName, password, authCallback); + }; + + handlers.share = function(isValidShare, isValidBlock, data){ + mposCompat.handleShare(isValidShare, isValidBlock, data); + }; + + handlers.diff = function(workerName, diff){ + mposCompat.handleDifficultyUpdate(workerName, diff); + } + } + + //Functions required for internal payment processing + else if (shareProcessing.internal && shareProcessing.internal.enabled){ + + var shareProcessor = new ShareProcessor(poolLogger, poolOptions) + + handlers.auth = function(workerName, password, authCallback){ + authCallback({ error: null, authorized: true, disconnect: false }); - } + }; + + handlers.share = function(isValidShare, isValidBlock, data){ + shareProcessor.handleShare(isValidShare, isValidBlock, data); + }; + } + + var authorizeFN = function (ip, workerName, password, callback) { + handlers.auth(workerName, password, function(authorized){ + + var authString = authorized ? 'Authorized' : 'Unauthorized '; + + poolLogger.debug('client', authorized + ' [' + ip + '] ' + workerName + ':' + password); + callback({ + error: null, + authorized: authorized, + disconnect: false + }); + }); }; var pool = Stratum.createPool(poolOptions, authorizeFN); - pool.on('share', function(isValidShare, isValidBlock, data, blockHex){ + pool.on('share', function(isValidShare, isValidBlock, data){ var shareData = JSON.stringify(data); - if (data.solution && !isValidBlock){ - logDebug(logIdentify, 'client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData); - } - else if (!isValidShare){ - logDebug(logIdentify, 'client', 'Invalid share submitted, share data: ' + shareData) - } + if (data.solution && !isValidBlock) + poolLogger.debug('client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData); + else if (isValidBlock) + poolLogger.debug('client', 'Block found, solution: ' + shareData.solution); - logDebug(logIdentify, 'client', 'Valid share submitted, share data: ' + shareData); - process.send({ - type : 'share', - share : data, - coin : poolOptions.coin.name, - isValidShare : isValidShare, - isValidBlock : isValidBlock, - solution : blockHex // blockHex is undefined is this was not a valid block. - }); + if (isValidShare) + poolLogger.debug('client', 'Valid share submitted, share data: ' + shareData); + else if (!isValidShare) + poolLogger.debug('client', 'Invalid share submitted, share data: ' + shareData) + + + handlers.share(isValidShare, isValidBlock, data) - if (isValidBlock){ - logDebug(logIdentify, 'client', 'Block found, solution: ' + shareData.solution); - process.send({ - type : 'block', - share : data, - coin : poolOptions.coin.name - }); - } }).on('difficultyUpdate', function(workerName, diff){ - if (poolOptions.shareProcessing.mpos.enabled){ - process.send({ - type : 'difficultyUpdate', - workerName : workerName, - diff : diff, - coin : poolOptions.coin.name - }); - } + handlers.diff(workerName, diff); }).on('log', function(severity, logKey, logText) { if (severity == 'debug') { - logDebug(logIdentify, logKey, logText); + poolLogger.debug(logKey, logText); } else if (severity == 'warning') { - logWarning(logIdentify, logKey, logText); + poolLogger.warning(logKey, logText); } else if (severity == 'error') { - logError(logIdentify, logKey, logText); + poolLogger.error(logKey, logText); } }); pool.start(); - pools.push(pool); + pools[poolOptions.coin.name.toLowerCase()] = pool; }); }; \ No newline at end of file diff --git a/libs/shareProcessor.js b/libs/shareProcessor.js index 83fe0e8..8ffc7ce 100644 --- a/libs/shareProcessor.js +++ b/libs/shareProcessor.js @@ -1,61 +1,37 @@ var redis = require('redis'); -module.exports = function(logger, poolConfigs){ +module.exports = function(logger, poolConfig){ + var redisConfig = poolConfig.shareProcessing.internal.redis; + var coin = poolConfig.coin.name; - var dbConnections = (function(){ - var connections = {}; - Object.keys(poolConfigs).forEach(function(coin) { + var connection; - var config = poolConfigs[coin]; - - if (!config.shareProcessing || !config.shareProcessing.internal || !config.shareProcessing.internal.enabled) - return; - - var redisConfig = config.shareProcessing.internal.redis; - - function connect(){ - var connection = connections[coin] = redis.createClient(redisConfig.port, redisConfig.host); - connection.on('error', function(err){ - logger.logError('shareProcessor', 'redis', coin + - ' - redis client had an error: ' + JSON.stringify(err)) - }); - connection.on('end', function(){ - logger.logWarning('shareProcessor', 'redis', coin + - ' - connection to redis database as been ended'); - connect(); - }); - } + function connect(){ + var connection = connections[coin] = redis.createClient(redisConfig.port, redisConfig.host); + connection.on('error', function(err){ + logger.error('redis', 'Redis client had an error: ' + JSON.stringify(err)) + }); + connection.on('end', function(){ + logger.warning('redis', 'Connection to redis database as been ended'); connect(); }); - })(); + } + connect(); - this.handleDifficultyUpdate = function(data){ - var coin = data.coin; - var poolConfig = poolConfigs[coin]; - }; - this.handleShare = function(data){ - if ((!data.coin in dbConnections)) return; + this.handleShare = function(isValidShare, isValidBlock, shareData){ - if (!data.isValidShare) return; - var connection = dbConnections[data.coin]; - - var shareData = data.share; - var coin = data.coin; - var poolConfig = poolConfigs[coin]; + if (!isValidShare) return; connection.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){ if (error) - logger.logError('shareProcessor', 'redis', 'could not store worker share') + logger.error('redis', 'Could not store worker share') }); }; - this.handleBlock = function(data){ - // - }; }; \ No newline at end of file diff --git a/libs/workerListener.js b/libs/workerListener.js index 92c0329..6bb669b 100644 --- a/libs/workerListener.js +++ b/libs/workerListener.js @@ -10,35 +10,12 @@ var processor = module.exports = function processor(logger, poolConfigs){ var _this = this; - var mposCompat = new MposCompatibility(logger, poolConfigs); - var shareProcessor = new ShareProcessor(logger, poolConfigs); - - this.init = function(){ Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].on('message', function(data){ - - var shareProcessing = poolConfigs[data.coin].shareProcessing; - switch(data.type){ - case 'share': - if (shareProcessing.internal.enabled) - shareProcessor.handleShare(data); - if (shareProcessing.mpos.enabled) - mposCompat.handleShare(data); - break; - case 'difficultyUpdate': - if (shareProcessing.mpos.enabled) - mposCompat.handleDifficultyUpdate(data); - break; - case 'block': - if (shareProcessing.internal.enabled) - shareProcessor.handleBlock(data); - break; - case 'mposAuth': - mposCompat.handleAuth(data); - break; + } }); });