From 750cda44aea5c12eb8bd7ec573415c8932f6092e Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 9 Mar 2017 15:57:24 -0500 Subject: [PATCH] wip --- lib/services/wallet-api/index.js | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index bda6136d..23119fe3 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -569,6 +569,10 @@ WalletService.prototype._endpointPostAddresses = function() { } // TODO make imdempotent //this could be a long-running operation, so we'll return a job id + if (!self._isJobQueueReady()) { + return callback(new Error('Job queue is currently overloaded, please try again later.'), jobResults); + } + var jobId = utils.generateJobId(); self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self)); res.status(200).jsonp({jobId: jobId}); @@ -601,6 +605,10 @@ WalletService.prototype._endpointGetTransactions = function() { WalletService.prototype._endpointPutAddresses = function() { var self = this; return function(req, res) { + if (!self._isJobQueueReady()) { + return utils.sendError(new Error('Job Queue is full, current job limit: ' + self._MAX_QUEUE), res); + } + var newAddresses = req.body; if(!Array.isArray(req.body)) { @@ -843,11 +851,14 @@ WalletService.prototype._isJobQueueReady = function() { }; WalletService.prototype._jobCompletionCallback = function(err, results) { + log.info('Completed job: ', results.jobId); var jobId = results.jobId; var job = this._jobs.get(jobId); if (!job) { + log.debug('ERROR: Could not locate job id: ' + jobId + + ' in the list of jobs. It may have been purged already although it should not have.'); return; } @@ -865,14 +876,11 @@ WalletService.prototype._jobCompletionCallback = function(err, results) { job.reported = false; }; +//TODO: if this is running as a job, then the whole process can be moved to another CPU WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) { var self = this; - var results = { jobId: jobId }; - - if (!self._isJobQueueReady()) { - return callback(new Error('Job queue is currently overloaded, please try again later.'), results); - } + var jobResults = { jobId: jobId }; var job = { starttime: Date.now(), @@ -885,7 +893,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, self._getAddresses(walletId, function(err, oldAddresses) { if(err) { - return callback(err, results); + return callback(err, jobResults); } async.parallel( @@ -895,7 +903,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, ], function(err, results) { if(err) { - return callback(err, results); + return callback(err, jobResults); } var now = Date.now(); @@ -911,16 +919,19 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, self.store.batch(operations, function(err) { if(err) { - return callback(err, results); + return callback(err, jobResults); } self._loadAllAddresses(function(err) { if(err) { - return callback(err, results); + return callback(err, jobResults); } self._loadAllBalances(function(err) { - callback(err, results); + if(err) { + return callback(err, jobResults); + } + callback(null, jobResults); }); }); });