Further work on jobids.

This commit is contained in:
Chris Kleeschulte 2017-03-09 08:10:07 -05:00
parent fea7c33ab0
commit 6a9d9bac7b

View File

@ -18,9 +18,17 @@ var Encoding = require('./encoding');
var WalletService = function(options) { var WalletService = function(options) {
BaseService.call(this, options); BaseService.call(this, options);
//TODO: what to do if we overflow jobs for which we never reported on
//jobs are agnostic to the amount of work in each job. Additionally,
//each job in the cache is already running immediately after setting
//the job. This means that a large job can be started before smaller
//jobs and this can lead to slower processing of all jobs over time.
this._MAX_QUEUE = 1;
this._jobs = LRU({ this._jobs = LRU({
max: 50 //keep 50 jobs around; max: this._MAX_QUEUE,
maxAge: 86400000 * 3 //3 days
}); });
this._cache = LRU({ this._cache = LRU({
max: 500 * 1024 * 1024, max: 500 * 1024 * 1024,
length: function(n) { length: function(n) {
@ -820,35 +828,64 @@ WalletService.prototype._createWallet = function(walletId, callback) {
}); });
}; };
WalletService.prototype._isJobQueueReady = function() {
var self = this;
self._jobs.rforEach(function(value, key) {
if ((value.state === 'complete' || value.state === 'error') && value.reported) {
self._jobs.del(key);
}
});
return self._jobs.length < self._MAX_QUEUE;
};
WalletService.prototype._jobCompletionCallback = function(err, results) { WalletService.prototype._jobCompletionCallback = function(err, results) {
var jobId = results.jobId; var jobId = results.jobId;
this._jobs[jobId].progress = 1.0; var job = this._jobs.get(jobId);
this._jobs[jobId].endtime = Date.now();
if (err) { if (!job) {
this._jobs[jobId].status = 'error'; return;
return this._jobs[jobId].message = err.message;
} }
this._jobs[jobId].status = 'complete'; job.progress = 1.0;
this._jobs[jobId].message = results; job.endtime = Date.now();
if (err) {
job.status = 'error';
job.message = err.message;
return;
}
job.status = 'complete';
job.message = results;
job.reported = false;
}; };
WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) { WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) {
var self = this; var self = this;
var results = { jobId: jobId };
self._jobs[jobId] = { if (!self._isJobQueueReady()) {
return callback(new Error('Job queue is currently overloaded, please try again later.'), results);
}
var job = {
starttime: Date.now(), starttime: Date.now(),
fn: 'importAddresses', fn: 'importAddresses',
progress: 0, progress: 0,
projectedendtime: null projectedendtime: null
}; };
this._jobs.set(jobId, job);
self._getAddresses(walletId, function(err, oldAddresses) { self._getAddresses(walletId, function(err, oldAddresses) {
if(err) { if(err) {
return callback(err); return callback(err, results);
} }
async.parallel( async.parallel(
@ -858,12 +895,12 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
], ],
function(err, results) { function(err, results) {
if(err) { if(err) {
return callback(err); return callback(err, results);
} }
var now = Date.now(); var now = Date.now();
self._jobs[jobId].progress = 0.50; job.progress = 0.50;
self._jobs[jobId].projectedendtime = now + (now - self._jobs[jobId].starttime); job.projectedendtime = now + (now - job.starttime);
var operations = results[0].concat(results[1]); var operations = results[0].concat(results[1]);
operations.push({ operations.push({
@ -874,15 +911,17 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
self.store.batch(operations, function(err) { self.store.batch(operations, function(err) {
if(err) { if(err) {
return callback(err); return callback(err, results);
} }
self._loadAllAddresses(function(err) { self._loadAllAddresses(function(err) {
if(err) { if(err) {
return callback(err); return callback(err, results);
} }
self._loadAllBalances(callback); self._loadAllBalances(function(err) {
callback(err, results);
});
}); });
}); });
} }
@ -983,15 +1022,39 @@ WalletService.prototype._storeBalance = function(walletId, balance, callback) {
this.store.put(key, value, callback); this.store.put(key, value, callback);
}; };
WalletService.prototype._endpointJobStatus = function() { WalletService.prototype._endpointJobs = function() {
var self = this; var self = this;
return function(req, res) { return function(req, res) {
var jobId = req.jobId;
if (!jobId || !self._jobs[jobId]) { var count = 0;
return utils.sendError(new Error('Job not found.')); self._jobs.rforEach(function(value, key) {
} if ((value.state === 'complete' || value.state === 'error') && value.reported) {
return res.status(201).jsonp(self._jobs[jobId]); count++;
}
});
res.status(200).jsonp({ jobCount: self._jobs.length - count });
}; };
};
WalletService.prototype._endpointJobStatus = function() {
var self = this;
return function(req, res) {
var jobId = req.params.jobId;
var job = self._jobs.get(jobId);
if (!jobId || !job) {
return utils.sendError(new Error('Job not found. ' +
'The job results may have been purged to make room for new jobs.'), res);
}
job.reported = true;
return res.status(201).jsonp(job);
};
}; };
WalletService.prototype._endpointGetInfo = function() { WalletService.prototype._endpointGetInfo = function() {
@ -1044,6 +1107,9 @@ WalletService.prototype.setupRoutes = function(app) {
app.get('/jobs/:jobId', app.get('/jobs/:jobId',
s._endpointJobStatus() s._endpointJobStatus()
); );
app.get('/jobs',
s._endpointJobs()
);
}; };
WalletService.prototype.getRoutePrefix = function() { WalletService.prototype.getRoutePrefix = function() {