Moved database interactions out of master process into worker processes to avoid IPC overhead

This commit is contained in:
Matt 2014-03-05 15:10:50 -07:00
parent 63c131762b
commit 5c19ec9abc
5 changed files with 152 additions and 253 deletions

View File

@ -86,7 +86,7 @@ if (cluster.isMaster){
for (var i = 0; i < numForks; i++) { for (var i = 0; i < numForks; i++) {
cluster.fork({ cluster.fork({
fork: i, forkId: i,
pools: serializedConfigs pools: serializedConfigs
}); });
} }

View File

@ -1,118 +1,80 @@
var mysql = require('mysql'); var mysql = require('mysql');
var cluster = require('cluster'); var cluster = require('cluster');
module.exports = function(logger, poolConfigs){ module.exports = function(logger, poolConfig){
var dbConnections = (function(){ var mposConfig = poolConfig.shareProcessing.mpos;
var connections = {}; var coin = poolConfig.coin.name;
var connection;
Object.keys(poolConfigs).forEach(function(coin) { function connect(){
connection = mysql.createConnection({
var config = poolConfigs[coin]; host: mposConfig.host,
port: mposConfig.port,
if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled) user: mposConfig.user,
return; password: mposConfig.password,
database: mposConfig.database
var mposConfig = config.shareProcessing.mpos;
function connect(){
var connection = connections[coin] = mysql.createConnection({
host: mposConfig.host,
port: mposConfig.port,
user: mposConfig.user,
password: mposConfig.password,
database: mposConfig.database
});
connection.connect(function(err){
if (err)
logger.logError('shareProcessor', 'mysql', coin +
' - could not connect to mysql database: ' + JSON.stringify(err))
else{
logger.logDebug('shareProcessor', 'mysql', coin +
' - successful connection to MySQL database');
}
});
connection.on('error', function(err){
if(err.code === 'PROTOCOL_CONNECTION_LOST') {
logger.logWarning('shareProcessor', 'mysql', coin +
' - lost connection to MySQL database, attempting reconnection...');
connect();
}
else{
logger.logError('shareProcessor', 'mysql', coin +
' - mysql database error: ' + JSON.stringify(err))
}
});
}
connect();
}); });
return connections; connection.connect(function(err){
})(); if (err)
logger.error('mysql', 'Could not connect to mysql database: ' + JSON.stringify(err))
else{
logger.debug('mysql', 'Successful connection to MySQL database');
}
});
connection.on('error', function(err){
if(err.code === 'PROTOCOL_CONNECTION_LOST') {
logger.warning('mysql', 'Lost connection to MySQL database, attempting reconnection...');
connect();
}
else{
logger.error('mysql', 'Database error: ' + JSON.stringify(err))
}
});
}
connect();
this.handleAuth = function(workerName, password, authCallback){
this.handleAuth = function(data){
/*
type: 'mposAuth',
coin: poolOptions.coin.name,
callbackId: callbackId,
workerId: cluster.worker.id,
workerName: workerName,
password: password,
authLevel: authLevel
*/
var sendResult = function(authorized){
cluster.workers[data.workerId].send({
type : 'mposAuth',
callbackId : data.callbackId,
authorized : authorized
});
};
var connection = dbConnections[data.coin];
connection.query( connection.query(
'SELECT password FROM pool_worker WHERE username = LOWER(?)', 'SELECT password FROM pool_worker WHERE username = LOWER(?)',
[data.workerName], [workerName],
function(err, result){ function(err, result){
if (err){ if (err){
logger.logError('shareProcessor', 'mysql', 'MySQL error when authenticating worker: ' + logger.error('mysql', 'Database error when authenticating worker: ' +
JSON.stringify(err)); JSON.stringify(err));
sendResult(false); authCallback(false);
} }
else if (!result[0]) else if (!result[0])
sendResult(false); authCallback(false);
else if (data.authLevel === 'worker') else if (data.authLevel === 'worker')
sendResult(true); authCallback(true);
else if (result[0].password === data.password) else if (result[0].password === password)
sendResult(true) authCallback(true)
else else
sendResult(false); authCallback(false);
} }
); );
}; };
this.handleShare = function(data){ this.handleShare = function(isValidShare, isValidBlock, shareData){
var isValidShare = data.isValidShare;
var isValidBlock = data.isValidBlock;
if ((!data.coin in dbConnections)) return;
var connection = dbConnections[data.coin];
var dbData = [ var dbData = [
data.share.ip, shareData.ip,
data.share.worker, shareData.worker,
isValidShare ? 'Y' : 'N', isValidShare ? 'Y' : 'N',
isValidBlock ? 'Y' : 'N', isValidBlock ? 'Y' : 'N',
data.share.difficulty, shareData.difficulty,
typeof(data.share.error)==='undefined'?null:data.share.error, typeof(shareData.error) === 'undefined' ? null : shareData.error,
typeof(data.solution)==='undefined'?'':data.solution // solution? typeof(shareData.solution) === 'undefined' ? null : shareData.solution
]; ];
connection.query( connection.query(
'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?', 'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?',
dbData, dbData,
function(err, result) { function(err, result) {
if (err) if (err)
logger.logError('shareProcessor', 'mysql', 'MySQL insert error when adding share: ' + logger.error('mysql', 'Insert error when adding share: ' +
JSON.stringify(err)); JSON.stringify(err));
} }
); );
@ -120,14 +82,11 @@ module.exports = function(logger, poolConfigs){
this.handleDifficultyUpdate = function(workerName, diff){ this.handleDifficultyUpdate = function(workerName, diff){
if ((!data.coin in dbConnections)) return;
var connection = dbConnections[data.coin];
connection.query( connection.query(
'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName), 'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName),
function(err, result){ function(err, result){
if (err) if (err)
logger.logError('shareProcessor', 'mysql', 'MySQL error when updating worker diff: ' + logger.error('mysql', 'Error when updating worker diff: ' +
JSON.stringify(err)); JSON.stringify(err));
else if (result.affectedRows === 0){ else if (result.affectedRows === 0){
connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff}); connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff});

View File

@ -2,148 +2,135 @@ var cluster = require('cluster');
var Stratum = require('stratum-pool'); var Stratum = require('stratum-pool');
var MposCompatibility = require('./mposCompatibility.js');
var ShareProcessor = require('./shareProcessor.js');
module.exports = function(logger){ module.exports = function(logger){
var logDebug = logger.logDebug;
var logWarning = logger.logWarning;
var logError = logger.logError;
var poolConfigs = JSON.parse(process.env.pools); var poolConfigs = JSON.parse(process.env.pools);
var fork = process.env.fork; var forkId = process.env.forkId;
var pools = []; var pools = {};
//Handle messages from master process sent via IPC //Handle messages from master process sent via IPC
process.on('message', function(message) { process.on('message', function(message) {
switch(message.type){ switch(message.type){
case 'blocknotify': case 'blocknotify':
for (var i = 0; i < pools.length; i++){ var pool = pools[message.coin.toLowerCase()]
if (pools[i].options.coin.name.toLowerCase() === message.coin.toLowerCase()){ if (pool) pool.processBlockNotify(message.hash)
pools[i].processBlockNotify(message.hash)
return;
}
}
break;
case 'mposAuth':
var callbackId = message.callbackId;
if (callbackId in mposAuthCallbacks) {
mposAuthCallbacks[callbackId](message.authorized);
}
break; break;
} }
}); });
var mposAuthCallbacks = {};
Object.keys(poolConfigs).forEach(function(coin) { Object.keys(poolConfigs).forEach(function(coin) {
var poolOptions = poolConfigs[coin]; var poolOptions = poolConfigs[coin];
var logIdentify = coin + ' (Fork ' + fork + ')'; var logIdentify = coin + ' (Fork ' + forkId + ')';
var authorizeFN = function (ip, workerName, password, callback) { var poolLogger = {
// Default implementation just returns true debug: function(key, text){
logDebug(logIdentify, 'client', "Authorize [" + ip + "] " + workerName + ":" + password); logger.logDebug(logIdentify, key, text);
},
var mposAuthLevel; warning: function(key, text){
if (poolOptions.shareProcessing.mpos.enabled && ( logger.logWarning(logIdentify, key, text);
(mposAuthLevel = poolOptions.shareProcessing.mpos.stratumAuth) === 'worker' || },
mposAuthLevel === 'password' error: function(key, text){
)){ logger.logError(logIdentify, key, text);
var callbackId = coin + workerName + password + Date.now();
var authTimeout = setTimeout(function(){
if (!(callbackId in mposAuthCallbacks))
return;
callback({
error: null,
authorized: false,
disconnect: false
});
delete mposAuthCallbacks[callbackId];
}, 30000);
mposAuthCallbacks[callbackId] = function(authorized){
callback({
error: null,
authorized: authorized,
disconnect: false
});
delete mposAuthCallbacks[callbackId];
clearTimeout(authTimeout);
};
process.send({
type : 'mposAuth',
coin : poolOptions.coin.name,
callbackId : callbackId,
workerId : cluster.worker.id,
workerName : workerName,
password : password,
authLevel : mposAuthLevel
});
} }
else{ };
// if we're here than it means we're on stratumAuth: "none" or something unrecognized by the system!
callback({ var handlers = {
auth: function(){},
share: function(){},
diff: function(){}
};
var shareProcessing = poolOptions.shareProcessing;
//Functions required for MPOS compatibility
if (shareProcessing.mpos && shareProcessing.mpos.enabled){
var mposCompat = new MposCompatibility(poolLogger, poolOptions)
handlers.auth = function(workerName, password, authCallback){
mposCompat.handleAuth(workerName, password, authCallback);
};
handlers.share = function(isValidShare, isValidBlock, data){
mposCompat.handleShare(isValidShare, isValidBlock, data);
};
handlers.diff = function(workerName, diff){
mposCompat.handleDifficultyUpdate(workerName, diff);
}
}
//Functions required for internal payment processing
else if (shareProcessing.internal && shareProcessing.internal.enabled){
var shareProcessor = new ShareProcessor(poolLogger, poolOptions)
handlers.auth = function(workerName, password, authCallback){
authCallback({
error: null, error: null,
authorized: true, authorized: true,
disconnect: false disconnect: false
}); });
} };
handlers.share = function(isValidShare, isValidBlock, data){
shareProcessor.handleShare(isValidShare, isValidBlock, data);
};
}
var authorizeFN = function (ip, workerName, password, callback) {
handlers.auth(workerName, password, function(authorized){
var authString = authorized ? 'Authorized' : 'Unauthorized ';
poolLogger.debug('client', authorized + ' [' + ip + '] ' + workerName + ':' + password);
callback({
error: null,
authorized: authorized,
disconnect: false
});
});
}; };
var pool = Stratum.createPool(poolOptions, authorizeFN); var pool = Stratum.createPool(poolOptions, authorizeFN);
pool.on('share', function(isValidShare, isValidBlock, data, blockHex){ pool.on('share', function(isValidShare, isValidBlock, data){
var shareData = JSON.stringify(data); var shareData = JSON.stringify(data);
if (data.solution && !isValidBlock){ if (data.solution && !isValidBlock)
logDebug(logIdentify, 'client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData); poolLogger.debug('client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData);
} else if (isValidBlock)
else if (!isValidShare){ poolLogger.debug('client', 'Block found, solution: ' + shareData.solution);
logDebug(logIdentify, 'client', 'Invalid share submitted, share data: ' + shareData)
}
logDebug(logIdentify, 'client', 'Valid share submitted, share data: ' + shareData); if (isValidShare)
process.send({ poolLogger.debug('client', 'Valid share submitted, share data: ' + shareData);
type : 'share', else if (!isValidShare)
share : data, poolLogger.debug('client', 'Invalid share submitted, share data: ' + shareData)
coin : poolOptions.coin.name,
isValidShare : isValidShare,
isValidBlock : isValidBlock, handlers.share(isValidShare, isValidBlock, data)
solution : blockHex // blockHex is undefined is this was not a valid block.
});
if (isValidBlock){
logDebug(logIdentify, 'client', 'Block found, solution: ' + shareData.solution);
process.send({
type : 'block',
share : data,
coin : poolOptions.coin.name
});
}
}).on('difficultyUpdate', function(workerName, diff){ }).on('difficultyUpdate', function(workerName, diff){
if (poolOptions.shareProcessing.mpos.enabled){ handlers.diff(workerName, diff);
process.send({
type : 'difficultyUpdate',
workerName : workerName,
diff : diff,
coin : poolOptions.coin.name
});
}
}).on('log', function(severity, logKey, logText) { }).on('log', function(severity, logKey, logText) {
if (severity == 'debug') { if (severity == 'debug') {
logDebug(logIdentify, logKey, logText); poolLogger.debug(logKey, logText);
} else if (severity == 'warning') { } else if (severity == 'warning') {
logWarning(logIdentify, logKey, logText); poolLogger.warning(logKey, logText);
} else if (severity == 'error') { } else if (severity == 'error') {
logError(logIdentify, logKey, logText); poolLogger.error(logKey, logText);
} }
}); });
pool.start(); pool.start();
pools.push(pool); pools[poolOptions.coin.name.toLowerCase()] = pool;
}); });
}; };

View File

@ -1,61 +1,37 @@
var redis = require('redis'); var redis = require('redis');
module.exports = function(logger, poolConfigs){ module.exports = function(logger, poolConfig){
var redisConfig = poolConfig.shareProcessing.internal.redis;
var coin = poolConfig.coin.name;
var dbConnections = (function(){ var connection;
var connections = {};
Object.keys(poolConfigs).forEach(function(coin) {
var config = poolConfigs[coin]; function connect(){
var connection = connections[coin] = redis.createClient(redisConfig.port, redisConfig.host);
if (!config.shareProcessing || !config.shareProcessing.internal || !config.shareProcessing.internal.enabled) connection.on('error', function(err){
return; logger.error('redis', 'Redis client had an error: ' + JSON.stringify(err))
});
var redisConfig = config.shareProcessing.internal.redis; connection.on('end', function(){
logger.warning('redis', 'Connection to redis database as been ended');
function connect(){
var connection = connections[coin] = redis.createClient(redisConfig.port, redisConfig.host);
connection.on('error', function(err){
logger.logError('shareProcessor', 'redis', coin +
' - redis client had an error: ' + JSON.stringify(err))
});
connection.on('end', function(){
logger.logWarning('shareProcessor', 'redis', coin +
' - connection to redis database as been ended');
connect();
});
}
connect(); connect();
}); });
})(); }
connect();
this.handleDifficultyUpdate = function(data){
var coin = data.coin;
var poolConfig = poolConfigs[coin];
};
this.handleShare = function(data){
if ((!data.coin in dbConnections)) return; this.handleShare = function(isValidShare, isValidBlock, shareData){
if (!data.isValidShare) return;
var connection = dbConnections[data.coin]; if (!isValidShare) return;
var shareData = data.share;
var coin = data.coin;
var poolConfig = poolConfigs[coin];
connection.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){ connection.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){
if (error) if (error)
logger.logError('shareProcessor', 'redis', 'could not store worker share') logger.error('redis', 'Could not store worker share')
}); });
}; };
this.handleBlock = function(data){
//
};
}; };

View File

@ -10,35 +10,12 @@ var processor = module.exports = function processor(logger, poolConfigs){
var _this = this; var _this = this;
var mposCompat = new MposCompatibility(logger, poolConfigs);
var shareProcessor = new ShareProcessor(logger, poolConfigs);
this.init = function(){ this.init = function(){
Object.keys(cluster.workers).forEach(function(id) { Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].on('message', function(data){ cluster.workers[id].on('message', function(data){
var shareProcessing = poolConfigs[data.coin].shareProcessing;
switch(data.type){ switch(data.type){
case 'share':
if (shareProcessing.internal.enabled)
shareProcessor.handleShare(data);
if (shareProcessing.mpos.enabled)
mposCompat.handleShare(data);
break;
case 'difficultyUpdate':
if (shareProcessing.mpos.enabled)
mposCompat.handleDifficultyUpdate(data);
break;
case 'block':
if (shareProcessing.internal.enabled)
shareProcessor.handleBlock(data);
break;
case 'mposAuth':
mposCompat.handleAuth(data);
break;
} }
}); });
}); });