Better error handling when dealing with multiple RPC daemons

This commit is contained in:
Matt 2014-02-27 11:27:09 -07:00
parent 16807d3226
commit 6cc97aff5b
2 changed files with 150 additions and 98 deletions

View File

@ -21,7 +21,7 @@ function DaemonInterface(options){
var instances = (function(){ var instances = (function(){
for (var i = 0; i < options.length; i++) for (var i = 0; i < options.length; i++)
options[i]['instance'] = i; options[i]['index'] = i;
return options; return options;
})(); })();
@ -34,11 +34,11 @@ function DaemonInterface(options){
})(); })();
function isOnline(callback){ function isOnline(callback){
cmd('getinfo', [], function(error, result){ cmd('getinfo', [], function(results){
if (error) var allOnline = results.every(function(result){
callback(false); return !results.error;
else });
callback(true); callback(allOnline);
}); });
} }
@ -48,11 +48,17 @@ function DaemonInterface(options){
set to true. */ set to true. */
function cmd(method, params, callback, streamResults){ function cmd(method, params, callback, streamResults){
async.map(instances, function(instance, mapCallback){ var results = [];
var multiCallback = streamResults ? callback : mapCallback; async.each(instances, function(instance, eachCallback){
var tries = 5; var itemFinished = function(error, result){
var returnObj = {error: error, response: result, instance: instance};
if (streamResults) callback(returnObj);
else results.push(returnObj);
eachCallback();
itemFinished = function(){};
};
var requestJson = JSON.stringify({ var requestJson = JSON.stringify({
id: Date.now() + Math.floor(Math.random() * 10), id: Date.now() + Math.floor(Math.random() * 10),
@ -61,7 +67,7 @@ function DaemonInterface(options){
}); });
var options = { var options = {
hostname: (typeof(instance.host) === 'undefined'?'localhost':instance.host), hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host),
port : instance.port, port : instance.port,
method : 'POST', method : 'POST',
auth : instance.user + ':' + instance.password, auth : instance.user + ':' + instance.password,
@ -78,30 +84,36 @@ function DaemonInterface(options){
}); });
res.on('end', function(){ res.on('end', function(){
var dataJson; var dataJson;
var parsingError;
try{ try{
dataJson = JSON.parse(data); dataJson = JSON.parse(data);
} }
catch(e){ catch(e){
parsingError = e;
_this.emit('error', 'could not parse rpc data from method: ' + method + _this.emit('error', 'could not parse rpc data from method: ' + method +
' on instance ' + JSON.stringify(instance)); ' on instance ' + instance.index + ' data: ' + data);
} }
if (typeof(dataJson) !== 'undefined') if (typeof(dataJson) !== 'undefined')
multiCallback(dataJson.error, dataJson.result); itemFinished(dataJson.error, dataJson.result);
else
itemFinished(parsingError);
}); });
}); });
req.on('error', function(e) { req.on('error', function(e) {
if (e.code === 'ECONNREFUSED') if (e.code === 'ECONNREFUSED')
multiCallback({type: 'offline', message: e.message}); itemFinished({type: 'offline', message: e.message}, null);
else else
multiCallback({type: 'request error', message: e.message}); itemFinished({type: 'request error', message: e.message}, null);
}); });
req.end(requestJson); req.end(requestJson);
}, function(err, results){ }, function(){
if (!streamResults){ if (!streamResults){
callback(err, results); callback(results);
} }
}); });

View File

@ -89,31 +89,36 @@ var pool = module.exports = function pool(options, authorizeFn){
Coin daemons either use submitblock or getblocktemplate for submitting new blocks Coin daemons either use submitblock or getblocktemplate for submitting new blocks
*/ */
function SubmitBlock(blockHex, callback){ function SubmitBlock(blockHex, callback){
if (options.hasSubmitMethod) {
_this.daemon.cmd('submitblock', var rcpCommand, rpcArgs;
[blockHex], if (options.hasSubmitMethod){
function(error, result){ rcpCommand = 'submitblock';
if (error) rpcArgs = [blockHex];
emitErrorLog('submitblock', 'rpc error when submitting block with submitblock ' +
JSON.stringify(error));
else
emitLog('submitblock', 'Submitted Block using submitblock');
callback();
}
);
} else {
_this.daemon.cmd('getblocktemplate',
[{'mode': 'submit', 'data': blockHex}],
function(error, result){
if (error)
emitErrorLog('submitblock', 'rpc error when submitting block with getblocktemplate' +
JSON.stringify(error));
else
emitLog('submitblock', 'Submitted Block using getblocktemplate');
callback()
}
);
} }
else{
rpcCommand = 'getblocktemplate';
rcpArgs = [{'mode': 'submit', 'data': blockHex}];
}
_this.daemon.cmd(rcpCommand,
rpcArgs,
function(results){
results.forEach(function(result){
if (result.error)
emitErrorLog('submitblock', 'rpc error with daemon instance ' +
result.instance.index + ' when submitting block with ' + rpcCommand + ' ' +
JSON.stringify(result.error)
);
else
emitLog('submitblock', 'Submitted Block using ' + rpcCommand + ' to daemon instance ' +
result.instance.index
);
});
callback();
}
);
} }
@ -167,71 +172,109 @@ var pool = module.exports = function pool(options, authorizeFn){
function SetupDaemonInterface(){ function SetupDaemonInterface(){
//emitLog('system','Connecting to daemon'); emitLog('system', 'Connecting to daemon(s)');
_this.daemon = new daemon.interface(options.daemon); _this.daemon = new daemon.interface(options.daemon);
_this.daemon.once('online', function(){ _this.daemon.once('online', function(){
async.parallel({ async.parallel({
addressInfo: function(callback){ addressInfo: function(callback){
_this.daemon.cmd('validateaddress', [options.address], function(error, results){ _this.daemon.cmd('validateaddress', [options.address], function(results){
if (error){
emitErrorLog('system','validateaddress rpc error');
callback(error);
}
//Make sure address is valid with each daemon //Make sure address is valid with each daemon
else if (results.every(function(result, index){ var allValid = results.every(function(result){
if (!result.isvalid) if (result.error || !result.response){
emitErrorLog('system', 'Daemon instance ' + index + ' reports address is not valid'); emitErrorLog('system','validateaddress rpc error on daemon instance ' +
return result.isvalid; result.instance.index + ', error +' + JSON.stringify(result.error));
})){ }
callback(null, results[0]); else if (!result.response.isvalid)
} else { emitErrorLog('system', 'Daemon instance ' + result.instance.index +
callback("address-not-valid"); ' reports address is not valid');
return result.response && result.response.isvalid;
});
if (allValid)
callback(null, results[0].response);
else{
callback('not all addresses are valid')
} }
}); });
}, },
miningInfo: function(callback){ miningInfo: function(callback){
_this.daemon.cmd('getmininginfo', [], function(error, results){ _this.daemon.cmd('getmininginfo', [], function(results){
if (!error && results){
// Print which network each daemon is running on // Print which network each daemon is running on
results.forEach(function(result, index){
var network = result.testnet ? 'testnet' : 'live blockchain';
emitLog('system', 'Daemon instance ' + index + ' is running on ' + network);
});
// Find and return the result with the largest block height (most in-sync) var isTestnet;
var largestHeight = results.sort(function(a, b){ var allValid = results.every(function(result){
return b.blocks - a.blocks;
})[0];
if (options.varDiff.enabled) if (result.error){
_this.varDiff.setNetworkDifficulty(largestHeight.difficulty); emitErrorLog('system', 'getmininginfo on init failed with daemon instance ' +
result.instance.index + ', error ' + JSON.stringify(result.error)
);
return false;
}
emitLog('network', 'Current block height at ' + largestHeight.blocks + var network = result.response.testnet ? 'testnet' : 'live blockchain';
' with difficulty of ' + largestHeight.difficulty); emitLog('system', 'Daemon instance ' + result.instance.index + ' is running on ' + network);
callback(null, largestHeight); if (typeof isTestnet === 'undefined'){
} isTestnet = result.response.testnet;
else{ return true;
emitErrorLog('system', 'getmininginfo on init failed'); }
callback(error, results); else if (isTestnet !== result.response.testnet){
emitErrorLog('system', 'not all daemons are on same network');
return false;
}
else
return true;
});
if (!allValid){
callback('could not getmininginfo correctly on each daemon');
return;
} }
//Find and return the response with the largest block height (most in-sync)
var largestHeight = results.sort(function(a, b){
return b.response.blocks - a.response.blocks;
})[0].response;
if (options.varDiff.enabled)
_this.varDiff.setNetworkDifficulty(largestHeight.difficulty);
emitLog('network', 'Current block height at ' + largestHeight.blocks +
' with difficulty of ' + largestHeight.difficulty);
callback(null, largestHeight);
}); });
}, },
submitMethod: function(callback){ submitMethod: function(callback){
/* This checks to see whether the daemon uses submitblock /* This checks to see whether the daemon uses submitblock
or getblocktemplate for submitting new blocks */ or getblocktemplate for submitting new blocks */
_this.daemon.cmd('submitblock', [], function(error, result){ _this.daemon.cmd('submitblock', [], function(results){
if (error && error.message === 'Method not found') var couldNotDetectMethod = results.every(function(result){
callback(null, false); if (result.error && result.error.message === 'Method not found'){
else callback(null, false);
callback(null, true); return false;
}
else if (result.error && result.error.code === -1){
callback(null, true);
return false;
}
else
return true;
});
if (couldNotDetectMethod){
emitErrorLog('system', 'Could not detect block submission RPC method');
callback('block submission detection failed');
}
}); });
} }
}, function(err, results){ }, function(err, results){
if (err){ if (err){
emitErrorLog('system', 'Failed to connect daemon ' + JSON.stringify(err)); emitErrorLog('system', 'Could not start pool, ' + JSON.stringify(err));
return; return;
} }
@ -263,7 +306,7 @@ var pool = module.exports = function pool(options, authorizeFn){
GetBlockTemplate(function(error, result){ GetBlockTemplate(function(error, result){
if (error){ if (error){
console.error(error); console.error(error);
emitErrorLog('system', 'Error with initial getblocktemplate'); emitErrorLog('system', 'Error with getblocktemplate on initializing');
} }
else{ else{
SetupBlockPolling(); SetupBlockPolling();
@ -348,12 +391,7 @@ var pool = module.exports = function pool(options, authorizeFn){
var pollingInterval = options.blockRefreshInterval; var pollingInterval = options.blockRefreshInterval;
setInterval(function () { setInterval(function () {
GetBlockTemplate(function(error, result) { GetBlockTemplate(function(error, result){});
if (error) {
emitErrorLog('system', "Block polling error getting block template for " + options.name)
}
});
}, pollingInterval); }, pollingInterval);
emitLog('system', 'Block polling every ' + pollingInterval + ' milliseconds'); emitLog('system', 'Block polling every ' + pollingInterval + ' milliseconds');
} }
@ -361,16 +399,18 @@ var pool = module.exports = function pool(options, authorizeFn){
function GetBlockTemplate(callback){ function GetBlockTemplate(callback){
_this.daemon.cmd('getblocktemplate', _this.daemon.cmd('getblocktemplate',
[{"capabilities": [ "coinbasetxn", "workid", "coinbase/append" ]}], [{"capabilities": [ "coinbasetxn", "workid", "coinbase/append" ]}],
function(error, result){ function(result){
if (error) { if (result.error){
callback(error); emitErrorLog('system', 'getblocktemplate call failed for daemon instance ' +
result.instance.index + ' with error ' + JSON.stringify(result.error));
callback(result.error);
} else { } else {
var processedNewBlock = _this.jobManager.processTemplate(result, publicKeyBuffer); var processedNewBlock = _this.jobManager.processTemplate(result.response, publicKeyBuffer);
if (processedNewBlock && options.varDiff.enabled) if (processedNewBlock && options.varDiff.enabled)
_this.varDiff.setNetworkDifficulty(_this.jobManager.currentJob.difficulty); _this.varDiff.setNetworkDifficulty(_this.jobManager.currentJob.difficulty);
callback(null, result); callback(null, result.response);
callback = function(){}; callback = function(){};
} }
}, true }, true
@ -380,14 +420,14 @@ var pool = module.exports = function pool(options, authorizeFn){
function CheckBlockAccepted(blockHash, callback){ function CheckBlockAccepted(blockHash, callback){
_this.daemon.cmd('getblock', _this.daemon.cmd('getblock',
[blockHash], [blockHash],
function(error, results){ function(results){
if (error){ if (results.filter(function(result){return result.response &&
(result.response.hash === blockHash)}).length >= 1){
callback(true);
}
else{
callback(false); callback(false);
} }
else if (results.filter(function(result){return result.hash === blockHash}).length >= 1)
callback(true);
else
callback(false);
} }
); );
} }