From 7273776e55166282a8c85475fdf4f30220fc6d6d Mon Sep 17 00:00:00 2001 From: Manuel Araoz Date: Wed, 30 Jul 2014 17:29:28 -0300 Subject: [PATCH] linking socket with leveldb --- app/controllers/socket.js | 19 ++++++++++++------- insight.js | 2 +- lib/MessageDb.js | 39 +++++++++++++++++++++++++++++---------- test/test.MessageDb.js | 22 ++++++++++++++++------ 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/app/controllers/socket.js b/app/controllers/socket.js index d675729..81ce098 100644 --- a/app/controllers/socket.js +++ b/app/controllers/socket.js @@ -1,11 +1,10 @@ 'use strict'; // server-side socket behaviour -// io is a variable already taken in express -var ios = null; +var ios = null; // io is already taken in express var util = require('bitcore').util; -module.exports.init = function(app, io_ext) { +module.exports.init = function(io_ext) { ios = io_ext; ios.sockets.on('connection', function(socket) { socket.on('subscribe', function(topic) { @@ -21,9 +20,7 @@ module.exports.broadcastTx = function(tx) { t = { txid: tx }; - } - - else { + } else { t = { txid: tx.txid, size: tx.size, @@ -34,7 +31,7 @@ module.exports.broadcastTx = function(tx) { valueOut += o.valueSat; }); - t.valueOut = (valueOut.toFixed(8)/util.COIN); + t.valueOut = (valueOut.toFixed(8) / util.COIN); } ios.sockets.in('inv').emit('tx', t); } @@ -55,3 +52,11 @@ module.exports.broadcastSyncInfo = function(historicSync) { if (ios) ios.sockets.in('sync').emit('status', historicSync); }; + +module.exports.broadcastMessage = function(from, to, ts, message) { + console.log('sending socket: %s, %s, %s, %s', from, to, ts, message); + if (ios) { + ios.sockets.in(to).emit(from + '-' + ts, message); + } + +} diff --git a/insight.js b/insight.js index 6c41b9b..0923081 100755 --- a/insight.js +++ b/insight.js @@ -120,7 +120,7 @@ require('./config/routes')(expressApp); // socket.io var server = require('http').createServer(expressApp); var ios = require('socket.io')(server); -require('./app/controllers/socket.js').init(expressApp, ios); +require('./app/controllers/socket.js').init(ios); //Start the app by listening on server.listen(config.port, function() { diff --git a/lib/MessageDb.js b/lib/MessageDb.js index c8d1766..61bd334 100644 --- a/lib/MessageDb.js +++ b/lib/MessageDb.js @@ -5,31 +5,50 @@ var config = require('../config/config'); var Rpc = imports.rpc || require('./Rpc'); var async = require('async'); var logger = require('./logger').logger; +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var sockets = require('../app/controllers/socket.js'); var MESSAGE_PREFIX = 'msg-'; // msg-- => var MAX_OPEN_FILES = 500; var CONCURRENCY = 5; -var db = imports.db || levelup(config.leveldb + '/messages', { - maxOpenFiles: MAX_OPEN_FILES -}); var d = logger.log; var info = logger.info; +var db; +var MessageDb = function(opts) { + opts = opts || {}; + this.path = config.leveldb + '/messages' + (opts.name ? ('-' + opts.name) : '') + if (!db) { + db = levelup(this.path, { + maxOpenFiles: MAX_OPEN_FILES + }); + } + this.db = db; + this.initEvents(); +}; +util.inherits(MessageDb, EventEmitter); -var MessageDb = function() { - db.on('put', function(key, value) { - console.log(key + '=>' + value); +MessageDb.prototype.initEvents = function() { + this.db.on('put', function(key, value) { + console.log('putting ' + key + '=>' + value); + var spl = key.split('-'); + var from = spl[0]; + var to = spl[1]; + var ts = spl[2]; + var message = value; + sockets.broadcastMessage(from, to, ts, message); }); - db.on('ready', function() { + this.db.on('ready', function() { console.log('Database ready!'); }); }; MessageDb.prototype.close = function(cb) { - db.close(cb); + this.db.close(cb); }; @@ -41,12 +60,12 @@ var messageKey = function(from, to, ts) { MessageDb.prototype.addMessage = function(m, from, to, cb) { var key = messageKey(from, to); var value = m; - db.put(key, value, cb); + this.db.put(key, value, cb); }; MessageDb.prototype.getMessages = function(from, to, from_ts, to_ts, cb) { // TODO - db.get(messageKey(from, to), function(err, val) { + this.db.get(messageKey(from, to), function(err, val) { if (err && err.notFound) return cb(); if (err) return cb(err); diff --git a/test/test.MessageDb.js b/test/test.MessageDb.js index b5012a8..4a89d3a 100644 --- a/test/test.MessageDb.js +++ b/test/test.MessageDb.js @@ -9,13 +9,23 @@ var SIN = bitcore.SIN; describe('MessageDb', function() { it('should be able to create instance', function() { - var mdb = new MessageDb(); + var mdb = new MessageDb(opts); }); - it('should receive events', function(done) { - var mdb = new MessageDb(); - var message = {}; - var from = new SIN(new Buffer('dadbad00', 'hex')); - var to = new SIN(new Buffer('bacacafe', 'hex')); + it('should be able to close instance', function() { + var mdb = new MessageDb(opts); + mdb.close(); + }); + var opts = { + name: 'test-MessageDb' + }; + var from = new SIN(new Buffer('dadbad00', 'hex')); + var to = new SIN(new Buffer('bacacafe', 'hex')); + var message = { + a: 1, + b: 2 + }; + it('should be able to add messages', function(done) { + var mdb = new MessageDb(opts); console.log(to.toString()); mdb.addMessage(message, from, to, function(err) { done();