Wrap kBucket module inside cloud.js
This commit is contained in:
parent
62504d3a70
commit
39ca525822
@ -50,7 +50,7 @@ function processDataFromUser(data) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateFloID(data.receiverID))
|
if (!floCrypto.validateFloID(data.receiverID))
|
||||||
return reject(INVALID("Invalid receiverID"));
|
return reject(INVALID("Invalid receiverID"));
|
||||||
let closeNode = kBucket.closestNode(data.receiverID);
|
let closeNode = cloud.closestNode(data.receiverID);
|
||||||
if (!_list.serving.includes(closeNode))
|
if (!_list.serving.includes(closeNode))
|
||||||
return reject(INVALID("Incorrect Supernode"));
|
return reject(INVALID("Incorrect Supernode"));
|
||||||
if (!floCrypto.validateFloID(data.senderID))
|
if (!floCrypto.validateFloID(data.senderID))
|
||||||
@ -85,7 +85,7 @@ function processRequestFromUser(request) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateFloID(request.receiverID))
|
if (!floCrypto.validateFloID(request.receiverID))
|
||||||
return reject(INVALID("Invalid receiverID"));
|
return reject(INVALID("Invalid receiverID"));
|
||||||
let closeNode = kBucket.closestNode(request.receiverID);
|
let closeNode = cloud.closestNode(request.receiverID);
|
||||||
if (!_list.serving.includes(closeNode))
|
if (!_list.serving.includes(closeNode))
|
||||||
return reject(INVALID("Incorrect Supernode"));
|
return reject(INVALID("Incorrect Supernode"));
|
||||||
DB.searchData(closeNode, request)
|
DB.searchData(closeNode, request)
|
||||||
@ -98,7 +98,7 @@ function processTagFromUser(data) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateFloID(data.receiverID))
|
if (!floCrypto.validateFloID(data.receiverID))
|
||||||
return reject(INVALID("Invalid receiverID"));
|
return reject(INVALID("Invalid receiverID"));
|
||||||
let closeNode = kBucket.closestNode(data.receiverID);
|
let closeNode = cloud.closestNode(data.receiverID);
|
||||||
if (!_list.serving.includes(closeNode))
|
if (!_list.serving.includes(closeNode))
|
||||||
return reject(INVALID("Incorrect Supernode"));
|
return reject(INVALID("Incorrect Supernode"));
|
||||||
DB.getData(closeNode, data.vectorClock).then(result => {
|
DB.getData(closeNode, data.vectorClock).then(result => {
|
||||||
@ -129,7 +129,7 @@ function processNoteFromUser(data) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateFloID(data.receiverID))
|
if (!floCrypto.validateFloID(data.receiverID))
|
||||||
return reject(INVALID("Invalid receiverID"));
|
return reject(INVALID("Invalid receiverID"));
|
||||||
let closeNode = kBucket.closestNode(data.receiverID);
|
let closeNode = cloud.closestNode(data.receiverID);
|
||||||
if (!_list.serving.includes(closeNode))
|
if (!_list.serving.includes(closeNode))
|
||||||
return reject(INVALID("Incorrect Supernode"));
|
return reject(INVALID("Incorrect Supernode"));
|
||||||
DB.getData(closeNode, data.vectorClock).then(result => {
|
DB.getData(closeNode, data.vectorClock).then(result => {
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
module.exports = function K_Bucket(options = {}) {
|
function K_Bucket(masterID, nodeList) {
|
||||||
|
|
||||||
const decodeID = function(floID) {
|
const decodeID = function(floID) {
|
||||||
let k = bitjs.Base58.decode(floID);
|
let k = bitjs.Base58.decode(floID);
|
||||||
@ -13,16 +13,14 @@ module.exports = function K_Bucket(options = {}) {
|
|||||||
return nodeIdNewInt8Array;
|
return nodeIdNewInt8Array;
|
||||||
};
|
};
|
||||||
|
|
||||||
const list = options.list || Object.keys(floGlobals.supernodes);
|
|
||||||
const refID = options.masterID || floGlobals.SNStorageID;
|
|
||||||
const _KB = new BuildKBucket({
|
const _KB = new BuildKBucket({
|
||||||
localNodeId: decodeID(refID)
|
localNodeId: decodeID(masterID)
|
||||||
});
|
});
|
||||||
list.forEach(id => _KB.add({
|
nodeList.forEach(id => _KB.add({
|
||||||
id: decodeID(id),
|
id: decodeID(id),
|
||||||
floID: id
|
floID: id
|
||||||
}));
|
}));
|
||||||
const _CO = list.map(sn => [_KB.distance(decodeID(refID), decodeID(sn)), sn])
|
const _CO = nodeList.map(sn => [_KB.distance(decodeID(masterID), decodeID(sn)), sn])
|
||||||
.sort((a, b) => a[0] - b[0])
|
.sort((a, b) => a[0] - b[0])
|
||||||
.map(a => a[1]);
|
.map(a => a[1]);
|
||||||
|
|
||||||
@ -93,4 +91,23 @@ module.exports = function K_Bucket(options = {}) {
|
|||||||
.map(k => k.floID);
|
.map(k => k.floID);
|
||||||
return (N == 1 ? cNodes[0] : cNodes);
|
return (N == 1 ? cNodes[0] : cNodes);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var kBucket;
|
||||||
|
const cloud = module.exports = function Cloud(masterID, nodeList) {
|
||||||
|
kBucket = new K_Bucket(masterID, nodeList);
|
||||||
|
}
|
||||||
|
|
||||||
|
cloud.closestNode = (id, N = 1) => kBucket.closestNode(id, N);
|
||||||
|
cloud.prevNode = (id, N = 1) => kBucket.prevNode(id, N);
|
||||||
|
cloud.nextNode = (id, N = 1) => kBucket.nextNode(id, N);
|
||||||
|
cloud.innerNodes = (id1, id2) => kBucket.innerNodes(id1, id2);
|
||||||
|
cloud.outterNodes = (id1, id2) => kBucket.outterNodes(id1, id2);
|
||||||
|
Object.defineProperties(cloud, {
|
||||||
|
kb: {
|
||||||
|
get: () => kBucket
|
||||||
|
},
|
||||||
|
order: {
|
||||||
|
get: () => kBucket.order
|
||||||
|
}
|
||||||
|
});
|
||||||
28
src/intra.js
28
src/intra.js
@ -177,7 +177,7 @@ function connectToActiveNode(snID, reverse = false) {
|
|||||||
connectToNode(snID)
|
connectToNode(snID)
|
||||||
.then(ws => resolve(ws))
|
.then(ws => resolve(ws))
|
||||||
.catch(error => {
|
.catch(error => {
|
||||||
var next = (reverse ? kBucket.prevNode(snID) : kBucket.nextNode(snID));
|
var next = (reverse ? cloud.prevNode(snID) : cloud.nextNode(snID));
|
||||||
connectToActiveNode(next, reverse)
|
connectToActiveNode(next, reverse)
|
||||||
.then(ws => resolve(ws))
|
.then(ws => resolve(ws))
|
||||||
.catch(error => reject(error));
|
.catch(error => reject(error));
|
||||||
@ -190,7 +190,7 @@ function connectToNextNode(curNode = myFloID) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (curNode === myFloID && !(myFloID in floGlobals.supernodes))
|
if (curNode === myFloID && !(myFloID in floGlobals.supernodes))
|
||||||
return reject(`This (${myFloID}) is not a supernode`);
|
return reject(`This (${myFloID}) is not a supernode`);
|
||||||
let nextNodeID = kBucket.nextNode(curNode);
|
let nextNodeID = cloud.nextNode(curNode);
|
||||||
if (nextNodeID === myFloID)
|
if (nextNodeID === myFloID)
|
||||||
return reject("No other node online");
|
return reject("No other node online");
|
||||||
connectToNode(nextNodeID).then(ws => {
|
connectToNode(nextNodeID).then(ws => {
|
||||||
@ -336,7 +336,7 @@ function processTaskFromSupernode(packet, ws) {
|
|||||||
//Acknowledge handshake
|
//Acknowledge handshake
|
||||||
function handshakeMid(id, ws) {
|
function handshakeMid(id, ws) {
|
||||||
if (_prevNode.id && _prevNode.id in floGlobals.supernodes) {
|
if (_prevNode.id && _prevNode.id in floGlobals.supernodes) {
|
||||||
if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) {
|
if (cloud.innerNodes(_prevNode.id, myFloID).includes(id)) {
|
||||||
//close existing prev-node connection
|
//close existing prev-node connection
|
||||||
_prevNode.send(packet_.construct({
|
_prevNode.send(packet_.construct({
|
||||||
type: RECONNECT_NEXT_NODE
|
type: RECONNECT_NEXT_NODE
|
||||||
@ -364,7 +364,7 @@ function handshakeMid(id, ws) {
|
|||||||
if (!_nextNode.id)
|
if (!_nextNode.id)
|
||||||
reconnectNextNode();
|
reconnectNextNode();
|
||||||
//Reorder storelist
|
//Reorder storelist
|
||||||
let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID),
|
let nodes = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID),
|
||||||
req_sync = [],
|
req_sync = [],
|
||||||
new_order = [];
|
new_order = [];
|
||||||
nodes.forEach(n => {
|
nodes.forEach(n => {
|
||||||
@ -456,7 +456,7 @@ function reconnectNextNode() {
|
|||||||
function orderBackup(order) {
|
function orderBackup(order) {
|
||||||
let new_order = [],
|
let new_order = [],
|
||||||
req_sync = [];
|
req_sync = [];
|
||||||
let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID);
|
let cur_serve = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID);
|
||||||
for (let n in order) {
|
for (let n in order) {
|
||||||
if (!cur_serve.includes(n) && order[n] + 1 !== _list[n] && n in floGlobals.supernodes) {
|
if (!cur_serve.includes(n) && order[n] + 1 !== _list[n] && n in floGlobals.supernodes) {
|
||||||
if (order[n] >= floGlobals.sn_config.backupDepth)
|
if (order[n] >= floGlobals.sn_config.backupDepth)
|
||||||
@ -546,7 +546,7 @@ function dataSyncIndication(snID, status, from) {
|
|||||||
|
|
||||||
//Store (backup) data
|
//Store (backup) data
|
||||||
function storeBackupData(data, from, packet) {
|
function storeBackupData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = cloud.closestNode(data.receiverID);
|
||||||
if (_list.stored.includes(closestNode)) {
|
if (_list.stored.includes(closestNode)) {
|
||||||
DB.storeData(closestNode, data).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closestNode, data).then(_ => null).catch(e => console.error(e));
|
||||||
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
||||||
@ -556,7 +556,7 @@ function storeBackupData(data, from, packet) {
|
|||||||
|
|
||||||
//Tag (backup) data
|
//Tag (backup) data
|
||||||
function tagBackupData(data, from, packet) {
|
function tagBackupData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = cloud.closestNode(data.receiverID);
|
||||||
if (_list.stored.includes(closestNode)) {
|
if (_list.stored.includes(closestNode)) {
|
||||||
DB.storeTag(closestNode, data).then(_ => null).catch(e => console.error(e));
|
DB.storeTag(closestNode, data).then(_ => null).catch(e => console.error(e));
|
||||||
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
||||||
@ -566,7 +566,7 @@ function tagBackupData(data, from, packet) {
|
|||||||
|
|
||||||
//Note (backup) data
|
//Note (backup) data
|
||||||
function noteBackupData(data, from, packet) {
|
function noteBackupData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = cloud.closestNode(data.receiverID);
|
||||||
if (_list.stored.includes(closestNode)) {
|
if (_list.stored.includes(closestNode)) {
|
||||||
DB.storeNote(closestNode, data).then(_ => null).catch(e => console.error(e));
|
DB.storeNote(closestNode, data).then(_ => null).catch(e => console.error(e));
|
||||||
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
||||||
@ -576,7 +576,7 @@ function noteBackupData(data, from, packet) {
|
|||||||
|
|
||||||
//Store (migrated) data
|
//Store (migrated) data
|
||||||
function storeMigratedData(data) {
|
function storeMigratedData(data) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = cloud.closestNode(data.receiverID);
|
||||||
if (_list.serving.includes(closestNode)) {
|
if (_list.serving.includes(closestNode)) {
|
||||||
DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e));
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
@ -588,7 +588,7 @@ function storeMigratedData(data) {
|
|||||||
|
|
||||||
//Delete (migrated) data
|
//Delete (migrated) data
|
||||||
function deleteMigratedData(data, from, packet) {
|
function deleteMigratedData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = cloud.closestNode(data.receiverID);
|
||||||
if (data.snID !== closestNode && _list.stored.includes(data.snID)) {
|
if (data.snID !== closestNode && _list.stored.includes(data.snID)) {
|
||||||
DB.deleteData(data.snID, data.vectorClock).then(_ => null).catch(e => console.error(e));
|
DB.deleteData(data.snID, data.vectorClock).then(_ => null).catch(e => console.error(e));
|
||||||
if (_list[data.snID] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
if (_list[data.snID] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
||||||
@ -626,14 +626,14 @@ function dataMigration(node_change, flag) {
|
|||||||
(node_change[n] ? new_nodes : del_nodes).push(n);
|
(node_change[n] ? new_nodes : del_nodes).push(n);
|
||||||
if (_prevNode.id && del_nodes.includes(_prevNode.id))
|
if (_prevNode.id && del_nodes.includes(_prevNode.id))
|
||||||
_prevNode.close();
|
_prevNode.close();
|
||||||
const old_kb = kBucket;
|
const old_kb = cloud.kb;
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
//reconnect next node if current next node is deleted
|
//reconnect next node if current next node is deleted
|
||||||
if (_nextNode.id) {
|
if (_nextNode.id) {
|
||||||
if (del_nodes.includes(_nextNode.id))
|
if (del_nodes.includes(_nextNode.id))
|
||||||
reconnectNextNode();
|
reconnectNextNode();
|
||||||
else { //reconnect next node if there are newly added nodes in between self and current next node
|
else { //reconnect next node if there are newly added nodes in between self and current next node
|
||||||
let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id);
|
let innerNodes = cloud.innerNodes(myFloID, _nextNode.id);
|
||||||
if (new_nodes.filter(n => innerNodes.includes(n)).length)
|
if (new_nodes.filter(n => innerNodes.includes(n)).length)
|
||||||
reconnectNextNode();
|
reconnectNextNode();
|
||||||
};
|
};
|
||||||
@ -667,7 +667,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) {
|
|||||||
console.log(`START: Data migration for ${n}`);
|
console.log(`START: Data migration for ${n}`);
|
||||||
//TODO: efficiently handle large number of data instead of loading all into memory
|
//TODO: efficiently handle large number of data instead of loading all into memory
|
||||||
result.forEach(d => {
|
result.forEach(d => {
|
||||||
let closest = kBucket.closestNode(d.receiverID);
|
let closest = cloud.closestNode(d.receiverID);
|
||||||
if (_list.serving.includes(closest)) {
|
if (_list.serving.includes(closest)) {
|
||||||
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
||||||
if (_nextNode.id)
|
if (_nextNode.id)
|
||||||
@ -716,7 +716,7 @@ dataMigration.process_new = async function(new_nodes) {
|
|||||||
DB.readAllData(n, 0).then(result => {
|
DB.readAllData(n, 0).then(result => {
|
||||||
//TODO: efficiently handle large number of data instead of loading all into memory
|
//TODO: efficiently handle large number of data instead of loading all into memory
|
||||||
result.forEach(d => {
|
result.forEach(d => {
|
||||||
let closest = kBucket.closestNode(d.receiverID);
|
let closest = cloud.closestNode(d.receiverID);
|
||||||
if (new_nodes.includes(closest)) {
|
if (new_nodes.includes(closest)) {
|
||||||
if (_list.serving.includes(closest)) {
|
if (_list.serving.includes(closest)) {
|
||||||
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
||||||
|
|||||||
@ -2,7 +2,7 @@ const config = require('../args/config.json');
|
|||||||
global.floGlobals = require("./floGlobals");
|
global.floGlobals = require("./floGlobals");
|
||||||
require('./set_globals');
|
require('./set_globals');
|
||||||
require('./lib');
|
require('./lib');
|
||||||
const K_Bucket = require('./kBucket');
|
global.cloud = require('./cloud');
|
||||||
global.floCrypto = require('./floCrypto');
|
global.floCrypto = require('./floCrypto');
|
||||||
global.floBlockchainAPI = require('./floBlockchainAPI');
|
global.floBlockchainAPI = require('./floBlockchainAPI');
|
||||||
const Database = require("./database");
|
const Database = require("./database");
|
||||||
@ -96,8 +96,8 @@ function refreshBlockchainData(base, flag) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
readSupernodeConfigFromAPI(base, flag).then(result => {
|
readSupernodeConfigFromAPI(base, flag).then(result => {
|
||||||
console.log(result);
|
console.log(result);
|
||||||
global.kBucket = new K_Bucket();
|
cloud(floGlobals.SNStorageID, Object.keys(floGlobals.supernodes));
|
||||||
console.log("SNCO:", kBucket.order);
|
console.log("NodeList (ordered):", cloud.order);
|
||||||
readAppSubAdminListFromAPI(base)
|
readAppSubAdminListFromAPI(base)
|
||||||
.then(result => console.log(result))
|
.then(result => console.log(result))
|
||||||
.catch(warn => console.warn(warn))
|
.catch(warn => console.warn(warn))
|
||||||
@ -252,7 +252,7 @@ function selfDiskMigration(node_change) {
|
|||||||
DB.dropTable(n).then(_ => null).catch(e => console.error(e));
|
DB.dropTable(n).then(_ => null).catch(e => console.error(e));
|
||||||
DB.readAllData(n, 0).then(result => {
|
DB.readAllData(n, 0).then(result => {
|
||||||
result.forEach(d => {
|
result.forEach(d => {
|
||||||
let closest = kBucket.closestNode(d.receiverID);
|
let closest = cloud.closestNode(d.receiverID);
|
||||||
if (closest !== n)
|
if (closest !== n)
|
||||||
DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
|
DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
|
||||||
});
|
});
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user