From 8d672de5bd3039437ea6dbd2c435a7fbd00ac21f Mon Sep 17 00:00:00 2001 From: Manuel Araoz Date: Wed, 20 Aug 2014 12:05:27 -0400 Subject: [PATCH] refactoring message broker into plugin --- app/controllers/socket.js | 54 +++----------------------------------- config/config.js | 6 +++-- insight.js | 53 ++++++++++++++++++++++++++++++++++--- plugins/mailbox.js | 2 ++ test/test.socket-server.js | 2 +- 5 files changed, 61 insertions(+), 56 deletions(-) create mode 100644 plugins/mailbox.js diff --git a/app/controllers/socket.js b/app/controllers/socket.js index 9220cf2..47952f9 100644 --- a/app/controllers/socket.js +++ b/app/controllers/socket.js @@ -3,9 +3,6 @@ // server-side socket behaviour var ios = null; // io is already taken in express var util = require('bitcore').util; -var mdb = require('../../lib/MessageDb').default(); -var microtime = require('microtime'); -var enableMessageBroker; var verbose = false; var log = function() { @@ -15,64 +12,21 @@ var log = function() { } module.exports.init = function(io_ext, config) { - enableMessageBroker = config ? config.enableMessageBroker : false; ios = io_ext; + verbose = config.verbose; if (ios) { // when a new socket connects ios.sockets.on('connection', function(socket) { log('New connection from ' + socket.id); // when it subscribes, make it join the according room socket.on('subscribe', function(topic) { - if (socket.rooms.length === 1) { - log('subscribe to ' + topic); - socket.join(topic); - } + log('subscribe to ' + topic); + socket.join(topic); }); - if (enableMessageBroker) { - // when it requests sync, send him all pending messages - socket.on('sync', function(ts) { - log('Sync requested by ' + socket.id); - log(' from timestamp '+ts); - var rooms = socket.rooms; - if (rooms.length !== 2) { - socket.emit('insight-error', 'Must subscribe with public key before syncing'); - return; - } - var to = rooms[1]; - var upper_ts = Math.round(microtime.now()); - log(' to timestamp '+upper_ts); - mdb.getMessages(to, ts, upper_ts, function(err, messages) { - if (err) { - throw new Error('Couldn\'t get messages on sync request: ' + err); - } - log('\tFound ' + messages.length + ' message' + (messages.length !== 1 ? 's' : '')); - for (var i = 0; i < messages.length; i++) { - broadcastMessage(messages[i], socket); - } - }); - }); - - // when it sends a message, add it to db - socket.on('message', function(m) { - log('Message sent from ' + m.pubkey + ' to ' + m.to); - mdb.addMessage(m, function(err) { - if (err) { - throw new Error('Couldn\'t add message to database: ' + err); - } - }); - }); - - - // disconnect handler - socket.on('disconnect', function() { - log('disconnected ' + socket.id); - }); - } }); - if (enableMessageBroker) - mdb.on('message', broadcastMessage); } + return ios; }; var simpleTx = function(tx) { diff --git a/config/config.js b/config/config.js index 8d7c02b..35b3cf3 100644 --- a/config/config.js +++ b/config/config.js @@ -61,6 +61,7 @@ dataDir += network === 'testnet' ? 'testnet3' : ''; var safeConfirmations = process.env.INSIGHT_SAFE_CONFIRMATIONS || 6; var ignoreCache = process.env.INSIGHT_IGNORE_CACHE || 0; +var verbose = process.env.VERBOSE === 'true'; var bitcoindConf = { @@ -76,7 +77,7 @@ var bitcoindConf = { disableAgent: true }; -var enableMessageBroker = process.env.ENABLE_MESSAGE_BROKER === 'true'; +var enableMailbox = process.env.ENABLE_MAILBOX === 'true'; if (!fs.existsSync(db)) { var err = fs.mkdirSync(db); @@ -89,7 +90,8 @@ if (!fs.existsSync(db)) { } module.exports = { - enableMessageBroker: enableMessageBroker, + enableMailbox: enableMailbox, + verbose: verbose, version: version, root: rootPath, publicPath: process.env.INSIGHT_PUBLIC_PATH || false, diff --git a/insight.js b/insight.js index a55703b..0b7a268 100755 --- a/insight.js +++ b/insight.js @@ -111,16 +111,63 @@ if (!config.disableHistoricSync) { if (peerSync) peerSync.allowReorgs = true; -//express settings +// express settings require('./config/express')(expressApp, historicSync, peerSync); -//Bootstrap routes +// routes require('./config/routes')(expressApp); // socket.io var server = require('http').createServer(expressApp); var ios = require('socket.io')(server); -require('./app/controllers/socket.js').init(ios, config); + +// plugins +require('./app/controllers/socket.js').init(ios); +if (config.enableMessageBroker) { + var mdb = require('../../lib/MessageDb').default(); + + // when it requests sync, send him all pending messages + socket.on('sync', function(ts) { + log('Sync requested by ' + socket.id); + log(' from timestamp ' + ts); + var rooms = socket.rooms; + if (rooms.length !== 2) { + socket.emit('insight-error', 'Must subscribe with public key before syncing'); + return; + } + var to = rooms[1]; + var upper_ts = Math.round(microtime.now()); + log(' to timestamp ' + upper_ts); + mdb.getMessages(to, ts, upper_ts, function(err, messages) { + if (err) { + throw new Error('Couldn\'t get messages on sync request: ' + err); + } + log('\tFound ' + messages.length + ' message' + (messages.length !== 1 ? 's' : '')); + for (var i = 0; i < messages.length; i++) { + broadcastMessage(messages[i], socket); + } + }); + }); + + // when it sends a message, add it to db + socket.on('message', function(m) { + log('Message sent from ' + m.pubkey + ' to ' + m.to); + mdb.addMessage(m, function(err) { + if (err) { + throw new Error('Couldn\'t add message to database: ' + err); + } + }); + }); + + + // disconnect handler + socket.on('disconnect', function() { + log('disconnected ' + socket.id); + }); + + mdb.on('message', broadcastMessage); + +} //Start the app by listening on server.listen(config.port, function() { diff --git a/plugins/mailbox.js b/plugins/mailbox.js new file mode 100644 index 0000000..441c5a1 --- /dev/null +++ b/plugins/mailbox.js @@ -0,0 +1,2 @@ + +var microtime = require('microtime'); diff --git a/test/test.socket-server.js b/test/test.socket-server.js index f074472..6920889 100644 --- a/test/test.socket-server.js +++ b/test/test.socket-server.js @@ -15,7 +15,7 @@ describe('socket server', function() { }); it('should register socket handlers', function() { var io = { - sockets: new EventEmitter() + sockets: new EventEmitter(), } socket.init(io);