linking socket with leveldb

This commit is contained in:
Manuel Araoz 2014-07-30 17:29:28 -03:00
parent ac1c98f8d4
commit 7273776e55
4 changed files with 58 additions and 24 deletions

View File

@ -1,11 +1,10 @@
'use strict'; 'use strict';
// server-side socket behaviour // server-side socket behaviour
// io is a variable already taken in express var ios = null; // io is already taken in express
var ios = null;
var util = require('bitcore').util; var util = require('bitcore').util;
module.exports.init = function(app, io_ext) { module.exports.init = function(io_ext) {
ios = io_ext; ios = io_ext;
ios.sockets.on('connection', function(socket) { ios.sockets.on('connection', function(socket) {
socket.on('subscribe', function(topic) { socket.on('subscribe', function(topic) {
@ -21,9 +20,7 @@ module.exports.broadcastTx = function(tx) {
t = { t = {
txid: tx txid: tx
}; };
} } else {
else {
t = { t = {
txid: tx.txid, txid: tx.txid,
size: tx.size, size: tx.size,
@ -34,7 +31,7 @@ module.exports.broadcastTx = function(tx) {
valueOut += o.valueSat; valueOut += o.valueSat;
}); });
t.valueOut = (valueOut.toFixed(8)/util.COIN); t.valueOut = (valueOut.toFixed(8) / util.COIN);
} }
ios.sockets.in('inv').emit('tx', t); ios.sockets.in('inv').emit('tx', t);
} }
@ -55,3 +52,11 @@ module.exports.broadcastSyncInfo = function(historicSync) {
if (ios) if (ios)
ios.sockets.in('sync').emit('status', historicSync); ios.sockets.in('sync').emit('status', historicSync);
}; };
module.exports.broadcastMessage = function(from, to, ts, message) {
console.log('sending socket: %s, %s, %s, %s', from, to, ts, message);
if (ios) {
ios.sockets.in(to).emit(from + '-' + ts, message);
}
}

View File

@ -120,7 +120,7 @@ require('./config/routes')(expressApp);
// socket.io // socket.io
var server = require('http').createServer(expressApp); var server = require('http').createServer(expressApp);
var ios = require('socket.io')(server); var ios = require('socket.io')(server);
require('./app/controllers/socket.js').init(expressApp, ios); require('./app/controllers/socket.js').init(ios);
//Start the app by listening on <port> //Start the app by listening on <port>
server.listen(config.port, function() { server.listen(config.port, function() {

View File

@ -5,31 +5,50 @@ var config = require('../config/config');
var Rpc = imports.rpc || require('./Rpc'); var Rpc = imports.rpc || require('./Rpc');
var async = require('async'); var async = require('async');
var logger = require('./logger').logger; var logger = require('./logger').logger;
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var sockets = require('../app/controllers/socket.js');
var MESSAGE_PREFIX = 'msg-'; // msg-<sin1>-<sin2> => <message> var MESSAGE_PREFIX = 'msg-'; // msg-<sin1>-<sin2> => <message>
var MAX_OPEN_FILES = 500; var MAX_OPEN_FILES = 500;
var CONCURRENCY = 5; var CONCURRENCY = 5;
var db = imports.db || levelup(config.leveldb + '/messages', {
maxOpenFiles: MAX_OPEN_FILES
});
var d = logger.log; var d = logger.log;
var info = logger.info; var info = logger.info;
var db;
var MessageDb = function(opts) {
opts = opts || {};
this.path = config.leveldb + '/messages' + (opts.name ? ('-' + opts.name) : '')
if (!db) {
db = levelup(this.path, {
maxOpenFiles: MAX_OPEN_FILES
});
}
this.db = db;
this.initEvents();
};
util.inherits(MessageDb, EventEmitter);
var MessageDb = function() { MessageDb.prototype.initEvents = function() {
db.on('put', function(key, value) { this.db.on('put', function(key, value) {
console.log(key + '=>' + value); console.log('putting ' + key + '=>' + value);
var spl = key.split('-');
var from = spl[0];
var to = spl[1];
var ts = spl[2];
var message = value;
sockets.broadcastMessage(from, to, ts, message);
}); });
db.on('ready', function() { this.db.on('ready', function() {
console.log('Database ready!'); console.log('Database ready!');
}); });
}; };
MessageDb.prototype.close = function(cb) { MessageDb.prototype.close = function(cb) {
db.close(cb); this.db.close(cb);
}; };
@ -41,12 +60,12 @@ var messageKey = function(from, to, ts) {
MessageDb.prototype.addMessage = function(m, from, to, cb) { MessageDb.prototype.addMessage = function(m, from, to, cb) {
var key = messageKey(from, to); var key = messageKey(from, to);
var value = m; var value = m;
db.put(key, value, cb); this.db.put(key, value, cb);
}; };
MessageDb.prototype.getMessages = function(from, to, from_ts, to_ts, cb) { MessageDb.prototype.getMessages = function(from, to, from_ts, to_ts, cb) {
// TODO // TODO
db.get(messageKey(from, to), function(err, val) { this.db.get(messageKey(from, to), function(err, val) {
if (err && err.notFound) return cb(); if (err && err.notFound) return cb();
if (err) return cb(err); if (err) return cb(err);

View File

@ -9,13 +9,23 @@ var SIN = bitcore.SIN;
describe('MessageDb', function() { describe('MessageDb', function() {
it('should be able to create instance', function() { it('should be able to create instance', function() {
var mdb = new MessageDb(); var mdb = new MessageDb(opts);
}); });
it('should receive events', function(done) { it('should be able to close instance', function() {
var mdb = new MessageDb(); var mdb = new MessageDb(opts);
var message = {}; mdb.close();
var from = new SIN(new Buffer('dadbad00', 'hex')); });
var to = new SIN(new Buffer('bacacafe', 'hex')); var opts = {
name: 'test-MessageDb'
};
var from = new SIN(new Buffer('dadbad00', 'hex'));
var to = new SIN(new Buffer('bacacafe', 'hex'));
var message = {
a: 1,
b: 2
};
it('should be able to add messages', function(done) {
var mdb = new MessageDb(opts);
console.log(to.toString()); console.log(to.toString());
mdb.addMessage(message, from, to, function(err) { mdb.addMessage(message, from, to, function(err) {
done(); done();