diff --git a/src/backupProcess.js b/src/backupProcess.js index 9264f06..0246166 100644 --- a/src/backupProcess.js +++ b/src/backupProcess.js @@ -1,4 +1,7 @@ 'use strict'; +const { + stat +} = require('fs'); const WebSocket = require('ws'); //CONSTANTS @@ -21,10 +24,10 @@ const SUPERNODE_INDICATOR = '$', BACKUP_HANDSHAKE_INIT = "handshakeInitate", BACKUP_HANDSHAKE_END = "handshakeEnd"; -/*Supernode Backup and migration functions*/ //const backupNodes = []; var backupDepth, supernodeList, appList, kBucket; +//List of node backups stored const _list = {}; Object.defineProperty(_list, 'delete', { value: function(id) { @@ -55,32 +58,77 @@ Object.defineProperty(_list, 'serving', { } }); -const _nextNode = {}; -Object.defineProperty(_nextNode, 'set', { - value: function(id, ws) { - this.id = id; - this.ws = ws; - } -}) -Object.defineProperty(_nextNode, 'send', { - value: function(packet) { - this.ws.send(packet); - } -}); +//Node container +function NodeContainer() { + var _ws, _id, _onmessage, _onclose; + Object.defineProperty(this, 'set', { + value: function(id, ws) { + if (_ws !== undefined) + this.close(); + _id = id; + _ws = ws; + if (_onmessage) + _ws.onmessage = _onmessage; + if (_onclose) + _ws.onclose = _onclose; + } + }) + Object.defineProperty(this, 'id', { + get: function() { + return _id; + } + }) + Object.defineProperty(this, 'readyState', { + get: function() { + if (_ws instanceof WebSocket) + return _ws.readyState; + else + return null; + } + }) + Object.defineProperty(this, 'send', { + value: function(packet) { + _ws.send(packet); + } + }); + Object.defineProperty(this, 'onmessage', { + set: function(fn) { + if (fn instanceof Function) + _onmessage = fn; + } + }) + Object.defineProperty(this, 'onclose', { + set: function(fn) { + if (fn instanceof Function) + _onclose = fn; + } + }) + Object.defineProperty(this, 'is', { + value: function(ws) { + return ws === _ws; + } + }) + Object.defineProperty(this, 'close', { + value: function() { + if (_ws.readyState === 1) { + _ws.onclose = () => console.warn('Closing: ' + _id) + _ws.close(); + } + _ws = _id = undefined; + } + }) +} -const _prevNode = {}; -Object.defineProperty(_prevNode, 'set', { - value: function(id, ws) { - this.id = id; - this.ws = ws; - } -}) -Object.defineProperty(_prevNode, 'send', { - value: function(packet) { - this.ws.send(packet); - } -}); +//Container for next-node +const _nextNode = new NodeContainer(); +_nextNode.onmessage = evt => processTaskFromNextNode(evt.data); +_nextNode.onclose = evt => reconnectNextNode(); +//Container for prev-node +const _prevNode = new NodeContainer(); +_prevNode.onmessage = evt => processTaskFromPrevNode(evt.data); +_prevNode.onclose = evt => _prevNode.close(); +//Packet processing const packet_ = {} packet_.constuct = function(message) { const packet = { @@ -103,6 +151,7 @@ packet_.parse = function(str) { } } +//Set parameters from blockchain function setBlockchainParameters(depth, supernodes, apps, KB, delay) { backupDepth = depth; supernodeList = supernodes; @@ -111,31 +160,137 @@ function setBlockchainParameters(depth, supernodes, apps, KB, delay) { delayTime = delay; } -function connectToNextNode(nodeID = myFloID) { +//-----NODE CONNECTORS (WEBSOCKET)----- + +//Connect to Node websocket +function connectToNode(snID) { return new Promise((resolve, reject) => { - let nextNodeID = kBucket.nextNode(nodeID); - + if (!(snID in supernodeList)) + return reject(`${snID} is not a supernode`) const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/"); - ws.on("error", () => { - connectToNextNode(nextNodeID) - .then(result => resolve(result)) - .catch(error => reject(error)) - }); - - ws.on('open', function open() { - _nextNode.set(nextNodeID, ws) - ws.send(packet_.constuct({ - type: BACKUP_HANDSHAKE_INIT - })); - resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID) - }); - - ws.on('message', function incoming(packet) { - processTaskFromNextNode(packet) - }); + ws.on("error", () => reject(`${snID} is offline`)); + ws.on('open', () => resolve(ws)); }) } +//Connect to Node websocket thats online +function connectToActiveNode(snID, reverse = false) { + return new Promise((resolve, reject) => { + if (!(snID in supernodeList)) + return reject(`${snID} is not a supernode`) + if (snID === myFloID) + return reject(`Reached end of circle. Next node avaiable is self`) + connectToNode(snID) + .then(ws => resolve(ws)) + .catch(error => { + var next = reverse ? kBucket.prevNode(snID) : kBucket.nextNode(snID) + connectToActiveNode(next, reverse) + .then(ws => resolve(ws)) + .catch(error => reject(error)) + }) + }) +} + +//Connect to next available node +function connectToNextNode() { + return new Promise((resolve, reject) => { + let nextNodeID = kBucket.nextNode(nodeID); + connectToActiveNode(nextNodeID).then(ws => { + _nextNode.set(nextNodeID, ws) + _nextNode.send(packet_.constuct({ + type: BACKUP_HANDSHAKE_INIT + })); + resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID) + }).catch(error => reject(error)) + }) +} + +//-----PROCESS TASKS----- + +//Tasks from next-node +function processTaskFromNextNode(packet) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available + reconnectNextNode() + break; + case BACKUP_HANDSHAKE_END: + handshakeEnd() + break; + case DATA_REQUEST: + sendStoredData(task.nodes, _nextNode) + break; + case DATA_SYNC: + dataSyncIndication(task.id, task.status, from) + break; + case STORE_BACKUP_DATA: + storeBackupData(task.data) + break; + } + }) + } +} + +//Tasks from prev-node +function processTaskFromPrevNode(packet) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case ORDER_BACKUP: + orderBackup(task.order) + break; + case STORE_BACKUP_DATA: + storeBackupData(task.data) + break; + case TAG_BACKUP_DATA: + tagBackupData(task.data) + break; + case DATA_REQUEST: + sendStoredData(task.nodes, _prevNode) + break; + case DATA_SYNC: + dataSyncIndication(task.id, task.status, from) + break; + case INITIATE_REFRESH: //TODO + initiateRefresh() + break; + } + }); + } +} + +//Tasks from any supernode +function processTaskFromSupernode(packet, ws) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case BACKUP_HANDSHAKE_INIT: + handshakeMid(from, ws) + break; + case STORE_MIGRATED_DATA: //TODO + storeMigratedData(task.data) + break; + } + }); + } +} + +//-----HANDSHAKE PROCESS----- + +//Acknowledge handshake function handshakeMid(id, ws) { if (_prevNode.id) { if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) { @@ -200,6 +355,7 @@ function handshakeMid(id, ws) { }) } +//Complete handshake function handshakeEnd() { console.log("Backup connected: " + _nextNode.id) _nextNode.send(packet_.constuct({ @@ -208,12 +364,17 @@ function handshakeEnd() { })) } +//Reconnect to next available node function reconnectNextNode() { + _nextNode.close(); connectToNextNode() .then(result => console.log(result)) .catch(error => console.error(error)) } +//-----BACKUP TASKS----- + +//Order the stored backup function orderBackup(order) { let new_order = []; for (let n in order) { @@ -234,102 +395,52 @@ function orderBackup(order) { } } +//Send stored data +function sendStoredData(lastlogs, node) { + for (n in lastlogs) { + if (_list.stored.includes(n)) { + db.getData(n, lastlogs[n]).then(result => { + node.send(packet_.constuct({ + type: DATA_SYNC, + id: n, + status: true + })) + //TODO: efficiently handle large number of data instead of loading all into memory + result.forEach(d => node.send(packet_.constuct({ + type: STORE_BACKUP_DATA, + data: d + }))) + node.send(packet_.constuct({ + type: DATA_SYNC, + id: n, + status: false + })) + }).catch(error => reject(error)) + } + } +} + +function dataSyncIndication(snID, status, from) { + console.info(`${status ? 'START':'END'}: ${snID} data sync(receive) form ${from}`); +} + +//Store (backup) data function storeBackupData(data) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) db.storeData(closestNode, data); } - +//tag (backup) data function tagBackupData(data) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) db.storeTag(closestNode, data); } -function processTaskFromNextNode(packet) { - var { - from, - message - } = packet_.parse(packet) - if (message) { - message.forEach(task => { - switch (task.type) { - case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available - reconnectNextNode(); - break; - case BACKUP_HANDSHAKE_END: - handshakeEnd(); - break; - case DATA_REQUEST: - sendStoredData(task, from) - break; - case DATA_SYNC: - dataSyncIndication(task, from) - break; - } - }) - } -} - -function processTaskFromPrevNode(packet) { - var { - from, - message - } = packet_.parse(packet) - if (message) { - message.forEach(task => { - switch (task.type) { - case ORDER_BACKUP: - orderBackup(task.order); - break; - case STORE_BACKUP_DATA: - storeBackupData(task.data) - break; - case TAG_BACKUP_DATA: - tagBackupData(task) - break; - case DATA_REQUEST: - sendStoredData(data.sn_msg.snID, data.from) - break; - case DATA_SYNC: - dataSyncIndication(data.sn_msg.snID, data.sn_msg.mode, data.from) - break; - case INITIATE_REFRESH: - initiateRefresh() - break; - } - }); - } -} - -function processTaskFromSupernode(packet, ws) { - var { - from, - message - } = packet_.parse(packet) - if (message) { - message.forEach(task => { - switch (task.type) { - case BACKUP_HANDSHAKE_INIT: - handshakeMid(from, ws) - break; - case STORE_MIGRATED_DATA: - storeMigratedData(data.sn_msg) - break; - } - }); - } -} - -function processTask(packet, ws) { - if (_prevNode.is(ws)) - processTaskFromPrevNode(packet) - else - processTaskFromSupernode(packet, ws) -} +//-----EXPORTS----- module.exports = { - processTask, + processTaskFromSupernode, setBlockchainParameters, SUPERNODE_INDICATOR } \ No newline at end of file diff --git a/src/database.js b/src/database.js index 4acecb0..5bf3d8b 100644 --- a/src/database.js +++ b/src/database.js @@ -1,52 +1,36 @@ 'use strict'; var mysql = require('mysql'); -(function() { - var db = module.exports = {} +const H_struct = { + VECTOR_CLOCK: "vectorClock", + SENDER_ID: "senderID", + RECEIVER_ID: "receiverID", + TYPE: "type", + APPLICATION: "application" +} - const H_struct = { - VECTOR_CLOCK: "vectorClock", - SENDER_ID: "senderID", - RECEIVER_ID: "receiverID", - TYPE: "type", - APPLICATION: "application" - } +const B_struct = { + TIME: "time", + MESSAGE: "message", + SIGNATURE: "sign", + PUB_KEY: "pubKey", + COMMENT: "comment" +} - const B_struct = { - TIME: "time", - MESSAGE: "message", - SIGNATURE: "sign", - PUB_KEY: "pubKey", - COMMENT: "comment" - } +const L_struct = { + STATUS: "status_n", + LOG_TIME: "log_time" +} - const L_struct = { - STATUS: "status_n", - LOG_TIME: "log_time" - } +const T_struct = { + TAG: "tag", + TAG_TIME: "tag_time", + TAG_KEY: "tag_key", + TAG_SIGN: "tag_sign" +} - const T_struct = { - TAG: "tag", - TAG_TIME: "tag_time", - TAG_KEY: "tag_key", - TAG_SIGN: "tag_sign" - } - - db.connect = function(user, password, db, host = 'localhost') { - return new Promise((resolve, reject) => { - let conn = mysql.createConnection({ - host: host, - user: user, - password: password, - database: db - }); - conn.connect((err) => { - if (err) return reject(err); - db.conn = conn; - resolve("Connected") - }) - }) - } +function Database(user, password, dbname, host = 'localhost') { + const db = {}; db.createTable = function(snID) { return new Promise((resolve, reject) => { @@ -195,4 +179,24 @@ var mysql = require('mysql'); db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); }) } -})() \ No newline at end of file + + db.close = function() { + db.conn.end(); + } + + return new Promise((resolve, reject) => { + let conn = mysql.createConnection({ + host: host, + user: user, + password: password, + database: dbname + }); + conn.connect((err) => { + if (err) return reject(err); + db.conn = conn; + resolve(db) + }); + }) +} + +module.exports = Database; \ No newline at end of file diff --git a/src/server.js b/src/server.js index 646952e..6bf1fbb 100644 --- a/src/server.js +++ b/src/server.js @@ -39,12 +39,13 @@ const wsServer = new WebSocket.Server({ server }); wsServer.on('connection', function connection(ws) { - ws.on('message', message => { + ws.onmessage = function(evt){ + let message = evt.data; if (message.startsWith(backupProcess.SUPERNODE_INDICATOR)) - backupProcess.processTask(message, ws); + backupProcess.processTaskFromSupernode(message, ws); else supernode.processRequestFromUser(JSON.parse(message)) .then(result => ws.send(JSON.parse(result[0]))) .catch(error => ws.send(error)) - }); + } }); \ No newline at end of file