diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 95974253..bda6136d 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -18,9 +18,17 @@ var Encoding = require('./encoding'); var WalletService = function(options) { BaseService.call(this, options); + //TODO: what to do if we overflow jobs for which we never reported on + //jobs are agnostic to the amount of work in each job. Additionally, + //each job in the cache is already running immediately after setting + //the job. This means that a large job can be started before smaller + //jobs and this can lead to slower processing of all jobs over time. + this._MAX_QUEUE = 1; this._jobs = LRU({ - max: 50 //keep 50 jobs around; + max: this._MAX_QUEUE, + maxAge: 86400000 * 3 //3 days }); + this._cache = LRU({ max: 500 * 1024 * 1024, length: function(n) { @@ -820,35 +828,64 @@ WalletService.prototype._createWallet = function(walletId, callback) { }); }; +WalletService.prototype._isJobQueueReady = function() { + + var self = this; + + self._jobs.rforEach(function(value, key) { + if ((value.state === 'complete' || value.state === 'error') && value.reported) { + self._jobs.del(key); + } + }); + + return self._jobs.length < self._MAX_QUEUE; + +}; + WalletService.prototype._jobCompletionCallback = function(err, results) { var jobId = results.jobId; - this._jobs[jobId].progress = 1.0; - this._jobs[jobId].endtime = Date.now(); + var job = this._jobs.get(jobId); - if (err) { - this._jobs[jobId].status = 'error'; - return this._jobs[jobId].message = err.message; + if (!job) { + return; } - this._jobs[jobId].status = 'complete'; - this._jobs[jobId].message = results; + job.progress = 1.0; + job.endtime = Date.now(); + + if (err) { + job.status = 'error'; + job.message = err.message; + return; + } + + job.status = 'complete'; + job.message = results; + job.reported = false; }; WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) { var self = this; + var results = { jobId: jobId }; - self._jobs[jobId] = { + if (!self._isJobQueueReady()) { + return callback(new Error('Job queue is currently overloaded, please try again later.'), results); + } + + var job = { starttime: Date.now(), fn: 'importAddresses', progress: 0, projectedendtime: null }; + this._jobs.set(jobId, job); + self._getAddresses(walletId, function(err, oldAddresses) { if(err) { - return callback(err); + return callback(err, results); } async.parallel( @@ -858,12 +895,12 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, ], function(err, results) { if(err) { - return callback(err); + return callback(err, results); } var now = Date.now(); - self._jobs[jobId].progress = 0.50; - self._jobs[jobId].projectedendtime = now + (now - self._jobs[jobId].starttime); + job.progress = 0.50; + job.projectedendtime = now + (now - job.starttime); var operations = results[0].concat(results[1]); operations.push({ @@ -874,15 +911,17 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, self.store.batch(operations, function(err) { if(err) { - return callback(err); + return callback(err, results); } self._loadAllAddresses(function(err) { if(err) { - return callback(err); + return callback(err, results); } - self._loadAllBalances(callback); + self._loadAllBalances(function(err) { + callback(err, results); + }); }); }); } @@ -983,15 +1022,39 @@ WalletService.prototype._storeBalance = function(walletId, balance, callback) { this.store.put(key, value, callback); }; -WalletService.prototype._endpointJobStatus = function() { +WalletService.prototype._endpointJobs = function() { + var self = this; + return function(req, res) { - var jobId = req.jobId; - if (!jobId || !self._jobs[jobId]) { - return utils.sendError(new Error('Job not found.')); - } - return res.status(201).jsonp(self._jobs[jobId]); + + var count = 0; + self._jobs.rforEach(function(value, key) { + if ((value.state === 'complete' || value.state === 'error') && value.reported) { + count++; + } + }); + + res.status(200).jsonp({ jobCount: self._jobs.length - count }); }; + +}; + +WalletService.prototype._endpointJobStatus = function() { + + var self = this; + + return function(req, res) { + var jobId = req.params.jobId; + var job = self._jobs.get(jobId); + if (!jobId || !job) { + return utils.sendError(new Error('Job not found. ' + + 'The job results may have been purged to make room for new jobs.'), res); + } + job.reported = true; + return res.status(201).jsonp(job); + }; + }; WalletService.prototype._endpointGetInfo = function() { @@ -1044,6 +1107,9 @@ WalletService.prototype.setupRoutes = function(app) { app.get('/jobs/:jobId', s._endpointJobStatus() ); + app.get('/jobs', + s._endpointJobs() + ); }; WalletService.prototype.getRoutePrefix = function() {