diff --git a/config/config.js b/config/config.js index 3496723..a639f2d 100644 --- a/config/config.js +++ b/config/config.js @@ -76,6 +76,7 @@ var bitcoindConf = { disableAgent: true }; +var enableCleaner = process.env.ENABLE_CLEANER === 'true'; var enableMailbox = process.env.ENABLE_MAILBOX === 'true'; var enableRatelimiter = process.env.ENABLE_RATELIMITER === 'true'; var loggerLevel = process.env.LOGGER_LEVEL || 'info'; @@ -92,8 +93,12 @@ if (!fs.existsSync(db)) { } module.exports = { + enableCleaner: enableCleaner, + cleaner: require('../plugins/config-cleaner.js'), enableMailbox: enableMailbox, + mailbox: require('../plugins/config-mailbox.js'), enableRatelimiter: enableRatelimiter, + ratelimiter: require('../plugins/config-ratelimiter.js'), loggerLevel: loggerLevel, enableHTTPS: enableHTTPS, version: version, diff --git a/insight.js b/insight.js index 5fd9011..2b810a1 100755 --- a/insight.js +++ b/insight.js @@ -135,6 +135,10 @@ if (config.enableMailbox) { require('./plugins/mailbox').init(ios, config.mailbox); } +if (config.enableCleaner) { + require('./plugins/cleaner').init(config.cleaner); +} + // express settings diff --git a/lib/MessageDb.js b/lib/MessageDb.js index a7afe88..20b2c8c 100644 --- a/lib/MessageDb.js +++ b/lib/MessageDb.js @@ -83,13 +83,21 @@ MessageDb.prototype.authenticate = function(m) { return AuthMessage._verify(frompubkey, sig, encrypted); }; +MessageDb.parseKey = function(key) { + var ret = {}; + var spl = key.split('-'); + + ret.to = spl[1]; + ret.ts = +spl[2]; + + return ret; +}; + MessageDb.fromStorage = function(data) { - var spl = data.key.split('-'); - var to = spl[1]; - var ts = +spl[2]; + var parsed = MessageDb.parseKey(data.key); var message = data.value; - message.ts = ts; - message.to = to; + message.ts = parsed.ts; + message.to = parsed.to; return message; }; @@ -97,10 +105,10 @@ MessageDb.prototype.getMessages = function(to, lower_ts, upper_ts, cb) { var list = []; lower_ts = lower_ts || 1; var opts = { - end: messageKey(to, lower_ts), - start: messageKey(to, upper_ts), + start: messageKey(to, lower_ts), + end: messageKey(to, upper_ts), // limit: limit, TODO - reverse: true, + reverse: false, }; db.createReadStream(opts) @@ -112,8 +120,50 @@ MessageDb.prototype.getMessages = function(to, lower_ts, upper_ts, cb) { return cb(err); }) .on('end', function() { - return cb(null, list.reverse()); + return cb(null, list); }); }; +MessageDb.prototype.getAll = function(cb) { + var list = []; + db.createReadStream() + .on('data', function(data) { + list.push(MessageDb.fromStorage(data)); + }) + .on('error', function(err) { + return cb(err); + }) + .on('end', function() { + return cb(null, list); + }); +}; + +MessageDb.prototype.removeUpTo = function(ts, cb) { + preconditions.checkArgument(ts); + preconditions.checkArgument(typeof ts === 'number'); + var opts = {}; + var dels = []; + db.createKeyStream(opts) + .on('data', function(key) { + var parsed = MessageDb.parseKey(key); + if (parsed.ts < ts) { + logger.verbose('Deleting message ' + key); + dels.push({ + type: 'del', + key: key + }); + } + }) + .on('error', function(err) { + return cb(err); + }) + .on('end', function() { + db.batch(dels, function(err) { + if (err) return cb(err); + else cb(null, dels.length); + }) + }); + +}; + module.exports = soop(MessageDb); diff --git a/package.json b/package.json index a9150aa..ec43f30 100644 --- a/package.json +++ b/package.json @@ -59,10 +59,10 @@ "buffertools": "*", "commander": "^2.3.0", "connect-ratelimit": "git://github.com/dharmafly/connect-ratelimit.git#0550eff209c54f35078f46445000797fa942ab97", + "cron": "^1.0.4", "express": "~3.4.7", "glob": "*", - "leveldown": "~0.10.0", - "levelup": "~0.19.0", + "microtime": "^0.6.0", "moment": "~2.5.0", "preconditions": "^1.0.7", "should": "~2.1.1", @@ -71,8 +71,7 @@ "socket.io-client": "1.0.6", "soop": "=0.1.5", "winston": "*", - "xmlhttprequest": "~1.6.0", - "microtime": "^0.6.0" + "xmlhttprequest": "~1.6.0" }, "devDependencies": { "chai": "*", diff --git a/plugins/cleaner.js b/plugins/cleaner.js new file mode 100644 index 0000000..9323f65 --- /dev/null +++ b/plugins/cleaner.js @@ -0,0 +1,25 @@ +var mdb = require('../lib/MessageDb').default(); +var logger = require('../lib/logger').logger; +var preconditions = require('preconditions').singleton(); +var microtime = require('microtime'); +var cron = require('cron'); +var CronJob = cron.CronJob; + + +module.exports.init = function(config) { + var cronTime = config.cronTime || '0 * * * *'; + logger.info('Using cleaner plugin with cronTime ' + cronTime); + var onTick = function() { + var limit = microtime.now() - 1000 * 1000 * config.threshold; + mdb.removeUpTo(limit, function(err, n) { + if (err) logger.error(err); + else logger.info('Ran cleaner task, removed ' + n); + }); + }; + var job = new CronJob({ + cronTime: cronTime, + onTick: onTick + }); + onTick(); + job.start(); +}; diff --git a/plugins/config-cleaner.js b/plugins/config-cleaner.js new file mode 100644 index 0000000..0948f68 --- /dev/null +++ b/plugins/config-cleaner.js @@ -0,0 +1,6 @@ +module.exports = { + + cronTime: '* * * * *', + threshold: 2*24*60*60, // 2 days, in seconds + +}; diff --git a/plugins/config-mailbox.js b/plugins/config-mailbox.js new file mode 100644 index 0000000..f4d6253 --- /dev/null +++ b/plugins/config-mailbox.js @@ -0,0 +1,3 @@ +module.exports = { + +}; diff --git a/plugins/config-ratelimiter.js b/plugins/config-ratelimiter.js new file mode 100644 index 0000000..f4d6253 --- /dev/null +++ b/plugins/config-ratelimiter.js @@ -0,0 +1,3 @@ +module.exports = { + +}; diff --git a/test/test.MessageDb.js b/test/test.MessageDb.js index efc1006..960fc84 100644 --- a/test/test.MessageDb.js +++ b/test/test.MessageDb.js @@ -108,6 +108,23 @@ describe('MessageDb', function() { done(); }); }); + it('should be able #removeUpTo', function(done) { + var mdb = sharedMDB; + var upper_ts = microtime.now(); + mdb.addMessage(message, function(err) { + expect(err).to.not.exist; + mdb.removeUpTo(upper_ts, function(err, n) { + expect(err).to.not.exist; + n.should.equal(4); + mdb.getAll(function(error, all) { + expect(error).to.not.exist; + all.length.should.equal(1); + done(); + }); + + }); + }); + }); it('should be able to close instance', function() { var mdb = new MessageDb(opts); mdb.close();