Added support for multiple daemons/rpc for redundancy when they occasionally die or drop out-of-sync

This commit is contained in:
Matt 2014-02-26 18:16:18 -07:00
parent 37ab36f776
commit 2d3416a02f
6 changed files with 178 additions and 177 deletions

View File

@ -98,18 +98,29 @@ var pool = stratum.createPool({
stratumPort: 3334,
//instanceId: 37, //I recommend not to use this option as a crypto-random one will be generated
difficulty: 32,
blockRefreshInterval: 2, //seconds
daemon: {
host: "localhost",
port: 19334,
user: "testnet",
password: "testnet"
},
blockRefreshInterval: 2000, //milliseconds
/* Recommended to have at least two daemon instances running in case one drops out-of-sync or offline. For
redundancy, all instances will be polled for block/transaction updates and be used for submitting blocks */
daemon: [
{ //main daemon instance
host: "localhost",
port: 19334,
user: "testnet",
password: "testnet"
},
{ //backup daemon instance
host: "localhost",
port: 19335,
user: "testnet",
password: "testnet"
}
],
varDiff: {
enabled: true, //set to false to disable vardiff functionality
minDifficulty: 16, //minimum difficulty. below 16 will cause problems
maxDifficulty: 1000, //network difficulty will be used if it is lower than this
daemonDiffUpdateFrequency: 3600, //get the network difficulty every this many seconds
targetTime: 30, //target time per share (i.e. try to get 1 share per this many seconds)
retargetTime: 120, //check to see if we should retarget every this many seconds
variancePercent: 20 //allow average time to very this % from target without retarget

View File

@ -1,11 +1,12 @@
var http = require('http');
var cp = require('child_process');
var events = require('events');
var startFailedTimeout = 120; //seconds
var async = require('async');
/**
* The daemon interface interacts with the coin daemon by using the rpc interface.
* in order to make it work it needs, as constructor, an object containing
* in order to make it work it needs, as constructor, an array of objects containing
* - 'host' : hostname where the coin lives
* - 'port' : port where the coin accepts rpc connections
* - 'user' : username of the coin for the rpc interface
@ -17,34 +18,21 @@ function DaemonInterface(options){
//private members
var _this = this;
this.options = options;
var persistentConnection;
var instances = (function(){
for (var i = 0; i < options.length; i++)
options[i]['instance'] = i;
return options;
})();
(function init(){
isOnline(function(online){
if (online)
_this.emit('online');
else if (options.startIfOffline){
me.start();
emitOnline();
}
});
})();
function emitOnline(){
var startedTime = Date.now();
var checkFunc = function(){
isOnline(function(online){
if (online)
_this.emit('online');
else if (Date.now() - startedTime < startFailedTimeout * 1000)
setTimeout(checkFunc, 2000);
else
_this.emit('startFailed');
});
};
checkFunc();
}
function isOnline(callback){
cmd('getinfo', [], function(error, result){
if (error)
@ -54,53 +42,69 @@ function DaemonInterface(options){
});
}
function cmd(method, params, callback){
var requestJson = JSON.stringify({
id: Date.now() + Math.floor(Math.random() * 10),
method: method,
params: params
});
/* Sends a JSON RPC (http://json-rpc.org/wiki/specification) command to every configured daemon.
The callback function is fired once with the result from each daemon unless streamResults is
set to true. */
function cmd(method, params, callback, streamResults){
var options = {
hostname: (typeof(_this.options.host) === 'undefined'?'localhost':_this.options.host),
port : _this.options.port,
method : 'POST',
auth : _this.options.user + ':' + _this.options.password,
headers : {
'Content-Length': requestJson.length
async.map(instances, function(instance, mapCallback){
var multiCallback = streamResults ? callback : mapCallback;
var tries = 5;
var requestJson = JSON.stringify({
id: Date.now() + Math.floor(Math.random() * 10),
method: method,
params: params
});
var options = {
hostname: (typeof(instance.host) === 'undefined'?'localhost':instance.host),
port : instance.port,
method : 'POST',
auth : instance.user + ':' + instance.password,
headers : {
'Content-Length': requestJson.length
}
};
var req = http.request(options, function(res) {
var data = '';
res.setEncoding('utf8');
res.on('data', function (chunk) {
data += chunk;
});
res.on('end', function(){
var dataJson;
try{
dataJson = JSON.parse(data);
}
catch(e){
_this.emit('error', 'could not parse rpc data from method: ' + method +
' on instance ' + JSON.stringify(instance));
}
if (typeof(dataJson) !== 'undefined')
multiCallback(dataJson.error, dataJson.result);
});
});
req.on('error', function(e) {
if (e.code === 'ECONNREFUSED')
multiCallback({type: 'offline', message: e.message});
else
multiCallback({type: 'request error', message: e.message});
});
req.end(requestJson);
}, function(err, results){
if (!streamResults){
callback(err, results);
}
};
var req = http.request(options, function(res) {
var data = '';
res.setEncoding('utf8');
res.on('data', function (chunk) {
data += chunk;
});
res.on('end', function(){
var dataJson;
try{
dataJson = JSON.parse(data);
}
catch(e){
_this.emit('error', 'daemon interface could not parse rpc data from method: ' + method + ' ' + options.hostname);
}
if (typeof(dataJson) !== 'undefined')
callback(dataJson.error, dataJson.result);
});
});
req.on('error', function(e) {
if (e.code === 'ECONNREFUSED')
callback({type: 'offline', message: e.message});
else
callback({type: 'request error', message: e.message});
});
req.end(requestJson);
}
@ -108,17 +112,6 @@ function DaemonInterface(options){
this.isOnline = isOnline;
this.cmd = cmd;
this.start = function(){
var cmdArgs = [
'-rpcport=' + _this.options.port,
'-rpcuser=' + _this.options.user,
'-rpcpassword=' + _this.options.password,
'-blocknotify=' + _this.options.blocknotify
];
var child = cp.spawn(_this.options.bin, cmdArgs, { detached: true, stdio: [ 'ignore', 'ignore', 'ignore' ] });
child.unref();
console.log('started daemon');
};
}
DaemonInterface.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -57,22 +57,6 @@ var JobManager = module.exports = function JobManager(options){
var jobCounter = new JobCounter();
/**
* It only checks if the blockTemplate is already in our jobs list.
* @returns true if it's a new block, false otherwise.
* used by onNewTemplate
**/
function CheckNewIfNewBlock(prevBlockHash){
if (typeof(_this.currentJob) === 'undefined') {
return true;
} else if (_this.currentJob.rpcData.previousblockhash !== prevBlockHash) {
return true;
} else {
return false;
}
}
//Which number to use as dividend when converting difficulty to target
var diffDividend = bignum((function(){
switch(options.algorithm){
@ -124,12 +108,27 @@ var JobManager = module.exports = function JobManager(options){
this.currentJob;
this.validJobs = {};
//returns true if processed a new block
this.processTemplate = function(rpcData, publicKey){
var isNewBlock = CheckNewIfNewBlock(rpcData.previousblockhash);
/* Block is new if A) its the first block we have seen so far or B) the blockhash is different and the
block height is greater than the one we have */
var isNewBlock = typeof(_this.currentJob) === 'undefined';
if (!isNewBlock && _this.currentJob.rpcData.previousblockhash !== rpcData.previousblockhash){
isNewBlock = true;
//If new block is outdated/out-of-sync than return
if (rpcData.height < _this.currentJob.rpcData.height)
return;
}
/* If block isn't new, lets see if the transactions have updated */
var updatedTransactions = !isNewBlock &&
(_this.currentJob.rpcData.transactions.length != rpcData.transactions.length)
//Update current job if new block or new transactions
if (isNewBlock || _this.currentJob.rpcData.transactions.length != rpcData.transactions.length){
if (isNewBlock || updatedTransactions){
var tmpBlockTemplate = new blockTemplate(
jobCounter.next(),
@ -147,12 +146,16 @@ var JobManager = module.exports = function JobManager(options){
this.validJobs = {};
_this.emit('newBlock', tmpBlockTemplate);
}
else
else{
//emit when transactions have updated
_this.emit('updatedBlock', tmpBlockTemplate, true);
}
this.validJobs[tmpBlockTemplate.jobId] = tmpBlockTemplate;
}
return isNewBlock;
};
this.processShare = function(jobId, difficulty, extraNonce1, extraNonce2, nTime, nonce, ipAddress, workerName){

View File

@ -29,6 +29,7 @@ var pool = module.exports = function pool(options, authorizeFn){
var _this = this;
var publicKeyBuffer;
var maxDifficulty = 0x00000000ffff0000000000000000000000000000000000000000000000000000;
var emitLog = function(key, text) { _this.emit('log', 'debug' , key, text); };
var emitWarningLog = function(key, text) { _this.emit('log', 'warning', key, text); };
@ -63,12 +64,7 @@ var pool = module.exports = function pool(options, authorizeFn){
return;
}
_this.varDiff = new varDiff(options.varDiff, options.difficulty);
_this.varDiff.on('difficultyRequest', function(){
emitLog('varDiff', 'Difficulty requested for vardiff');
if (_this.stratumServer)
RequestDifficulty(function(){});
}).on('newDifficulty', function(client, newDiff) {
_this.varDiff.on('newDifficulty', function(client, newDiff) {
if (options.varDiff.mode === 'safe'){
/* We request to set the newDiff @ the next difficulty retarget
@ -81,29 +77,13 @@ var pool = module.exports = function pool(options, authorizeFn){
so the miner doesn't restart work and submit duplicate shares */
client.sendDifficulty(newDiff);
var job = _this.jobManager.currentJob.getJobParams();
job[8] = false;
//job[8] = false;
client.sendMiningJob(job);
}
});
emitLog("system", "VarDiff enabled and setup");
}
function RequestDifficulty(callback){
_this.daemon.cmd('getmininginfo',
[],
function(error, result){
if (error) {
emitErrorLog('getdifficulty', 'Error requesting difficulty from daemon for vardiff');
} else {
if (options.varDiff.enabled)
_this.varDiff.setNetworkDifficulty(result.difficulty);
callback(error, result);
}
}
);
}
/*
Coin daemons either use submitblock or getblocktemplate for submitting new blocks
*/
@ -112,10 +92,9 @@ var pool = module.exports = function pool(options, authorizeFn){
_this.daemon.cmd('submitblock',
[blockHex],
function(error, result){
emitLog('submitblock', JSON.stringify([error,result]));
if (error)
emitErrorLog('submitblock', 'rpc error when submitting block with submitblock')
emitErrorLog('submitblock', 'rpc error when submitting block with submitblock ' +
JSON.stringify(error));
else
emitLog('submitblock', 'Submitted Block using submitblock');
callback();
@ -125,10 +104,9 @@ var pool = module.exports = function pool(options, authorizeFn){
_this.daemon.cmd('getblocktemplate',
[{'mode': 'submit', 'data': blockHex}],
function(error, result){
emitLog('submitblock', JSON.stringify([error,result]));
if (error)
emitErrorLog('submitblock', 'rpc error when submitting block with getblocktemplate')
emitErrorLog('submitblock', 'rpc error when submitting block with getblocktemplate' +
JSON.stringify(error));
else
emitLog('submitblock', 'Submitted Block using getblocktemplate');
callback()
@ -190,53 +168,63 @@ var pool = module.exports = function pool(options, authorizeFn){
function SetupDaemonInterface(){
//emitLog('system','Connecting to daemon');
_this.daemon = new daemon.interface(options.daemon);
_this.daemon.on('online', function(){
_this.daemon.once('online', function(){
async.parallel({
networkDifficulty: RequestDifficulty,
addressInfo: function(callback){
_this.daemon.cmd('validateaddress',
[options.address],
function(error, result){
if (error){
emitErrorLog('system','validateaddress rpc error');
callback(error);
} else if (!result.isvalid) {
emitErrorLog('system','address is not valid');
callback("address-not-valid");
} else {
callback(error, result);
}
_this.daemon.cmd('validateaddress', [options.address], function(error, results){
if (error){
emitErrorLog('system','validateaddress rpc error');
callback(error);
}
);
//Make sure address is valid with each daemon
else if (results.every(function(result, index){
if (!result.isvalid)
emitErrorLog('system', 'Daemon instance ' + index + ' reports address is not valid');
return result.isvalid;
})){
callback(null, results[0]);
} else {
callback("address-not-valid");
}
});
},
miningInfo: function(callback){
_this.daemon.cmd('getmininginfo', [], function(error, result){
if (!error && result){
emitLog('system', 'Daemon is running on ' +
(result.testnet ? 'testnet' : 'live blockchain'))
}
else
emitErrorLog('system', 'getmininginfo on init failed');
_this.daemon.cmd('getmininginfo', [], function(error, results){
if (!error && results){
// Print which network each daemon is running on
results.forEach(function(result, index){
var network = result.testnet ? 'testnet' : 'live blockchain';
emitLog('system', 'Daemon instance ' + index + ' is running on ' + network);
});
// Find and return the result with the largest block height (most in-sync)
var largestHeight = results.sort(function(a, b){
return b.blocks - a.blocks;
})[0];
callback(null, largestHeight);
}
else{
emitErrorLog('system', 'getmininginfo on init failed');
callback(error, results);
}
callback(error, result);
});
},
submitMethod: function(callback){
/* This checks to see whether the daemon uses submitblock
or getblocktemplate for submitting new blocks */
_this.daemon.cmd('submitblock',
[],
function(error, result){
if (error && error.message === 'Method not found')
callback(null, false);
else
callback(null, true);
}
);
_this.daemon.cmd('submitblock', [], function(error, result){
if (error && error.message === 'Method not found')
callback(null, false);
else
callback(null, true);
});
}
}, function(err, results){
if (err){
emitErrorLog('system', 'Failed to daemon');
emitErrorLog('system', 'Failed to connect daemon ' + JSON.stringify(err));
return;
}
@ -244,6 +232,9 @@ var pool = module.exports = function pool(options, authorizeFn){
options.hasSubmitMethod = results.submitMethod;
if (options.varDiff.enabled)
_this.varDiff.setNetworkDifficulty(results.miningInfo.difficulty);
if (options.reward === 'POS' && typeof(results.addressInfo.pubkey) == 'undefined') {
// address provided is not of the wallet.
emitErrorLog('system', 'The address provided is not from the daemon wallet.');
@ -254,9 +245,9 @@ var pool = module.exports = function pool(options, authorizeFn){
util.script_to_address(results.addressInfo.address) :
util.script_to_pubkey(results.addressInfo.pubkey);
if (options.difficulty > results.networkDifficulty && options.difficulty > 16){
var newDiff = results.networkDifficulty > 16 ? results.networkDifficulty : 16;
emitWarningLog('system', 'pool difficulty was set higher than network difficulty of ' + results.networkDifficulty);
if (options.difficulty > results.miningInfo.difficulty && options.difficulty > 16){
var newDiff = results.miningInfo.difficulty > 16 ? results.miningInfo.difficulty : 16;
emitWarningLog('system', 'pool difficulty was set higher than network difficulty of ' + results.miningInfo.difficulty);
emitWarningLog('system', 'lowering pool diff from ' + options.difficulty + ' to ' + newDiff);
options.difficulty = newDiff
@ -279,7 +270,7 @@ var pool = module.exports = function pool(options, authorizeFn){
}
});
}).on('startFailed', function(){
}).on('connectionFailed', function(instance){
emitErrorLog('system','Failed to start daemon');
}).on('error', function(message){
emitErrorLog('system', message);
@ -345,12 +336,12 @@ var pool = module.exports = function pool(options, authorizeFn){
function SetupBlockPolling(){
if (options.blockRefreshInterval === 0){
if (typeof options.blockRefreshInterval === "number" && options.blockRefreshInterval > 0){
emitLog('system', 'Block template polling has been disabled');
return;
}
var pollingInterval = options.blockRefreshInterval * 1000;
var pollingInterval = options.blockRefreshInterval;
setInterval(function () {
GetBlockTemplate(function(error, result) {
@ -371,10 +362,16 @@ var pool = module.exports = function pool(options, authorizeFn){
if (error) {
callback(error);
} else {
_this.jobManager.processTemplate(result, publicKeyBuffer);
var processedNewBlock = _this.jobManager.processTemplate(result, publicKeyBuffer);
if (processedNewBlock && options.varDiff.enabled)
_this.varDiff.setNetworkDifficulty(maxDifficulty / parseInt(result.target, 16));
callback(null, result);
callback = function(){};
}
}
}, true
);
}

View File

@ -228,6 +228,7 @@ var StratumClient = function(options){
method: "mining.set_difficulty",
params: [difficulty]//[512],
});
console.log('new diff sent ' + difficulty);
return true;

View File

@ -51,10 +51,6 @@ var varDiff = module.exports = function varDiff(options, poolDifficulty){
var tMax = options.targetTime + variance;
setInterval(function(){
_this.emit('difficultyRequest');
}, options.daemonDiffUpdateFrequency * 1000);
this.setNetworkDifficulty = function(diff){
networkDifficulty = diff;