Payment processing is 95% done...

This commit is contained in:
Matt 2014-03-20 16:25:59 -06:00
parent 80ec2e8bca
commit be1b1f689c
7 changed files with 131 additions and 83 deletions

View File

@ -361,6 +361,7 @@ in case the master process crashes.
for exploring your redis database. for exploring your redis database.
* Use something like [logrotator](http://www.thegeekstuff.com/2010/07/logrotate-examples/) to rotate log * Use something like [logrotator](http://www.thegeekstuff.com/2010/07/logrotate-examples/) to rotate log
output from NOMP. output from NOMP.
* Use [New Relic](http://newrelic.com/) to monitor your NOMP instance and server performance.
Donations Donations

10
init.js
View File

@ -3,6 +3,7 @@ var os = require('os');
var cluster = require('cluster'); var cluster = require('cluster');
var posix = require('posix'); var posix = require('posix');
var PoolLogger = require('./libs/logUtil.js'); var PoolLogger = require('./libs/logUtil.js');
var BlocknotifyListener = require('./libs/blocknotifyListener.js'); var BlocknotifyListener = require('./libs/blocknotifyListener.js');
@ -14,7 +15,7 @@ var Website = require('./libs/website.js');
JSON.minify = JSON.minify || require("node-json-minify"); JSON.minify = JSON.minify || require("node-json-minify");
var portalConfig = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'}))); var portalConfig = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'})));
var loggerInstance = new PoolLogger({ var loggerInstance = new PoolLogger({
logLevel: portalConfig.logLevel logLevel: portalConfig.logLevel
@ -25,6 +26,13 @@ var logWarning = loggerInstance.logWarning;
var logError = loggerInstance.logError; var logError = loggerInstance.logError;
try {
require('newrelic');
if (cluster.isMaster)
logDebug('newrelic', 'system', 'New Relic initiated');
} catch(e) {}
//Try to give process ability to handle 100k concurrent connections //Try to give process ability to handle 100k concurrent connections
try{ try{
posix.setrlimit('nofile', { soft: 100000, hard: 100000 }); posix.setrlimit('nofile', { soft: 100000, hard: 100000 });

View File

@ -5,7 +5,6 @@ var Stratum = require('stratum-pool');
module.exports = function(logger){ module.exports = function(logger){
var poolConfigs = JSON.parse(process.env.pools); var poolConfigs = JSON.parse(process.env.pools);
@ -79,49 +78,6 @@ function SetupForPool(logger, poolOptions){
connectToRedis(); connectToRedis();
/* When blocks or orphaned, all shares contributed to that round would receive no reward. That doesn't seem fair
so we still all the shares from an orphaned rounds into the current round.
*/
/*var adoptOrphanRounds = function(rounds){
var shareLookups = rounds.map(function(r){
return ['hgetall', coin + '_shares:round' + r.height]
});
var shareIncries = [];
redisClient.multi(shareLookups).exec(function(error, allWorkerShares){
if (error){
paymentLogger.error('redis', 'Error with multi get rounds share for adopting orphan');
return;
}
var workerRewards = {};
for (var i = 0; i < rounds.length; i++){
var round = rounds[i];
var workerShares = allWorkerShares[i];
var reward = round.reward * (1 - processingConfig.feePercent);
var totalShares = Object.keys(workerShares).reduce(function(p, c){
return p + parseInt(workerShares[c])
}, 0);
for (var worker in workerShares){
var percent = parseInt(workerShares[worker]) / totalShares;
var workerRewardTotal = Math.floor(reward * percent);
if (!(worker in workerRewards)) workerRewards[worker] = 0;
workerRewards[worker] += workerRewardTotal;
}
}
});
};*/
var processPayments = function(){ var processPayments = function(){
async.waterfall([ async.waterfall([
@ -130,7 +86,7 @@ function SetupForPool(logger, poolOptions){
blocks. */ blocks. */
function(callback){ function(callback){
redisClient.smembers(coin + '_blocks', function(error, results){ redisClient.smembers(coin + '_blocksPending', function(error, results){
if (error){ if (error){
logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error)); logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error));
@ -144,7 +100,7 @@ function SetupForPool(logger, poolOptions){
var rounds = results.map(function(r){ var rounds = results.map(function(r){
var details = r.split(':'); var details = r.split(':');
return {txHash: details[0], height: details[1], reward: details[2]}; return {txHash: details[0], height: details[1], reward: details[2], serialized: r};
}); });
callback(null, rounds); callback(null, rounds);
@ -217,9 +173,10 @@ function SetupForPool(logger, poolOptions){
amount owned to each miner for each round. */ amount owned to each miner for each round. */
function(confirmedRounds, orphanedRounds, callback){ function(confirmedRounds, orphanedRounds, callback){
var rounds = []; var rounds = [];
for (var i = 0; i < orphanedRounds; i++) rounds.push(orphanedRounds[i]); for (var i = 0; i < orphanedRounds.length; i++) rounds.push(orphanedRounds[i]);
for (var i = 0; i < confirmedRounds; i++) rounds.push(confirmedRounds[i]); for (var i = 0; i < confirmedRounds.length; i++) rounds.push(confirmedRounds[i]);
var shareLookups = rounds.map(function(r){ var shareLookups = rounds.map(function(r){
@ -232,6 +189,8 @@ function SetupForPool(logger, poolOptions){
return; return;
} }
// Iterate through the beginning of the share results which are for the orphaned rounds
var orphanMergeCommands = [] var orphanMergeCommands = []
for (var i = 0; i < orphanedRounds.length; i++){ for (var i = 0; i < orphanedRounds.length; i++){
var workerShares = allWorkerShares[i]; var workerShares = allWorkerShares[i];
@ -241,11 +200,10 @@ function SetupForPool(logger, poolOptions){
orphanMergeCommands.push([]); orphanMergeCommands.push([]);
} }
// Iterate through the rest of the share results which are for the worker rewards
var workerRewards = {}; var workerRewards = {};
for (var i = orphanedRounds.length; i < allWorkerShares.length; i++){
for (var i = 0; i < rounds.length; i++){
var round = rounds[i]; var round = rounds[i];
var workerShares = allWorkerShares[i]; var workerShares = allWorkerShares[i];
@ -264,6 +222,7 @@ function SetupForPool(logger, poolOptions){
} }
} }
//this calculates profit if you wanna see it //this calculates profit if you wanna see it
/* /*
var workerTotalRewards = Object.keys(workerRewards).reduce(function(p, c){ var workerTotalRewards = Object.keys(workerRewards).reduce(function(p, c){
@ -278,19 +237,19 @@ function SetupForPool(logger, poolOptions){
console.log('pool profit percent' + ((poolTotalRewards - workerTotalRewards) / poolTotalRewards)); console.log('pool profit percent' + ((poolTotalRewards - workerTotalRewards) / poolTotalRewards));
*/ */
callback(null, rounds, workerRewards); callback(null, rounds, workerRewards, orphanMergeCommands);
}); });
}, },
/* Does a batch call to redis to get worker existing balances from coin_balances*/ /* Does a batch call to redis to get worker existing balances from coin_balances*/
function(rounds, workerRewards, callback){ function(rounds, workerRewards, orphanMergeCommands, callback){
var workers = Object.keys(workerRewards); var workers = Object.keys(workerRewards);
redisClient.hmget([coin + '_balances'].concat(workers), function(error, results){ redisClient.hmget([coin + '_balances'].concat(workers), function(error, results){
if (error){ if (error){
callback('done - redis error with multi get balances') callback('done - redis error with multi get balances');
return; return;
} }
@ -302,7 +261,7 @@ function SetupForPool(logger, poolOptions){
} }
callback(null, rounds, workerRewards, workerBalances) callback(null, rounds, workerRewards, workerBalances, orphanMergeCommands);
}); });
}, },
@ -314,29 +273,97 @@ function SetupForPool(logger, poolOptions){
when deciding the sent balance, it the difference should be -1*amount they had in db, when deciding the sent balance, it the difference should be -1*amount they had in db,
if not sending the balance, the differnce should be +(the amount they earned this round) if not sending the balance, the differnce should be +(the amount they earned this round)
*/ */
function(rounds, workerRewards, workerBalances, callback){ function(rounds, workerRewards, workerBalances, orphanMergeCommands, callback){
/* if payments dont succeed (likely because daemon isnt responding to rpc), then cancel here var magnitude = rounds[0].magnitude;
so that all of this can be tried again when the daemon is working. otherwise we will consider
payment sent after we cleaned up the db.
*/
/* In here do daemon.getbalance, figure out how many payments should be sent, see if the daemon.cmd('getbalance', [], function(results){
remaining balance after payments-to-be sent is greater than the min reserver, otherwise
put everything in worker balances to be paid next time. var totalBalance = results[0].response * magnitude;
var toBePaid = 0;
var workerPayments = {};
*/ var balanceUpdateCommands = [];
var workerPayoutsCommand = [];
for (var worker in workerRewards){
workerPayments[worker] = (workerPayments[worker] || 0) + workerRewards[worker];
}
for (var worker in workerBalances){
workerPayments[worker] = (workerPayments[worker] || 0) + workerBalances[worker];
}
for (var worker in workerPayments){
if (workerPayments[worker] < processingConfig.minimumPayment * magnitude){
balanceUpdateCommands.push(['hincrby', coin + '_balances', worker, workerRewards[worker]]);
delete workerPayments[worker];
}
else{
if (workerBalances[worker] !== 0)
balanceUpdateCommands.push(['hincrby', coin + '_balances', worker, -1 * workerBalances[worker]]);
workerPayoutsCommand.push(['hincrby', coin + '_balances', worker, workerRewards[worker]]);
toBePaid += workerPayments[worker];
}
}
var balanceLeftOver = totalBalance - toBePaid;
var minReserveSatoshis = processingConfig.minimumReserve * magnitude;
if (balanceLeftOver < minReserveSatoshis){
callback('done - payments would wipe out minimum reserve, tried to pay out ' + toBePaid +
' but only have ' + totalBalance + '. Left over balance would be ' + balanceLeftOver +
', needs to be at least ' + minReserveSatoshis);
return;
}
var movePendingCommands = [];
var deleteRoundsCommand = ['del'];
rounds.forEach(function(r){
var destinationSet = r.category === 'orphan' ? '_blocksOrphaned' : '_blocksConfirmed';
movePendingCommands.push(['smove', coin + '_blocksPending', coin + destinationSet, r.serialized]);
deleteRoundsCommand.push(coin + '_shares:round' + r.height)
});
var finalRedisCommands = [];
finalRedisCommands = finalRedisCommands.concat(
movePendingCommands,
orphanMergeCommands,
balanceUpdateCommands,
workerPayoutsCommand
);
finalRedisCommands.push(deleteRoundsCommand);
finalRedisCommands.push(['hincrby', coin + '_stats', 'totalPaid', toBePaid]);
callback(null, magnitude, workerPayments, finalRedisCommands);
});
}, },
function(magnitude, workerPayments, finalRedisCommands, callback){
var sendManyCmd = ['', {}];
for (var address in workerPayments){
sendManyCmd[1][address] = workerPayments[address] / magnitude;
}
console.log(JSON.stringify(finalRedisCommands, null, 4));
console.log(JSON.stringify(workerPayments, null, 4));
console.log(JSON.stringify(sendManyCmd, null, 4));
return; //not yet...
daemon.cmd(sendManyCmd, function(error, result){
//if successful then do finalRedisCommands
});
/* clean DB: update remaining balances in coin_balance hashset in redis
*/
function(balanceDifference, rounds, callback){
//SMOVE each tx key from coin_blocks to coin_processedBlocks
//HINCRBY to apply balance different for coin_balances worker1
} }
], function(error, result){ ], function(error, result){

View File

@ -2,6 +2,8 @@ var Stratum = require('stratum-pool');
var Vardiff = require('stratum-pool/lib/varDiff.js'); var Vardiff = require('stratum-pool/lib/varDiff.js');
var net = require('net'); var net = require('net');
var MposCompatibility = require('./mposCompatibility.js'); var MposCompatibility = require('./mposCompatibility.js');
var ShareProcessor = require('./shareProcessor.js'); var ShareProcessor = require('./shareProcessor.js');
@ -170,16 +172,12 @@ module.exports = function(logger){
}); });
Object.keys(portalConfig.proxy.ports).forEach(function (port) { Object.keys(portalConfig.proxy.ports).forEach(function (port) {
proxyStuff.proxys[port] = net .createServer({allowHalfOpen: true}, function(socket) {
proxyStuff.proxys[port] = net console.log(proxyStuff.curActivePool);
.createServer({allowHalfOpen: true}, function(socket) { pools[proxyStuff.curActivePool].getStratumServer().handleNewClient(socket);
console.log(proxyStuff.curActivePool); }).listen(parseInt(port), function(){
pools[proxyStuff.curActivePool].getStratumServer().handleNewClient(socket); console.log("Proxy listening on " + port);
} ) });
.listen(parseInt(port), function(){
console.log("Proxy listening on "+port);
});
}); });

View File

@ -2,6 +2,7 @@ var redis = require('redis');
var Stratum = require('stratum-pool'); var Stratum = require('stratum-pool');
/* /*
This module deals with handling shares when in internal payment processing mode. It connects to a redis This module deals with handling shares when in internal payment processing mode. It connects to a redis
database and inserts shares with the database structure of: database and inserts shares with the database structure of:
@ -65,7 +66,7 @@ module.exports = function(logger, poolConfig){
if (isValidBlock){ if (isValidBlock){
redisCommands.push(['rename', coin + '_shares:roundCurrent', coin + '_shares:round' + shareData.height]); redisCommands.push(['rename', coin + '_shares:roundCurrent', coin + '_shares:round' + shareData.height]);
redisCommands.push(['sadd', coin + '_blocks', shareData.tx + ':' + shareData.height + ':' + shareData.reward]); redisCommands.push(['sadd', coin + '_blocksPending', shareData.tx + ':' + shareData.height + ':' + shareData.reward]);
redisCommands.push(['hincrby', coin + '_stats', 'validBlocks', 1]); redisCommands.push(['hincrby', coin + '_stats', 'validBlocks', 1]);
} }
else if (shareData.solution){ else if (shareData.solution){

View File

@ -51,7 +51,7 @@ module.exports = function(logger, portalConfig, poolConfigs){
redisCommands.push(['zremrangebyscore', coin + '_hashrate', '-inf', '(' + windowTime]); redisCommands.push(['zremrangebyscore', coin + '_hashrate', '-inf', '(' + windowTime]);
redisCommands.push(['zrangebyscore', coin + '_hashrate', windowTime, '+inf']); redisCommands.push(['zrangebyscore', coin + '_hashrate', windowTime, '+inf']);
redisCommands.push(['hgetall', coin + '_stats']); redisCommands.push(['hgetall', coin + '_stats']);
redisCommands.push(['scard', coin + '_blocks']); redisCommands.push(['scard', coin + '_blocksPending']);
}); });

View File

@ -69,15 +69,25 @@ module.exports = function(logger){
var pageTemplates = {}; var pageTemplates = {};
var pageProcessed = {}; var pageProcessed = {};
var indexesProcessed = {};
var processTemplates = function(){ var processTemplates = function(){
for (var pageName in pageTemplates){ for (var pageName in pageTemplates){
if (pageName === 'index') continue;
pageProcessed[pageName] = pageTemplates[pageName]({ pageProcessed[pageName] = pageTemplates[pageName]({
poolsConfigs: poolConfigs, poolsConfigs: poolConfigs,
stats: portalStats.stats, stats: portalStats.stats,
portalConfig: portalConfig portalConfig: portalConfig
}); });
indexesProcessed[pageName] = pageTemplates.index({
page: pageProcessed[pageName],
selected: pageName,
stats: portalStats.stats,
poolConfigs: poolConfigs,
portalConfig: portalConfig
});
} }
}; };
@ -138,7 +148,7 @@ module.exports = function(logger){
var route = function(req, res, next){ var route = function(req, res, next){
var pageId = req.params.page || ''; var pageId = req.params.page || '';
var requestedPage = getPage(pageId); /*var requestedPage = getPage(pageId);
if (requestedPage){ if (requestedPage){
var data = pageTemplates.index({ var data = pageTemplates.index({
page: requestedPage, page: requestedPage,
@ -148,6 +158,9 @@ module.exports = function(logger){
portalConfig: portalConfig portalConfig: portalConfig
}); });
res.end(data); res.end(data);
}*/
if (pageId in indexesProcessed){
res.end(indexesProcessed[pageId]);
} }
else else
next(); next();