Added job reporting.

This commit is contained in:
Chris Kleeschulte 2017-03-08 14:32:32 -05:00
parent d06c177a1a
commit 841530705e
2 changed files with 93 additions and 50 deletions

View File

@ -18,6 +18,9 @@ var Encoding = require('./encoding');
var WalletService = function(options) {
BaseService.call(this, options);
this._jobs = LRU({
max: 50 //keep 50 jobs around;
});
this._cache = LRU({
max: 500 * 1024 * 1024,
length: function(n) {
@ -527,40 +530,40 @@ WalletService.prototype._endpointGetWalletIds = function() {
};
};
WalletService.prototype._endpointRegisterWallet = function() {
var self = this;
return function(req, res) {
var walletId = req.walletId;
if (!walletId) {
walletId = utils.getWalletId();
}
self._createWallet(walletId, function(err) {
if(err) {
return utils.sendError(new Error('Create wallet operatton failed.'), res);
}
res.status(201).jsonp({
walletId: walletId
});
});
};
};
WalletService.prototype._endpointPostAddresses = function() {
var self = this;
return function(req, res) {
var addresses = req.addresses;
if (!addresses || !addresses.length) {
return utils.sendError(new Error('addresses are required when creating a wallet.'));
return utils.sendError(new Error('addresses are required when creating a wallet.'), res);
}
var walletId = utils.getWalletId();
//if this is a post to /wallets, then it is expected that a new wallet is to be created
async.series([
function(next) {
self._createWallet(walletId, function(err) {
if(err) {
return next(err);
}
next();
});
}, function(next) {
// TODO make imdempotent
self._importAddresses(walletId, addresses, function(err) {
if(err) {
return next(err);
}
next();
});
}], function(err) {
if(err) {
return utils.sendError(err, res);
}
res.status(201).jsonp({
walletId: walletId
});
});
var walletId = req.walletId; //also can't be zero
if (!walletId) {
return utils.sendError(new Error('WalletId must be given.'), res);
}
// TODO make imdempotent
//this could be a long-running operation, so we'll return a job id
var jobId = utils.generateJobId();
self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
};
};
@ -596,7 +599,10 @@ WalletService.prototype._endpointPutAddresses = function() {
return utils.sendError(new Error('Must PUT an array'), res);
}
var walletId = req.params.walletId;
var walletId = req.walletId;
if (!walletId) {
return utils.sendError(new Error('WalletId must be given.'), res);
}
self._getAddresses(walletId, function(err, oldAddresses) {
if(err) {
@ -610,16 +616,9 @@ WalletService.prototype._endpointPutAddresses = function() {
var addAddresses = _.without(newAddresses, oldAddresses);
var amountAdded = addAddresses.length;
self._importAddresses(walletId, addAddresses, function(err) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
walletId: walletId,
amountAdded: amountAdded
});
});
var jobId = utils.generateJobId();
self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
});
};
};
@ -816,9 +815,32 @@ WalletService.prototype._createWallet = function(walletId, callback) {
this.store.put(key, value, callback);
};
WalletService.prototype._importAddresses = function(walletId, addresses, callback) {
WalletService.prototype._jobCompletionCallback = function(err, results) {
var jobId = results.jobId;
this._jobs[jobId].progress = 1.0;
this._jobs[jobId].endtime = Date.now();
if (err) {
this._jobs[jobId].status = 'error';
return this._jobs[jobId].message = err.message;
}
this._jobs[jobId].status = 'complete';
this._jobs[jobId].message = results;
};
WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) {
var self = this;
self._jobs[jobId] = {
starttime: Date.now(),
fn: 'importAddresses',
progress: 0,
projectedendtime: null
};
self._getAddresses(walletId, function(err, oldAddresses) {
if(err) {
return callback(err);
@ -826,14 +848,18 @@ WalletService.prototype._importAddresses = function(walletId, addresses, callbac
async.parallel(
[
self._getUTXOIndexOperations.bind(self, walletId, addresses),
self._getTxidIndexOperations.bind(self, walletId, addresses)
self._getUTXOIndexOperations.bind(self, walletId, addresses, jobId),
self._getTxidIndexOperations.bind(self, walletId, addresses, jobId)
],
function(err, results) {
if(err) {
return callback(err);
}
var now = Date.now();
self._jobs[jobId].progress = 0.50;
self._jobs[jobId].projectedendtime = now + (now - self._jobs[jobId].starttime);
var operations = results[0].concat(results[1]);
operations.push({
type: 'put',
@ -851,7 +877,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, callbac
return callback(err);
}
self._loadAllBalances(callback);
self._loadAllBalances(jobId, callback);
});
});
}
@ -859,7 +885,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, callbac
});
};
WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, callback) {
WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, jobId, callback) {
var self = this;
var balance = 0;
@ -909,7 +935,7 @@ WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses,
});
};
WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, callback) {
WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, jobId, callback) {
var self = this;
var txids = {};
@ -952,6 +978,17 @@ WalletService.prototype._storeBalance = function(walletId, balance, callback) {
this.store.put(key, value, callback);
};
WalletService.prototype._endpointJobStatus = function() {
var self = this;
return function(req, res) {
var jobId = req.jobId;
if (!jobId || !self._jobs[jobId]) {
return utils.sendError(new Error('Job not found.'));
}
return res.status(201).jsonp(self._jobs[jobId]);
};
};
WalletService.prototype._endpointGetInfo = function() {
return function(req, res) {
res.jsonp({result: 'ok'});
@ -985,17 +1022,23 @@ WalletService.prototype.setupRoutes = function(app) {
app.put('/wallets/:walletId/addresses',
s._endpointPutAddresses()
);
app.get('/wallets/:walletId/transactions',
s._endpointGetTransactions()
);
app.post('/wallets',
app.post('/wallets/:walletId/addresses',
upload.single('addresses'),
v.checkAddresses,
s._endpointPostAddresses()
);
app.get('/wallets/:walletId/transactions',
s._endpointGetTransactions()
);
app.post('/wallets/:walletId',
s._endpointRegisterWallet()
);
app.get('/wallets',
s._endpointGetWalletIds()
);
app.get('/jobs/:jobId',
s._endpointJobStatus()
);
};
WalletService.prototype.getRoutePrefix = function() {

View File

@ -230,7 +230,7 @@ exports.createLogStream = function(logger) {
return stream;
};
exports.getWalletId = function() {
exports.getWalletId = exports.generateJobId = function() {
return crypto.randomBytes(16).toString('hex');
};