Merge pull request #143 from maraoz/add/SIN-broker

Add encrypted message broker and chat example
This commit is contained in:
Matias Alejo Garcia 2014-08-05 20:26:18 -03:00
commit 8138e03908
12 changed files with 42523 additions and 224 deletions

View File

@ -88,6 +88,7 @@ INSIGHT_NETWORK [= 'livenet' | 'testnet']
INSIGHT_DB # Path where to store insight's internal DB. (defaults to $HOME/.insight)
INSIGHT_SAFE_CONFIRMATIONS=6 # Nr. of confirmation needed to start caching transaction information
INSIGHT_IGNORE_CACHE # True to ignore cache of spents in transaction, with more than INSIGHT_SAFE_CONFIRMATIONS confirmations. This is useful for tracking double spents for old transactions.
ENABLE_MESSAGE_BROKER # if "true" will enable message broker module
```

View File

@ -1,41 +1,85 @@
'use strict';
// server-side socket behaviour
// io is a variable already taken in express
var ios = null;
var ios = null; // io is already taken in express
var util = require('bitcore').util;
var mdb = require('../../lib/MessageDb').default();
var microtime = require('microtime');
var enableMessageBroker;
module.exports.init = function(app, io_ext) {
module.exports.init = function(io_ext, config) {
enableMessageBroker = config ? config.enableMessageBroker : false;
ios = io_ext;
ios.sockets.on('connection', function(socket) {
socket.on('subscribe', function(topic) {
socket.join(topic);
if (ios) {
// when a new socket connects
ios.sockets.on('connection', function(socket) {
// when it subscribes, make it join the according room
socket.on('subscribe', function(topic) {
if (socket.rooms.length === 1) {
console.log('subscribe to ' + topic);
socket.join(topic);
}
});
if (enableMessageBroker) {
// when it requests sync, send him all pending messages
socket.on('sync', function(ts) {
var rooms = socket.rooms;
if (rooms.length !== 2) {
socket.emit('insight-error', 'Must subscribe with public key before syncing');
return;
}
var to = rooms[1];
var upper_ts = Math.round(microtime.now());
mdb.getMessages(to, ts, upper_ts, function(err, messages) {
if (err) {
throw new Error('Couldn\'t get messages on sync request: ' + err);
}
for (var i = 0; i < messages.length; i++) {
broadcastMessage(messages[i], socket);
}
});
});
// when it sends a message, add it to db
socket.on('message', function(m) {
mdb.addMessage(m, function(err) {
if (err) {
throw new Error('Couldn\'t add message to database: ' + err);
}
});
});
}
});
if (enableMessageBroker)
mdb.on('message', broadcastMessage);
}
};
var simpleTx = function(tx) {
return {
txid: tx
};
};
var fullTx = function(tx) {
var t = {
txid: tx.txid,
size: tx.size,
};
// Outputs
var valueOut = 0;
tx.vout.forEach(function(o) {
valueOut += o.valueSat;
});
t.valueOut = (valueOut.toFixed(8) / util.COIN);
return t;
};
module.exports.broadcastTx = function(tx) {
if (ios) {
var t;
if (typeof tx === 'string') {
t = {
txid: tx
};
}
else {
t = {
txid: tx.txid,
size: tx.size,
};
// Outputs
var valueOut = 0;
tx.vout.forEach(function(o) {
valueOut += o.valueSat;
});
t.valueOut = (valueOut.toFixed(8)/util.COIN);
}
var t = (typeof tx === 'string') ? simpleTx(tx) : fullTx(tx);
ios.sockets.in('inv').emit('tx', t);
}
};
@ -55,3 +99,11 @@ module.exports.broadcastSyncInfo = function(historicSync) {
if (ios)
ios.sockets.in('sync').emit('status', historicSync);
};
var broadcastMessage = module.exports.broadcastMessage = function(message, socket) {
if (ios) {
var s = socket || ios.sockets.in(message.to);
s.emit('message', message);
}
}

View File

@ -76,46 +76,7 @@ var bitcoindConf = {
disableAgent: true
};
/*jshint multistr: true */
console.log(
'\n\
____ _ __ __ ___ _ \n\
/ _/___ _____(_)___ _/ /_ / /_ / | ____ (_)\n\
/ // __ \\/ ___/ / __ `/ __ \\/ __/ / /\| \| / __ \\/ / \n\
_/ // / / (__ ) / /_/ / / / / /_ / ___ |/ /_/ / / \n\
/___/_/ /_/____/_/\\__, /_/ /_/\\__/ /_/ |_/ .___/_/ \n\
/____/ /_/ \n\
\n\t\t\t\t\t\tv%s\n\
# Configuration:\n\
\t\tNetwork: %s\tINSIGHT_NETWORK\n\
\t\tDatabase Path: %s\tINSIGHT_DB\n\
\t\tSafe Confirmations: %s\tINSIGHT_SAFE_CONFIRMATIONS\n\
\t\tIgnore Cache: %s\tINSIGHT_IGNORE_CACHE\n\
# Bicoind Connection configuration:\n\
\t\tRPC Username: %s\tBITCOIND_USER\n\
\t\tRPC Password: %s\tBITCOIND_PASS\n\
\t\tRPC Protocol: %s\tBITCOIND_PROTO\n\
\t\tRPC Host: %s\tBITCOIND_HOST\n\
\t\tRPC Port: %s\tBITCOIND_PORT\n\
\t\tP2P Host: %s\tBITCOIND_P2P_HOST\n\
\t\tP2P Port: %s\tBITCOIND_P2P_PORT\n\
\t\tData Dir: %s\tBITCOIND_DATADIR\n\
\t\t%s\n\
\nChange setting by assigning the enviroment variables in the last column. Example:\n\
$ INSIGHT_NETWORK="testnet" BITCOIND_HOST="123.123.123.123" ./insight.js\
\n\n',
version,
network, home, safeConfirmations, ignoreCache ? 'yes' : 'no',
bitcoindConf.user,
bitcoindConf.pass ? 'Yes(hidden)' : 'No',
bitcoindConf.protocol,
bitcoindConf.host,
bitcoindConf.port,
bitcoindConf.p2pHost,
bitcoindConf.p2pPort,
dataDir + (network === 'testnet' ? '*' : ''), (network === 'testnet' ? '* (/testnet3 is added automatically)' : '')
);
var enableMessageBroker = process.env.ENABLE_MESSAGE_BROKER === 'true';
if (!fs.existsSync(db)) {
var err = fs.mkdirSync(db);
@ -128,6 +89,8 @@ if (!fs.existsSync(db)) {
}
module.exports = {
enableMessageBroker: enableMessageBroker,
version: version,
root: rootPath,
publicPath: process.env.INSIGHT_PUBLIC_PATH || false,
appName: 'Insight ' + env,

41841
examples/bitcore.js Normal file

File diff suppressed because one or more lines are too long

122
examples/messages.html Normal file
View File

@ -0,0 +1,122 @@
<!doctype html>
<html>
<head>
<title>Socket.IO chat</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font: 13px Helvetica, Arial;
}
form {
background: #000;
padding: 3px;
position: fixed;
bottom: 0;
width: 100%;
}
form input {
border: 0;
padding: 10px;
width: 40%;
margin-right: .5%;
}
form button {
width: 9%;
background: rgb(130, 224, 255);
border: none;
padding: 10px;
}
#messages {
list-style-type: none;
margin: 0;
padding: 0;
}
#messages li {
padding: 5px 10px;
}
#messages li:nth-child(odd) {
background: #eee;
}
</style>
</head>
<body>
<h3 id="pubkey">Generating public key...</h3>
<ul id="messages"></ul>
<form action="">
<input id="other" placeholder="other user's key" autocomplete="off" />
<input id="m" placeholder="Text message..." autocomplete="off" />
<button>Send</button>
</form>
<script src="http://localhost:3001/socket.io/socket.io.js"></script>
<script src="http://code.jquery.com/jquery-1.11.1.js"></script>
<script src="./bitcore.js"></script>
<script>
var fixedPK = prompt('Enter your private key or leave empty to generate one', 'a52e54d90c1ff4846e6f164e56405094cc96ecc0ea7732831e24418204c8ab55');
$(document).ready(function()
{
// load dependencies
var socket = io('http://localhost:3001');
var bitcore = require('bitcore');
var util = bitcore.util;
var Key = bitcore.Key;
var AuthMessage = bitcore.AuthMessage;
var Buffer = bitcore.Buffer;
// generate new identity
var pk = Key.generateSync();
if (fixedPK) {
pk.private = new Buffer(fixedPK, 'hex');
pk.regenerateSync();
}
console.log(pk.private.toString('hex'));
var pubkey = pk.public.toString('hex');
$('#pubkey').text('Your key: '+pubkey);
// show message
var show = function(from, text) {
$('#messages').append($('<li>').text(from+': ' + text));
};
// send chat handler
$('form').submit(function(e)
{
e.preventDefault();
var text = $('#m').val()
if (text.length === 0) {
return;
}
var otherPubkey = $('#other').val();
var data = AuthMessage.encode(otherPubkey, pk, text);
socket.emit('message', data);
show('You', text);
$('#m').val('');
return;
});
// receive chat handler
socket.emit('subscribe', pubkey);
var ts = undefined; // timestamp to sync, undefined syncs all
socket.emit('sync', ts);
socket.on('message', function(msg)
{
try {
var data = AuthMessage.decode(pk, msg);
} catch(e) {
alert(e);
}
show(msg.pubkey, data.payload);
});
});
</script>
</body>
</html>

View File

@ -15,6 +15,45 @@ var express = require('express'),
//Initializing system variables
var config = require('./config/config');
// text title
/*jshint multistr: true */
console.log(
'\n\
____ _ __ __ ___ _ \n\
/ _/___ _____(_)___ _/ /_ / /_ / | ____ (_)\n\
/ // __ \\/ ___/ / __ `/ __ \\/ __/ / /\| \| / __ \\/ / \n\
_/ // / / (__ ) / /_/ / / / / /_ / ___ |/ /_/ / / \n\
/___/_/ /_/____/_/\\__, /_/ /_/\\__/ /_/ |_/ .___/_/ \n\
/____/ /_/ \n\
\n\t\t\t\t\t\tv%s\n\
# Configuration:\n\
\tINSIGHT_NETWORK (Network): %s\n\
\tINSIGHT_DB (Database Path): %s\n\
\tINSIGHT_SAFE_CONFIRMATIONS (Safe Confirmations): %s\n\
\tINSIGHT_IGNORE_CACHE (Ignore Cache): %s\n\
# Bicoind Connection configuration:\n\
\tRPC Username: %s\t\tBITCOIND_USER\n\
\tRPC Password: %s\tBITCOIND_PASS\n\
\tRPC Protocol: %s\t\tBITCOIND_PROTO\n\
\tRPC Host: %s\t\tBITCOIND_HOST\n\
\tRPC Port: %s\t\t\tBITCOIND_PORT\n\
\tP2P Port: %s\t\t\tBITCOIND_P2P_PORT\n\
\tBITCOIND_DATADIR: %s\n\
\t%s\n\
\nChange setting by assigning the enviroment variables above. Example:\n\
$ INSIGHT_NETWORK="testnet" BITCOIND_HOST="123.123.123.123" ./insight.js\
\n\n',
config.version,
config.network, config.leveldb, config.safeConfirmations, config.ignoreCache ? 'yes' : 'no',
config.bitcoind.user,
config.bitcoind.pass ? 'Yes(hidden)' : 'No',
config.bitcoind.protocol,
config.bitcoind.host,
config.bitcoind.port,
config.bitcoind.p2pPort,
config.bitcoind.dataDir + (config.network === 'testnet' ? '*' : ''), (config.network === 'testnet' ? '* (/testnet3 is added automatically)' : '')
);
/**
* express app
*/
@ -32,8 +71,7 @@ var walk = function(path) {
if (/(.*)\.(js$)/.test(file)) {
require(newPath);
}
}
else if (stat.isDirectory()) {
} else if (stat.isDirectory()) {
walk(newPath);
}
});
@ -45,7 +83,9 @@ walk(models_path);
* p2pSync process
*/
var peerSync = new PeerSync({shouldBroadcast: true});
var peerSync = new PeerSync({
shouldBroadcast: true
});
if (!config.disableP2pSync) {
peerSync.run();
@ -60,16 +100,15 @@ var historicSync = new HistoricSync({
peerSync.historicSync = historicSync;
if (!config.disableHistoricSync) {
historicSync.start({}, function(err){
historicSync.start({}, function(err) {
if (err) {
var txt = 'ABORTED with error: ' + err.message;
console.log('[historic_sync] ' + txt);
}
if (peerSync) peerSync.allowReorgs = true;
});
}
else
if (peerSync) peerSync.allowReorgs = true;
} else
if (peerSync) peerSync.allowReorgs = true;
//express settings
@ -81,11 +120,11 @@ require('./config/routes')(expressApp);
// socket.io
var server = require('http').createServer(expressApp);
var ios = require('socket.io')(server);
require('./app/controllers/socket.js').init(expressApp, ios);
require('./app/controllers/socket.js').init(ios, config);
//Start the app by listening on <port>
server.listen(config.port, function(){
console.log('insight server listening on port %d in %s mode', server.address().port, process.env.NODE_ENV);
server.listen(config.port, function() {
console.log('insight server listening on port %d in %s mode', server.address().port, process.env.NODE_ENV);
});
//expose app

122
lib/MessageDb.js Normal file
View File

@ -0,0 +1,122 @@
'use strict';
var soop = require('soop');
var imports = soop.imports();
var levelup = require('levelup');
var config = require('../config/config');
var Rpc = imports.rpc || require('./Rpc');
var async = require('async');
var logger = require('./logger').logger;
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var microtime = require('microtime');
var bitcore = require('bitcore');
var AuthMessage = bitcore.AuthMessage;
var preconditions = require('preconditions').singleton();
var MESSAGE_PREFIX = 'msg-'; // msg-<recieving_pubkey>-<ts> => <message>
var MAX_OPEN_FILES = 500;
var CONCURRENCY = 5;
var d = logger.log;
var info = logger.info;
var db;
var MessageDb = function(opts) {
opts = opts || {};
this.path = config.leveldb + '/messages' + (opts.name ? ('-' + opts.name) : '');
this.db = opts.db || db || levelup(this.path, {
maxOpenFiles: MAX_OPEN_FILES,
valueEncoding: 'json'
});
this.initEvents();
db = this.db;
};
util.inherits(MessageDb, EventEmitter);
MessageDb.prototype.initEvents = function() {
if (db) return;
var self = this;
this.db.on('put', function(key, value) {
var data = {};
data.key = key;
data.value = value;
var message = MessageDb.fromStorage(data);
self.emit('message', message);
});
this.db.on('ready', function() {
//console.log('Database ready!');
});
};
MessageDb.prototype.close = function(cb) {
this.db.close(cb);
};
var messageKey = function(to, ts) {
preconditions.checkArgument(typeof to === 'string');
preconditions.checkArgument(to.length === 66);
preconditions.checkArgument(!ts || typeof ts === 'number');
if (!ts) ts = Math.round(microtime.now());
return MESSAGE_PREFIX + to.toString() + '-' + ts;
};
MessageDb.prototype.addMessage = function(m, cb) {
if (!this.authenticate(m)) {
cb(new Error('Authentication failed'));
return;
}
var key = messageKey(m.to);
var value = m;
this.db.put(key, value, cb);
};
MessageDb.prototype.authenticate = function(m) {
preconditions.checkArgument(m.pubkey);
preconditions.checkArgument(m.sig);
preconditions.checkArgument(m.encrypted);
var frompubkey = new Buffer(m.pubkey, 'hex');
var sig = new Buffer(m.sig, 'hex');
var encrypted = new Buffer(m.encrypted, 'hex');
return AuthMessage._verify(frompubkey, sig, encrypted);
};
MessageDb.fromStorage = function(data) {
var spl = data.key.split('-');
var to = spl[1];
var ts = +spl[2];
var message = data.value;
message.ts = ts;
message.to = to;
return message;
};
MessageDb.prototype.getMessages = function(to, lower_ts, upper_ts, cb) {
var list = [];
lower_ts = lower_ts || 1;
var opts = {
end: messageKey(to, lower_ts),
start: messageKey(to, upper_ts),
// limit: limit, TODO
reverse: true,
};
db.createReadStream(opts)
.on('data', function(data) {
var message = MessageDb.fromStorage(data);
list.push(message);
})
.on('error', function(err) {
return cb(err);
})
.on('end', function() {
return cb(null, list.reverse());
});
};
module.exports = soop(MessageDb);

View File

@ -1,6 +1,6 @@
'use strict';
var imports = require('soop').imports();
var imports = require('soop').imports();
@ -20,40 +20,42 @@ var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda
var CONCURRENCY = 10;
var DEFAULT_SAFE_CONFIRMATIONS = 6;
var MAX_OPEN_FILES = 500;
var END_OF_WORLD_TS = 1e13;
var MAX_OPEN_FILES = 500;
var END_OF_WORLD_TS = 1e13;
// var CONFIRMATION_NR_TO_NOT_CHECK = 10; //Spend
/**
* Module dependencies.
*/
* Module dependencies.
*/
var bitcore = require('bitcore'),
Rpc = imports.rpc || require('./Rpc'),
util = bitcore.util,
networks = bitcore.networks,
levelup = require('levelup'),
async = require('async'),
config = require('../config/config'),
assert = require('assert'),
Script = bitcore.Script,
bitcoreUtil = bitcore.util,
buffertools = require('buffertools');
var bitcore = require('bitcore'),
Rpc = imports.rpc || require('./Rpc'),
util = bitcore.util,
networks = bitcore.networks,
levelup = require('levelup'),
async = require('async'),
config = require('../config/config'),
assert = require('assert'),
Script = bitcore.Script,
bitcoreUtil = bitcore.util,
buffertools = require('buffertools');
var logger = require('./logger').logger;
var inf = logger.info;
var inf = logger.info;
var db = imports.db || levelup(config.leveldb + '/txs',{maxOpenFiles: MAX_OPEN_FILES} );
var PoolMatch = imports.poolMatch || require('soop').load('./PoolMatch',config);
var db = imports.db || levelup(config.leveldb + '/txs', {
maxOpenFiles: MAX_OPEN_FILES
});
var PoolMatch = imports.poolMatch || require('soop').load('./PoolMatch', config);
// This is 0.1.2 = > c++ version of base58-native
var base58 = require('base58-native').base58Check;
var encodedData = require('soop').load('bitcore/util/EncodedData',{
base58: base58
});
var versionedData= require('soop').load('bitcore/util/VersionedData',{
parent: encodedData
});
var base58 = require('base58-native').base58Check;
var encodedData = require('soop').load('bitcore/util/EncodedData', {
base58: base58
});
var versionedData = require('soop').load('bitcore/util/VersionedData', {
parent: encodedData
});
var Address = require('soop').load('bitcore/lib/Address',{
var Address = require('soop').load('bitcore/lib/Address', {
parent: versionedData
});
@ -76,7 +78,9 @@ TransactionDb.prototype.drop = function(cb) {
var path = config.leveldb + '/txs';
db.close(function() {
require('leveldown').destroy(path, function() {
db = levelup(path, {maxOpenFiles: 500});
db = levelup(path, {
maxOpenFiles: 500
});
return cb();
});
});
@ -278,13 +282,14 @@ TransactionDb.prototype.fromTxIdN = function(txid, n, cb) {
var k = OUTS_PREFIX + txid + '-' + n;
db.get(k, function(err, val) {
var ret;
var ret;
if (!val || (err && err.notFound)) {
err=null;
ret= { unconfirmedInput: 1 };
}
else {
err = null;
ret = {
unconfirmedInput: 1
};
} else {
var a = val.split(':');
ret = {
addr: a[0],
@ -312,7 +317,7 @@ TransactionDb.prototype.fromTxIdN = function(txid, n, cb) {
};
TransactionDb.prototype.deleteCacheForAddress = function(addr,cb) {
TransactionDb.prototype.deleteCacheForAddress = function(addr, cb) {
var k = ADDR_PREFIX + addr + '-';
var dbScript = [];
db.createReadStream({
@ -330,17 +335,17 @@ TransactionDb.prototype.deleteCacheForAddress = function(addr,cb) {
.on('error', function(err) {
return cb(err);
})
.on('end', function (){
db.batch(dbScript,cb);
.on('end', function() {
db.batch(dbScript, cb);
});
};
TransactionDb.prototype.cacheConfirmations = function(txouts,cb) {
TransactionDb.prototype.cacheConfirmations = function(txouts, cb) {
var self = this;
var dbScript=[];
for(var ii in txouts){
var txout=txouts[ii];
var dbScript = [];
for (var ii in txouts) {
var txout = txouts[ii];
//everything already cached?
if (txout.spentIsConfirmedCached) {
@ -354,15 +359,14 @@ TransactionDb.prototype.cacheConfirmations = function(txouts,cb) {
// if spent, we overwrite scriptPubKey cache (not needed anymore)
// First 1 = txout.isConfirmedCached (must be equal to 1 at this point)
infoToCache = [1, 1, txout.spentTxId, txout.spentIndex, txout.spentTs];
}
else {
if (!txout.isConfirmedCached) {
} else {
if (!txout.isConfirmedCached) {
infoToCache.push(1);
txout.confirmedWillBeCached=1;
txout.confirmedWillBeCached = 1;
}
}
//console.log('[TransactionDb.js.352:infoToCache:]',infoToCache); //TODO
if (infoToCache.length){
//console.log('[TransactionDb.js.352:infoToCache:]',infoToCache); //TODO
if (infoToCache.length) {
infoToCache.unshift(txout.value_sat);
dbScript.push({
@ -374,25 +378,24 @@ TransactionDb.prototype.cacheConfirmations = function(txouts,cb) {
}
}
//console.log('[TransactionDb.js.339:dbScript:]',dbScript); //TODO
db.batch(dbScript,cb);
//console.log('[TransactionDb.js.339:dbScript:]',dbScript); //TODO
db.batch(dbScript, cb);
};
TransactionDb.prototype.cacheScriptPubKey = function(txouts,cb) {
// console.log('[TransactionDb.js.381:cacheScriptPubKey:]'); //TODO
TransactionDb.prototype.cacheScriptPubKey = function(txouts, cb) {
// console.log('[TransactionDb.js.381:cacheScriptPubKey:]'); //TODO
var self = this;
var dbScript=[];
for(var ii in txouts){
var txout=txouts[ii];
var dbScript = [];
for (var ii in txouts) {
var txout = txouts[ii];
//everything already cached?
if (txout.scriptPubKeyCached || txout.spentTxId) {
continue;
}
if (txout.scriptPubKey) {
var infoToCache = [txout.value_sat,
(txout.isConfirmedCached || txout.confirmedWillBeCached)?1:0, txout.scriptPubKey];
var infoToCache = [txout.value_sat, (txout.isConfirmedCached || txout.confirmedWillBeCached) ? 1 : 0, txout.scriptPubKey];
dbScript.push({
type: 'put',
key: txout.key,
@ -400,7 +403,7 @@ TransactionDb.prototype.cacheScriptPubKey = function(txouts,cb) {
});
}
}
db.batch(dbScript,cb);
db.batch(dbScript, cb);
};
@ -424,12 +427,12 @@ TransactionDb.prototype._parseAddrData = function(k, data, ignoreCache) {
// v[1]== isConfirmedCached
// v[2]=== '1' -> is SpendCached -> [4]=spendTxId [5]=spentIndex [6]=spendTs
// v[2]!== '1' -> is ScriptPubkey -> [[2] = scriptPubkey
if (v[1]==='1'){
if (v[1] === '1') {
item.isConfirmed = 1;
item.isConfirmedCached = 1;
// console.log('[TransactionDb.js.356] CACHE HIT CONF:', item.key);
// Sent, confirmed
if (v[2] === '1'){
if (v[2] === '1') {
// console.log('[TransactionDb.js.356] CACHE HIT SPENT:', item.key);
item.spentIsConfirmed = 1;
item.spentIsConfirmedCached = 1;
@ -441,7 +444,7 @@ TransactionDb.prototype._parseAddrData = function(k, data, ignoreCache) {
else if (v[2]) {
item.scriptPubKey = v[2];
item.scriptPubKeyCached = 1;
// console.log('[TransactionDb.js.356] CACHE HIT SCRIPTPUBKEY:', item.key, v, item.scriptPubKey);
// console.log('[TransactionDb.js.356] CACHE HIT SCRIPTPUBKEY:', item.key, v, item.scriptPubKey);
}
}
return item;
@ -452,24 +455,26 @@ TransactionDb.prototype.fromAddr = function(addr, opts, cb) {
var self = this;
var k = ADDR_PREFIX + addr + '-';
var ret = [];
var unique={};
var unique = {};
db.createReadStream({
start: k,
end: k + '~',
limit: opts.txLimit>0 ? opts.txLimit: -1, // -1 means not limit
limit: opts.txLimit > 0 ? opts.txLimit : -1, // -1 means not limit
})
.on('data', function(data) {
var k = data.key.split('-');
var index = k[3]+k[4];
var index = k[3] + k[4];
if (!unique[index]) {
unique[index]=1;
unique[index] = 1;
ret.push(self._parseAddrData(k, data, opts.ignoreCache));
}
})
.on('error', cb)
.on('end', function() {
async.eachLimit(ret.filter(function(x){return !x.spentIsConfirmed;}), CONCURRENCY, function(o, e_c) {
async.eachLimit(ret.filter(function(x) {
return !x.spentIsConfirmed;
}), CONCURRENCY, function(o, e_c) {
var k = SPENT_PREFIX + o.txid + '-' + o.index + '-';
db.createReadStream({
start: k,
@ -488,18 +493,20 @@ TransactionDb.prototype.fromAddr = function(addr, opts, cb) {
});
};
TransactionDb.prototype._fromBuffer = function (buf) {
var buf2 = buffertools.reverse(buf);
return parseInt(buf2.toString('hex'), 16);
TransactionDb.prototype._fromBuffer = function(buf) {
var buf2 = buffertools.reverse(buf);
return parseInt(buf2.toString('hex'), 16);
};
TransactionDb.prototype.getStandardizedTx = function (tx, time, isCoinBase) {
TransactionDb.prototype.getStandardizedTx = function(tx, time, isCoinBase) {
var self = this;
tx.txid = bitcoreUtil.formatHashFull(tx.getHash());
var ti=0;
var ti = 0;
tx.vin = tx.ins.map(function(txin) {
var ret = {n: ti++};
var ret = {
n: ti++
};
if (isCoinBase) {
ret.isCoinBase = true;
} else {
@ -517,7 +524,9 @@ TransactionDb.prototype.getStandardizedTx = function (tx, time, isCoinBase) {
var addrs = new Address.fromScriptPubKey(s, config.network);
// support only for p2pubkey p2pubkeyhash and p2sh
if (addrs && addrs.length === 1) {
val = {addresses: [addrs[0].toString() ] };
val = {
addresses: [addrs[0].toString()]
};
}
}
return {
@ -532,22 +541,23 @@ TransactionDb.prototype.getStandardizedTx = function (tx, time, isCoinBase) {
TransactionDb.prototype.fillScriptPubKey = function(txouts, cb) {
var self=this;
var self = this;
// Complete utxo info
async.eachLimit(txouts, CONCURRENCY, function (txout, a_c) {
async.eachLimit(txouts, CONCURRENCY, function(txout, a_c) {
self.fromIdInfoSimple(txout.txid, function(err, info) {
if (!info || !info.vout) return a_c(err);
txout.scriptPubKey = info.vout[txout.index].scriptPubKey.hex;
return a_c();
});
}, function(){
}, function() {
self.cacheScriptPubKey(txouts, cb);
});
};
TransactionDb.prototype.removeFromTxId = function(txid, cb) {
async.series([
function(c) {
db.createReadStream({
start: OUTS_PREFIX + txid + '-',
@ -579,15 +589,15 @@ TransactionDb.prototype.removeFromTxId = function(txid, cb) {
// relatedAddrs is an optional hash, to collect related addresses in the transaction
TransactionDb.prototype._addScript = function(tx, relatedAddrs) {
var dbScript = [];
var ts = tx.time;
var txid = tx.txid || tx.hash;
// var u=require('util');
// console.log('[TransactionDb.js.518]', u.inspect(tx,{depth:10})); //TODO
var dbScript = [];
var ts = tx.time;
var txid = tx.txid || tx.hash;
// var u=require('util');
// console.log('[TransactionDb.js.518]', u.inspect(tx,{depth:10})); //TODO
// Input Outpoints (mark them as spent)
for(var ii in tx.vin) {
for (var ii in tx.vin) {
var i = tx.vin[ii];
if (i.txid){
if (i.txid) {
var k = SPENT_PREFIX + i.txid + '-' + i.vout + '-' + txid + '-' + i.n;
dbScript.push({
type: 'put',
@ -597,27 +607,27 @@ TransactionDb.prototype._addScript = function(tx, relatedAddrs) {
}
}
for(var ii in tx.vout) {
for (var ii in tx.vout) {
var o = tx.vout[ii];
if ( o.scriptPubKey && o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported=> standard multisig
) {
var addr = o.scriptPubKey.addresses[0];
var sat = o.valueSat || ((o.value||0) * util.COIN).toFixed(0);
if (o.scriptPubKey && o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported=> standard multisig
) {
var addr = o.scriptPubKey.addresses[0];
var sat = o.valueSat || ((o.value || 0) * util.COIN).toFixed(0);
if (relatedAddrs) relatedAddrs[addr]=1;
var k = OUTS_PREFIX + txid + '-' + o.n;
var tsr = END_OF_WORLD_TS - ts;
dbScript.push({
type: 'put',
key: k,
value: addr + ':' + sat,
},{
type: 'put',
key: ADDR_PREFIX + addr + '-' + tsr + '-'+ txid + '-' + o.n,
value: sat,
});
}
if (relatedAddrs) relatedAddrs[addr] = 1;
var k = OUTS_PREFIX + txid + '-' + o.n;
var tsr = END_OF_WORLD_TS - ts;
dbScript.push({
type: 'put',
key: k,
value: addr + ':' + sat,
}, {
type: 'put',
key: ADDR_PREFIX + addr + '-' + tsr + '-' + txid + '-' + o.n,
value: sat,
});
}
}
return dbScript;
};
@ -626,12 +636,14 @@ TransactionDb.prototype._addScript = function(tx, relatedAddrs) {
TransactionDb.prototype.add = function(tx, cb) {
var relatedAddrs = {};
var dbScript = this._addScript(tx, relatedAddrs);
db.batch(dbScript, function(err) { return cb(err,relatedAddrs);});
db.batch(dbScript, function(err) {
return cb(err, relatedAddrs);
});
};
TransactionDb.prototype._addManyFromObjs = function(txs, next) {
var dbScript = [];
for(var ii in txs){
for (var ii in txs) {
var s = this._addScript(txs[ii]);
dbScript = dbScript.concat(s);
}
@ -639,17 +651,17 @@ TransactionDb.prototype._addManyFromObjs = function(txs, next) {
};
TransactionDb.prototype._addManyFromHashes = function(txs, next) {
var self=this;
var self = this;
var dbScript = [];
async.eachLimit(txs, CONCURRENCY, function(tx, each_cb) {
if (tx === genesisTXID)
return each_cb();
if (tx === genesisTXID)
return each_cb();
Rpc.getTxInfo(tx, function(err, inInfo) {
if (!inInfo) return each_cb(err);
dbScript = dbScript.concat(self._addScript(inInfo));
return each_cb();
});
Rpc.getTxInfo(tx, function(err, inInfo) {
if (!inInfo) return each_cb(err);
dbScript = dbScript.concat(self._addScript(inInfo));
return each_cb();
});
},
function(err) {
if (err) return next(err);
@ -661,10 +673,10 @@ TransactionDb.prototype._addManyFromHashes = function(txs, next) {
TransactionDb.prototype.addMany = function(txs, next) {
if (!txs) return next();
var fn = (typeof txs[0] ==='string') ?
var fn = (typeof txs[0] === 'string') ?
this._addManyFromHashes : this._addManyFromObjs;
return fn.apply(this,[txs, next]);
return fn.apply(this, [txs, next]);
};
@ -677,7 +689,7 @@ TransactionDb.prototype.getPoolInfo = function(txid, cb) {
if (txInfo && txInfo.isCoinBase)
ret = self.poolMatch.match(new Buffer(txInfo.vin[0].coinbase, 'hex'));
return cb(ret);
});
};
@ -685,16 +697,16 @@ TransactionDb.prototype.getPoolInfo = function(txid, cb) {
TransactionDb.prototype.checkVersion02 = function(cb) {
var k = 'txa-';
var isV2=1;
var isV2 = 1;
db.createReadStream({
start: k,
end: k + '~',
limit: 1,
})
.on('data', function(data) {
isV2=0;
isV2 = 0;
})
.on('end', function (){
.on('end', function() {
return cb(isV2);
});
};
@ -702,9 +714,9 @@ TransactionDb.prototype.checkVersion02 = function(cb) {
TransactionDb.prototype.migrateV02 = function(cb) {
var k = 'txa-';
var dbScript = [];
var c=0;
var c2=0;
var N=50000;
var c = 0;
var c2 = 0;
var N = 50000;
db.createReadStream({
start: k,
end: k + '~'
@ -714,22 +726,21 @@ TransactionDb.prototype.migrateV02 = function(cb) {
var v = data.value.split(':');
dbScript.push({
type: 'put',
key: ADDR_PREFIX + k[1] + '-' + (END_OF_WORLD_TS - parseInt(v[1]))
+ '-' + k[2] + '-' + k[3],
key: ADDR_PREFIX + k[1] + '-' + (END_OF_WORLD_TS - parseInt(v[1])) + '-' + k[2] + '-' + k[3],
value: v[0],
});
if (c++>N) {
console.log('\t%dM txs outs processed', ((c2+=N)/1e6).toFixed(3)); //TODO
db.batch(dbScript,function () {
c=0;
dbScript=[];
if (c++ > N) {
console.log('\t%dM txs outs processed', ((c2 += N) / 1e6).toFixed(3)); //TODO
db.batch(dbScript, function() {
c = 0;
dbScript = [];
});
}
})
.on('error', function(err) {
return cb(err);
})
.on('end', function (){
.on('end', function() {
return cb();
});
};

View File

@ -51,36 +51,39 @@
"start": "node node_modules/grunt-cli/bin/grunt"
},
"dependencies": {
"bitcore": "git://github.com/bitpay/bitcore.git#4d8af75ae9916984c52ee2eda1870d5980656341",
"base58-native": "0.1.2",
"async": "*",
"base58-native": "0.1.2",
"bignum": "*",
"bitcore": "git://github.com/bitpay/bitcore.git#aa41c70cff2583d810664c073a324376c39c8b36",
"bufferput": "git://github.com/bitpay/node-bufferput.git",
"buffertools": "*",
"commander": "*",
"express": "~3.4.7",
"glob": "*",
"leveldown": "*",
"levelup": "*",
"glob": "*",
"soop": "=0.1.5",
"commander": "*",
"bignum": "*",
"winston": "*",
"express": "~3.4.7",
"buffertools": "*",
"should": "~2.1.1",
"socket.io": "~1.0.4",
"moment": "~2.5.0",
"preconditions": "^1.0.7",
"should": "~2.1.1",
"sinon": "~1.7.3",
"xmlhttprequest": "~1.6.0",
"bufferput": "git://github.com/bitpay/node-bufferput.git"
"socket.io": "~1.0.4",
"soop": "=0.1.5",
"winston": "*",
"xmlhttprequest": "~1.6.0"
},
"devDependencies": {
"chai": "*",
"grunt": "~0.4.2",
"grunt-cli": "~0.1.11",
"grunt-env": "~0.4.1",
"grunt-concurrent": "~0.4.2",
"grunt-contrib-jshint": "~0.8.0",
"grunt-contrib-watch": "~0.5.3",
"grunt-concurrent": "~0.4.2",
"grunt-nodemon": "~0.2.0",
"grunt-env": "~0.4.1",
"grunt-markdown": "~0.5.0",
"grunt-mocha-test": "~0.8.1",
"should": "2.1.1",
"chai": "=1.9.1",
"grunt-markdown": "~0.5.0"
"grunt-nodemon": "~0.2.0",
"memdown": "^0.10.2",
"microtime": "^0.6.0",
"should": "2.1.1"
}
}

View File

@ -28,8 +28,7 @@ describe('PeerSync', function() {
it('should return with no errors', function() {
expect(function() {
ps.handleInv(inv_info);
}).not.to.
throw (Error);
}).not.to.throw(Error);
});
it('should call sendGetData', function() {
ps.handleInv(inv_info);

116
test/test.MessageDb.js Normal file
View File

@ -0,0 +1,116 @@
'use strict';
var chai = require('chai');
var should = chai.should;
var expect = chai.expect;
var levelup = require('levelup');
var memdown = require('memdown');
var microtime = require('microtime');
var MessageDb = require('../lib/MessageDb');
var bitcore = require('bitcore');
var SIN = bitcore.SIN;
var Key = bitcore.Key;
var AuthMessage = bitcore.AuthMessage;
describe('MessageDb', function() {
var opts = {
name: 'test-MessageDb',
db: levelup({
db: memdown,
sync: true,
valueEncoding: 'json'
})
};
it('should be able to create instance', function() {
var mdb = new MessageDb(opts);
expect(mdb).to.exist;
});
it('should be able to create default instance', function() {
var mdb = MessageDb.default();
expect(mdb).to.exist;
});
var fpk = Key.generateSync();
var tpk = Key.generateSync();
var from = fpk.public.toString('hex');
var to = tpk.public.toString('hex');
var messageData = {
a: 1,
b: 2
};
var messageData2 = {};
var messageData3 = ['a', 'b'];
var message = AuthMessage.encode(to, fpk, messageData);
var message2 = AuthMessage.encode(to, fpk, messageData2);
var message3 = AuthMessage.encode(to, fpk, messageData3);
it('should be able to add and read a message', function(done) {
var mdb = new MessageDb(opts);
var lower_ts = microtime.now();
mdb.addMessage(message, function(err) {
expect(err).to.not.exist;
var upper_ts = microtime.now();
mdb.getMessages(to, lower_ts, upper_ts, function(err, messages) {
expect(err).to.not.exist;
messages.length.should.equal(1);
messages[0].ts.should.be.below(upper_ts);
messages[0].ts.should.be.above(lower_ts);
var m = AuthMessage.decode(tpk, messages[0]).payload;
m.a.should.equal(1);
m.b.should.equal(2);
done();
});
});
});
var sharedMDB;
it('should be able to add many messages and read some', function(done) {
var mdb = new MessageDb(opts);
sharedMDB = mdb;
var lower_ts = microtime.now();
mdb.addMessage(message, function(err) {
expect(err).to.not.exist;
mdb.addMessage(message2, function(err) {
expect(err).to.not.exist;
var upper_ts = microtime.now();
setTimeout(function() {
mdb.addMessage(message3, function(err) {
expect(err).to.not.exist;
mdb.getMessages(to, lower_ts, upper_ts, function(err, messages) {
expect(err).to.not.exist;
messages.length.should.equal(2);
messages[0].ts.should.be.below(upper_ts);
messages[0].ts.should.be.above(lower_ts);
var m0 = AuthMessage.decode(tpk, messages[0]).payload;
JSON.stringify(m0).should.equal('{"a":1,"b":2}');
messages[1].ts.should.be.below(upper_ts);
messages[1].ts.should.be.above(lower_ts);
var m1 = AuthMessage.decode(tpk, messages[1]).payload;
JSON.stringify(m1).should.equal('{}');
done();
});
});
}, 10);
});
});
});
it('should be able to add many messages and read all', function(done) {
var mdb = sharedMDB;
mdb.getMessages(to, null, null, function(err, messages) {
expect(err).to.not.exist;
messages.length.should.equal(4);
var m0 = AuthMessage.decode(tpk, messages[0]).payload;
JSON.stringify(m0).should.equal('{"a":1,"b":2}');
var m1 = AuthMessage.decode(tpk, messages[1]).payload;
JSON.stringify(m1).should.equal('{"a":1,"b":2}');
var m2 = AuthMessage.decode(tpk, messages[2]).payload;
JSON.stringify(m2).should.equal('{}');
var m3 = AuthMessage.decode(tpk, messages[3]).payload;
JSON.stringify(m3).should.equal('["a","b"]');
done();
});
});
it('should be able to close instance', function() {
var mdb = new MessageDb(opts);
mdb.close();
expect(mdb).to.exist;
});
});

View File

@ -0,0 +1,30 @@
'use strict';
var chai = require('chai');
var should = chai.should;
var expect = chai.expect;
var sinon = require('sinon');
var socket = require('../app/controllers/socket');
var bitcore = require('bitcore');
var EventEmitter = require('events').EventEmitter;
describe('socket server', function() {
it('should be able to call init with no args', function() {
socket.init.should.not.throw();
});
it('should register socket handlers', function() {
var io = {
sockets: new EventEmitter()
}
socket.init(io);
var mockSocket = {};
mockSocket.on = sinon.spy();
io.sockets.emit('connection', mockSocket);
mockSocket.on.calledWith('subscribe');
mockSocket.on.calledWith('sync');
mockSocket.on.calledWith('message');
});
});