diff --git a/src/client.js b/src/client.js index 9afa7d2..06cdb0a 100644 --- a/src/client.js +++ b/src/client.js @@ -50,7 +50,7 @@ function processDataFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateFloID(data.receiverID)) return reject(INVALID("Invalid receiverID")); - let closeNode = kBucket.closestNode(data.receiverID); + let closeNode = cloud.closestNode(data.receiverID); if (!_list.serving.includes(closeNode)) return reject(INVALID("Incorrect Supernode")); if (!floCrypto.validateFloID(data.senderID)) @@ -85,7 +85,7 @@ function processRequestFromUser(request) { return new Promise((resolve, reject) => { if (!floCrypto.validateFloID(request.receiverID)) return reject(INVALID("Invalid receiverID")); - let closeNode = kBucket.closestNode(request.receiverID); + let closeNode = cloud.closestNode(request.receiverID); if (!_list.serving.includes(closeNode)) return reject(INVALID("Incorrect Supernode")); DB.searchData(closeNode, request) @@ -98,7 +98,7 @@ function processTagFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateFloID(data.receiverID)) return reject(INVALID("Invalid receiverID")); - let closeNode = kBucket.closestNode(data.receiverID); + let closeNode = cloud.closestNode(data.receiverID); if (!_list.serving.includes(closeNode)) return reject(INVALID("Incorrect Supernode")); DB.getData(closeNode, data.vectorClock).then(result => { @@ -129,7 +129,7 @@ function processNoteFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateFloID(data.receiverID)) return reject(INVALID("Invalid receiverID")); - let closeNode = kBucket.closestNode(data.receiverID); + let closeNode = cloud.closestNode(data.receiverID); if (!_list.serving.includes(closeNode)) return reject(INVALID("Incorrect Supernode")); DB.getData(closeNode, data.vectorClock).then(result => { diff --git a/src/kBucket.js b/src/cloud.js similarity index 77% rename from src/kBucket.js rename to src/cloud.js index 8083c1b..57a6fa7 100644 --- a/src/kBucket.js +++ b/src/cloud.js @@ -1,6 +1,6 @@ 'use strict'; -module.exports = function K_Bucket(options = {}) { +function K_Bucket(masterID, nodeList) { const decodeID = function(floID) { let k = bitjs.Base58.decode(floID); @@ -13,16 +13,14 @@ module.exports = function K_Bucket(options = {}) { return nodeIdNewInt8Array; }; - const list = options.list || Object.keys(floGlobals.supernodes); - const refID = options.masterID || floGlobals.SNStorageID; const _KB = new BuildKBucket({ - localNodeId: decodeID(refID) + localNodeId: decodeID(masterID) }); - list.forEach(id => _KB.add({ + nodeList.forEach(id => _KB.add({ id: decodeID(id), floID: id })); - const _CO = list.map(sn => [_KB.distance(decodeID(refID), decodeID(sn)), sn]) + const _CO = nodeList.map(sn => [_KB.distance(decodeID(masterID), decodeID(sn)), sn]) .sort((a, b) => a[0] - b[0]) .map(a => a[1]); @@ -93,4 +91,23 @@ module.exports = function K_Bucket(options = {}) { .map(k => k.floID); return (N == 1 ? cNodes[0] : cNodes); }; -} \ No newline at end of file +} + +var kBucket; +const cloud = module.exports = function Cloud(masterID, nodeList) { + kBucket = new K_Bucket(masterID, nodeList); +} + +cloud.closestNode = (id, N = 1) => kBucket.closestNode(id, N); +cloud.prevNode = (id, N = 1) => kBucket.prevNode(id, N); +cloud.nextNode = (id, N = 1) => kBucket.nextNode(id, N); +cloud.innerNodes = (id1, id2) => kBucket.innerNodes(id1, id2); +cloud.outterNodes = (id1, id2) => kBucket.outterNodes(id1, id2); +Object.defineProperties(cloud, { + kb: { + get: () => kBucket + }, + order: { + get: () => kBucket.order + } +}); \ No newline at end of file diff --git a/src/intra.js b/src/intra.js index b784327..6f29104 100644 --- a/src/intra.js +++ b/src/intra.js @@ -177,7 +177,7 @@ function connectToActiveNode(snID, reverse = false) { connectToNode(snID) .then(ws => resolve(ws)) .catch(error => { - var next = (reverse ? kBucket.prevNode(snID) : kBucket.nextNode(snID)); + var next = (reverse ? cloud.prevNode(snID) : cloud.nextNode(snID)); connectToActiveNode(next, reverse) .then(ws => resolve(ws)) .catch(error => reject(error)); @@ -190,7 +190,7 @@ function connectToNextNode(curNode = myFloID) { return new Promise((resolve, reject) => { if (curNode === myFloID && !(myFloID in floGlobals.supernodes)) return reject(`This (${myFloID}) is not a supernode`); - let nextNodeID = kBucket.nextNode(curNode); + let nextNodeID = cloud.nextNode(curNode); if (nextNodeID === myFloID) return reject("No other node online"); connectToNode(nextNodeID).then(ws => { @@ -336,7 +336,7 @@ function processTaskFromSupernode(packet, ws) { //Acknowledge handshake function handshakeMid(id, ws) { if (_prevNode.id && _prevNode.id in floGlobals.supernodes) { - if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) { + if (cloud.innerNodes(_prevNode.id, myFloID).includes(id)) { //close existing prev-node connection _prevNode.send(packet_.construct({ type: RECONNECT_NEXT_NODE @@ -364,7 +364,7 @@ function handshakeMid(id, ws) { if (!_nextNode.id) reconnectNextNode(); //Reorder storelist - let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID), + let nodes = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID), req_sync = [], new_order = []; nodes.forEach(n => { @@ -456,7 +456,7 @@ function reconnectNextNode() { function orderBackup(order) { let new_order = [], req_sync = []; - let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID); + let cur_serve = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID); for (let n in order) { if (!cur_serve.includes(n) && order[n] + 1 !== _list[n] && n in floGlobals.supernodes) { if (order[n] >= floGlobals.sn_config.backupDepth) @@ -546,7 +546,7 @@ function dataSyncIndication(snID, status, from) { //Store (backup) data function storeBackupData(data, from, packet) { - let closestNode = kBucket.closestNode(data.receiverID); + let closestNode = cloud.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { DB.storeData(closestNode, data).then(_ => null).catch(e => console.error(e)); if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) @@ -556,7 +556,7 @@ function storeBackupData(data, from, packet) { //Tag (backup) data function tagBackupData(data, from, packet) { - let closestNode = kBucket.closestNode(data.receiverID); + let closestNode = cloud.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { DB.storeTag(closestNode, data).then(_ => null).catch(e => console.error(e)); if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) @@ -566,7 +566,7 @@ function tagBackupData(data, from, packet) { //Note (backup) data function noteBackupData(data, from, packet) { - let closestNode = kBucket.closestNode(data.receiverID); + let closestNode = cloud.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { DB.storeNote(closestNode, data).then(_ => null).catch(e => console.error(e)); if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) @@ -576,7 +576,7 @@ function noteBackupData(data, from, packet) { //Store (migrated) data function storeMigratedData(data) { - let closestNode = kBucket.closestNode(data.receiverID); + let closestNode = cloud.closestNode(data.receiverID); if (_list.serving.includes(closestNode)) { DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e)); _nextNode.send(packet_.construct({ @@ -588,7 +588,7 @@ function storeMigratedData(data) { //Delete (migrated) data function deleteMigratedData(data, from, packet) { - let closestNode = kBucket.closestNode(data.receiverID); + let closestNode = cloud.closestNode(data.receiverID); if (data.snID !== closestNode && _list.stored.includes(data.snID)) { DB.deleteData(data.snID, data.vectorClock).then(_ => null).catch(e => console.error(e)); if (_list[data.snID] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) @@ -626,14 +626,14 @@ function dataMigration(node_change, flag) { (node_change[n] ? new_nodes : del_nodes).push(n); if (_prevNode.id && del_nodes.includes(_prevNode.id)) _prevNode.close(); - const old_kb = kBucket; + const old_kb = cloud.kb; setTimeout(() => { //reconnect next node if current next node is deleted if (_nextNode.id) { if (del_nodes.includes(_nextNode.id)) reconnectNextNode(); else { //reconnect next node if there are newly added nodes in between self and current next node - let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id); + let innerNodes = cloud.innerNodes(myFloID, _nextNode.id); if (new_nodes.filter(n => innerNodes.includes(n)).length) reconnectNextNode(); }; @@ -667,7 +667,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) { console.log(`START: Data migration for ${n}`); //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => { - let closest = kBucket.closestNode(d.receiverID); + let closest = cloud.closestNode(d.receiverID); if (_list.serving.includes(closest)) { DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); if (_nextNode.id) @@ -716,7 +716,7 @@ dataMigration.process_new = async function(new_nodes) { DB.readAllData(n, 0).then(result => { //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => { - let closest = kBucket.closestNode(d.receiverID); + let closest = cloud.closestNode(d.receiverID); if (new_nodes.includes(closest)) { if (_list.serving.includes(closest)) { DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); diff --git a/src/main.js b/src/main.js index b64c0b9..615baf3 100644 --- a/src/main.js +++ b/src/main.js @@ -2,7 +2,7 @@ const config = require('../args/config.json'); global.floGlobals = require("./floGlobals"); require('./set_globals'); require('./lib'); -const K_Bucket = require('./kBucket'); +global.cloud = require('./cloud'); global.floCrypto = require('./floCrypto'); global.floBlockchainAPI = require('./floBlockchainAPI'); const Database = require("./database"); @@ -96,8 +96,8 @@ function refreshBlockchainData(base, flag) { return new Promise((resolve, reject) => { readSupernodeConfigFromAPI(base, flag).then(result => { console.log(result); - global.kBucket = new K_Bucket(); - console.log("SNCO:", kBucket.order); + cloud(floGlobals.SNStorageID, Object.keys(floGlobals.supernodes)); + console.log("NodeList (ordered):", cloud.order); readAppSubAdminListFromAPI(base) .then(result => console.log(result)) .catch(warn => console.warn(warn)) @@ -252,7 +252,7 @@ function selfDiskMigration(node_change) { DB.dropTable(n).then(_ => null).catch(e => console.error(e)); DB.readAllData(n, 0).then(result => { result.forEach(d => { - let closest = kBucket.closestNode(d.receiverID); + let closest = cloud.closestNode(d.receiverID); if (closest !== n) DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); });