This commit is contained in:
Chris Kleeschulte 2017-03-09 15:57:24 -05:00
parent 6a9d9bac7b
commit 750cda44ae

View File

@ -569,6 +569,10 @@ WalletService.prototype._endpointPostAddresses = function() {
}
// TODO make imdempotent
//this could be a long-running operation, so we'll return a job id
if (!self._isJobQueueReady()) {
return callback(new Error('Job queue is currently overloaded, please try again later.'), jobResults);
}
var jobId = utils.generateJobId();
self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
@ -601,6 +605,10 @@ WalletService.prototype._endpointGetTransactions = function() {
WalletService.prototype._endpointPutAddresses = function() {
var self = this;
return function(req, res) {
if (!self._isJobQueueReady()) {
return utils.sendError(new Error('Job Queue is full, current job limit: ' + self._MAX_QUEUE), res);
}
var newAddresses = req.body;
if(!Array.isArray(req.body)) {
@ -843,11 +851,14 @@ WalletService.prototype._isJobQueueReady = function() {
};
WalletService.prototype._jobCompletionCallback = function(err, results) {
log.info('Completed job: ', results.jobId);
var jobId = results.jobId;
var job = this._jobs.get(jobId);
if (!job) {
log.debug('ERROR: Could not locate job id: ' + jobId +
' in the list of jobs. It may have been purged already although it should not have.');
return;
}
@ -865,14 +876,11 @@ WalletService.prototype._jobCompletionCallback = function(err, results) {
job.reported = false;
};
//TODO: if this is running as a job, then the whole process can be moved to another CPU
WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) {
var self = this;
var results = { jobId: jobId };
if (!self._isJobQueueReady()) {
return callback(new Error('Job queue is currently overloaded, please try again later.'), results);
}
var jobResults = { jobId: jobId };
var job = {
starttime: Date.now(),
@ -885,7 +893,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
self._getAddresses(walletId, function(err, oldAddresses) {
if(err) {
return callback(err, results);
return callback(err, jobResults);
}
async.parallel(
@ -895,7 +903,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
],
function(err, results) {
if(err) {
return callback(err, results);
return callback(err, jobResults);
}
var now = Date.now();
@ -911,16 +919,19 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
self.store.batch(operations, function(err) {
if(err) {
return callback(err, results);
return callback(err, jobResults);
}
self._loadAllAddresses(function(err) {
if(err) {
return callback(err, results);
return callback(err, jobResults);
}
self._loadAllBalances(function(err) {
callback(err, results);
if(err) {
return callback(err, jobResults);
}
callback(null, jobResults);
});
});
});