Backup sync improvements
- Sync data block by block - Resync blocks with hash mismatch - moved intra into backup folder
This commit is contained in:
parent
6dff51560a
commit
f748bc7a4a
@ -1,28 +1,16 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
const keys = require('./keys');
|
const { DB } = require('../database');
|
||||||
|
const keys = require('../keys');
|
||||||
|
const TYPE_ = require('./message_types.json');
|
||||||
|
const sync = require('./sync');
|
||||||
|
|
||||||
//CONSTANTS
|
//CONSTANTS
|
||||||
const SUPERNODE_INDICATOR = '$',
|
const SUPERNODE_INDICATOR = '$',
|
||||||
//Message type
|
|
||||||
ORDER_BACKUP = "orderBackup",
|
|
||||||
STORE_BACKUP_DATA = "backupData",
|
|
||||||
STORE_MIGRATED_DATA = "migratedData",
|
|
||||||
DELETE_MIGRATED_DATA = "migratedDelete",
|
|
||||||
//DELETE_BACKUP_DATA = "backupDelete",
|
|
||||||
TAG_BACKUP_DATA = "backupTag",
|
|
||||||
NOTE_BACKUP_DATA = "backupNote",
|
|
||||||
//EDIT_BACKUP_DATA = "backupEdit",
|
|
||||||
INITIATE_REFRESH = "initiateRefresh",
|
|
||||||
DATA_REQUEST = "dataRequest",
|
|
||||||
DATA_SYNC = "dataSync",
|
|
||||||
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
|
|
||||||
BACKUP_HANDSHAKE_END = "handshakeEnd",
|
|
||||||
RECONNECT_NEXT_NODE = "reconnectNextNode",
|
|
||||||
RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins
|
RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins
|
||||||
MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins
|
MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins
|
||||||
|
|
||||||
var DB, refresher; //container for database and refresher
|
var refresher; //container for and refresher
|
||||||
|
|
||||||
//List of node backups stored
|
//List of node backups stored
|
||||||
const _list = {};
|
const _list = {};
|
||||||
@ -197,7 +185,7 @@ function connectToNextNode(curNode = keys.node_id) {
|
|||||||
connectToNode(nextNodeID).then(ws => {
|
connectToNode(nextNodeID).then(ws => {
|
||||||
_nextNode.set(nextNodeID, ws);
|
_nextNode.set(nextNodeID, ws);
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: BACKUP_HANDSHAKE_INIT
|
type_: TYPE_.BACKUP_HANDSHAKE_INIT
|
||||||
}));
|
}));
|
||||||
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID);
|
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID);
|
||||||
}).catch(error => {
|
}).catch(error => {
|
||||||
@ -245,24 +233,25 @@ function processTaskFromNextNode(packet) {
|
|||||||
} = packet_.parse(packet);
|
} = packet_.parse(packet);
|
||||||
if (message) {
|
if (message) {
|
||||||
message.forEach(task => {
|
message.forEach(task => {
|
||||||
switch (task.type) {
|
switch (task.type_) {
|
||||||
case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available
|
case TYPE_.RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available
|
||||||
reconnectNextNode();
|
reconnectNextNode(); break;
|
||||||
break;
|
case TYPE_.BACKUP_HANDSHAKE_END:
|
||||||
case BACKUP_HANDSHAKE_END:
|
handshakeEnd(); break;
|
||||||
handshakeEnd();
|
case TYPE_.REQ_HASH:
|
||||||
break;
|
sync.sendBlockHashes(task.node_i, _nextNode); break;
|
||||||
case DATA_REQUEST:
|
case TYPE_.RES_HASH:
|
||||||
sendStoredData(task.nodes, _nextNode);
|
sync.checkBlockHash(task.node_i, task.block_n, task.hash); break;
|
||||||
break;
|
case TYPE_.VAL_LAST_BLK:
|
||||||
case DATA_SYNC:
|
sync.setLastBlock(task.node_i, task.block_n); break;
|
||||||
dataSyncIndication(task.id, task.status, from);
|
case TYPE_.REQ_BLOCK:
|
||||||
break;
|
sync.sendBlockData(task.node_i, task.block_n, _nextNode); break;
|
||||||
case STORE_BACKUP_DATA:
|
case TYPE_.INDICATE_BLK:
|
||||||
storeBackupData(task.data, from, packet);
|
sync.syncIndicator(task.node_i, task.block_n, task.status, from); break;
|
||||||
break;
|
case TYPE_.STORE_BACKUP_DATA:
|
||||||
|
storeBackupData(task.data, from, packet); break;
|
||||||
default:
|
default:
|
||||||
console.warn("Invalid task type:" + task.type + "from next-node");
|
console.warn("Invalid task type_:" + task.type_ + "from next-node");
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -277,30 +266,29 @@ function processTaskFromPrevNode(packet) {
|
|||||||
} = packet_.parse(packet);
|
} = packet_.parse(packet);
|
||||||
if (message) {
|
if (message) {
|
||||||
message.forEach(task => {
|
message.forEach(task => {
|
||||||
switch (task.type) {
|
switch (task.type_) {
|
||||||
case ORDER_BACKUP:
|
case TYPE_.ORDER_BACKUP:
|
||||||
orderBackup(task.order);
|
orderBackup(task.order); break;
|
||||||
break;
|
case TYPE_.STORE_BACKUP_DATA:
|
||||||
case STORE_BACKUP_DATA:
|
storeBackupData(task.data, from, packet); break;
|
||||||
storeBackupData(task.data, from, packet);
|
case TYPE_.TAG_BACKUP_DATA:
|
||||||
break;
|
tagBackupData(task.data, from, packet); break;
|
||||||
case TAG_BACKUP_DATA:
|
case TYPE_.NOTE_BACKUP_DATA:
|
||||||
tagBackupData(task.data, from, packet);
|
noteBackupData(task.data, from, packet); break;
|
||||||
break;
|
case TYPE_.REQ_HASH:
|
||||||
case NOTE_BACKUP_DATA:
|
sync.sendBlockHashes(task.node_i, _nextNode); break;
|
||||||
noteBackupData(task.data, from, packet);
|
case TYPE_.RES_HASH:
|
||||||
break;
|
sync.checkBlockHash(task.node_i, task.block_n, task.hash); break;
|
||||||
case DATA_REQUEST:
|
case TYPE_.VAL_LAST_BLK:
|
||||||
sendStoredData(task.nodes, _prevNode);
|
sync.setLastBlock(task.node_i, task.block_n); break;
|
||||||
break;
|
case TYPE_.REQ_BLOCK:
|
||||||
case DATA_SYNC:
|
sync.sendBlockData(task.node_i, task.block_n, _nextNode); break;
|
||||||
dataSyncIndication(task.id, task.status, from);
|
case TYPE_.INDICATE_BLK:
|
||||||
break;
|
sync.syncIndicator(task.node_i, task.block_n, task.status, from); break;
|
||||||
case DELETE_MIGRATED_DATA:
|
case TYPE_.DELETE_MIGRATED_DATA:
|
||||||
deleteMigratedData(task.data, from, packet);
|
deleteMigratedData(task.data, from, packet); break;
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
console.warn("Invalid task type:" + task.type + "from prev-node");
|
console.warn("Invalid task type_:" + task.type_ + "from prev-node");
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -315,18 +303,15 @@ function processTaskFromSupernode(packet, ws) {
|
|||||||
} = packet_.parse(packet);
|
} = packet_.parse(packet);
|
||||||
if (message) {
|
if (message) {
|
||||||
message.forEach(task => {
|
message.forEach(task => {
|
||||||
switch (task.type) {
|
switch (task.type_) {
|
||||||
case BACKUP_HANDSHAKE_INIT:
|
case TYPE_.BACKUP_HANDSHAKE_INIT:
|
||||||
handshakeMid(from, ws);
|
handshakeMid(from, ws); break;
|
||||||
break;
|
case TYPE_.STORE_MIGRATED_DATA:
|
||||||
case STORE_MIGRATED_DATA:
|
storeMigratedData(task.data); break;
|
||||||
storeMigratedData(task.data);
|
case TYPE_.INITIATE_REFRESH:
|
||||||
break;
|
initiateRefresh(); break;
|
||||||
case INITIATE_REFRESH:
|
|
||||||
initiateRefresh();
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
console.warn("Invalid task type:" + task.type + "from super-node");
|
console.warn("Invalid task type_:" + task.type_ + "from super-node");
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -340,18 +325,18 @@ function handshakeMid(id, ws) {
|
|||||||
if (cloud.innerNodes(_prevNode.id, keys.node_id).includes(id)) {
|
if (cloud.innerNodes(_prevNode.id, keys.node_id).includes(id)) {
|
||||||
//close existing prev-node connection
|
//close existing prev-node connection
|
||||||
_prevNode.send(packet_.construct({
|
_prevNode.send(packet_.construct({
|
||||||
type: RECONNECT_NEXT_NODE
|
type_: TYPE_.RECONNECT_NEXT_NODE
|
||||||
}));
|
}));
|
||||||
_prevNode.close();
|
_prevNode.close();
|
||||||
//set the new prev-node connection
|
//set the new prev-node connection
|
||||||
_prevNode.set(id, ws);
|
_prevNode.set(id, ws);
|
||||||
_prevNode.send(packet_.construct({
|
_prevNode.send(packet_.construct({
|
||||||
type: BACKUP_HANDSHAKE_END
|
type_: TYPE_.BACKUP_HANDSHAKE_END
|
||||||
}));
|
}));
|
||||||
} else {
|
} else {
|
||||||
//Incorrect order, existing prev-node is already after the incoming node
|
//Incorrect order, existing prev-node is already after the incoming node
|
||||||
ws.send(packet_.construct({
|
ws.send(packet_.construct({
|
||||||
type: RECONNECT_NEXT_NODE
|
type_: TYPE_.RECONNECT_NEXT_NODE
|
||||||
}));
|
}));
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
@ -359,16 +344,16 @@ function handshakeMid(id, ws) {
|
|||||||
//set the new prev-node connection
|
//set the new prev-node connection
|
||||||
_prevNode.set(id, ws);
|
_prevNode.set(id, ws);
|
||||||
_prevNode.send(packet_.construct({
|
_prevNode.send(packet_.construct({
|
||||||
type: BACKUP_HANDSHAKE_END
|
type_: TYPE_.BACKUP_HANDSHAKE_END
|
||||||
}));
|
}));
|
||||||
};
|
};
|
||||||
if (!_nextNode.id)
|
if (!_nextNode.id)
|
||||||
reconnectNextNode();
|
reconnectNextNode();
|
||||||
//Reorder storelist
|
//Reorder storelist
|
||||||
let nodes = cloud.innerNodes(_prevNode.id, keys.node_id).concat(keys.node_id),
|
let nodes_inbtw = cloud.innerNodes(_prevNode.id, keys.node_id).concat(keys.node_id),
|
||||||
req_sync = [],
|
req_sync = [],
|
||||||
new_order = [];
|
new_order = [];
|
||||||
nodes.forEach(n => {
|
nodes_inbtw.forEach(n => {
|
||||||
switch (_list[n]) {
|
switch (_list[n]) {
|
||||||
case 0: //Do nothing
|
case 0: //Do nothing
|
||||||
break;
|
break;
|
||||||
@ -379,57 +364,22 @@ function handshakeMid(id, ws) {
|
|||||||
new_order.push(n);
|
new_order.push(n);
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
if (!req_sync.length && !new_order.length)
|
if (req_sync.length)
|
||||||
return; //No order change and no need for any data sync
|
req_sync.forEach(node_i => sync.requestDataSync(node_i, _nextNode));
|
||||||
else
|
if (new_order.length) {
|
||||||
handshakeMid.requestData(req_sync, new_order);
|
let synced_list = new_order.filter(n => !req_sync.includes(n)); //send order only for synced disks
|
||||||
};
|
_nextNode.send(packet_.construct({
|
||||||
|
type_: TYPE_.ORDER_BACKUP,
|
||||||
handshakeMid.requestData = function (req_sync, new_order) {
|
order: _list.get(synced_list)
|
||||||
if (handshakeMid.timeout) {
|
}));
|
||||||
clearTimeout(handshakeMid.timeout);
|
}
|
||||||
delete handshakeMid.timeout;
|
|
||||||
};
|
|
||||||
Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => {
|
|
||||||
let tasks = [],
|
|
||||||
lastlogs = {},
|
|
||||||
failed = [],
|
|
||||||
order = [],
|
|
||||||
failed_order = [];
|
|
||||||
|
|
||||||
req_sync.forEach((s, i) => {
|
|
||||||
if (result[i].status === "fulfilled")
|
|
||||||
lastlogs[s] = result[i].value;
|
|
||||||
else
|
|
||||||
failed.push(s);
|
|
||||||
});
|
|
||||||
if (Object.keys(lastlogs).length)
|
|
||||||
tasks.push({
|
|
||||||
type: DATA_REQUEST,
|
|
||||||
nodes: lastlogs
|
|
||||||
});
|
|
||||||
new_order.forEach(n => {
|
|
||||||
if (failed.includes(n))
|
|
||||||
failed_order.push(n);
|
|
||||||
else
|
|
||||||
order.push(n);
|
|
||||||
});
|
|
||||||
if (order.length)
|
|
||||||
tasks.push({
|
|
||||||
type: ORDER_BACKUP,
|
|
||||||
order: _list.get(order)
|
|
||||||
});
|
|
||||||
_nextNode.send(packet_.construct(tasks));
|
|
||||||
if (failed.length)
|
|
||||||
handshakeMid.timeout = setTimeout(_ => handshakeMid.requestData(failed, failed_order), RETRY_TIMEOUT);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
//Complete handshake
|
//Complete handshake
|
||||||
function handshakeEnd() {
|
function handshakeEnd() {
|
||||||
console.log("Backup connected: " + _nextNode.id);
|
console.log("Backup connected: " + _nextNode.id);
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: ORDER_BACKUP,
|
type_: TYPE_.ORDER_BACKUP,
|
||||||
order: _list.get()
|
order: _list.get()
|
||||||
}));
|
}));
|
||||||
};
|
};
|
||||||
@ -472,76 +422,15 @@ function orderBackup(order) {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
if (!req_sync.length && !new_order.length)
|
if (req_sync.length)
|
||||||
return; //No order change and no need for any data sync
|
req_sync.forEach(node_i => sync.requestDataSync(node_i, _prevNode));
|
||||||
else
|
if (new_order.length) {
|
||||||
orderBackup.requestData(req_sync, new_order);
|
let synced_list = new_order.filter(n => !req_sync.includes(n)); //send order only for synced disks
|
||||||
};
|
_nextNode.send(packet_.construct({
|
||||||
|
type_: TYPE_.ORDER_BACKUP,
|
||||||
orderBackup.requestData = function (req_sync, new_order) {
|
order: _list.get(synced_list)
|
||||||
Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => {
|
}));
|
||||||
let
|
}
|
||||||
lastlogs = {},
|
|
||||||
failed = [],
|
|
||||||
order = [],
|
|
||||||
failed_order = [];
|
|
||||||
|
|
||||||
req_sync.forEach((s, i) => {
|
|
||||||
if (result[i].status === "fulfilled")
|
|
||||||
lastlogs[s] = result[i].value;
|
|
||||||
else
|
|
||||||
failed.push(s);
|
|
||||||
});
|
|
||||||
if (Object.keys(lastlogs).length)
|
|
||||||
_prevNode.send(packet_.construct({
|
|
||||||
type: DATA_REQUEST,
|
|
||||||
nodes: lastlogs
|
|
||||||
}));
|
|
||||||
new_order.forEach(n => {
|
|
||||||
if (failed.includes(n))
|
|
||||||
failed_order.push(n);
|
|
||||||
else
|
|
||||||
order.push(n);
|
|
||||||
});
|
|
||||||
if (order.length) //TODO: maybe should wait for sync to finish?
|
|
||||||
_nextNode.send(packet_.construct({
|
|
||||||
type: ORDER_BACKUP,
|
|
||||||
order: _list.get(order)
|
|
||||||
}));
|
|
||||||
if (failed.length)
|
|
||||||
setTimeout(_ => orderBackup.requestData(failed, failed_order), RETRY_TIMEOUT);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
//Send stored data
|
|
||||||
function sendStoredData(lastlogs, node) {
|
|
||||||
for (let n in lastlogs) {
|
|
||||||
if (_list.stored.includes(n)) {
|
|
||||||
node.send(packet_.construct({
|
|
||||||
type: DATA_SYNC,
|
|
||||||
id: n,
|
|
||||||
status: true
|
|
||||||
}));
|
|
||||||
console.info(`START: ${n} data sync(send) to ${node.id}`);
|
|
||||||
DB.readAllDataStream(n, lastlogs[n], d => node.send(packet_.construct({
|
|
||||||
type: STORE_BACKUP_DATA,
|
|
||||||
data: d
|
|
||||||
}))).then(result => console.info(`END: ${n} data sync(send) to ${node.id} (${result} records)`))
|
|
||||||
.catch(error => {
|
|
||||||
console.info(`ERROR: ${n} data sync(send) to ${node.id}`)
|
|
||||||
console.error(error);
|
|
||||||
}).finally(_ => node.send(packet_.construct({
|
|
||||||
type: DATA_SYNC,
|
|
||||||
id: n,
|
|
||||||
status: false
|
|
||||||
})));
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
//Indicate sync of data
|
|
||||||
function dataSyncIndication(snID, status, from) {
|
|
||||||
console.info(`${status ? 'START' : 'END'}: ${snID} data sync(receive) form ${from}`);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
//Store (backup) data
|
//Store (backup) data
|
||||||
@ -580,7 +469,7 @@ function storeMigratedData(data) {
|
|||||||
if (_list.serving.includes(closestNode)) {
|
if (_list.serving.includes(closestNode)) {
|
||||||
DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e));
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: STORE_BACKUP_DATA,
|
type_: TYPE_.STORE_BACKUP_DATA,
|
||||||
data: data
|
data: data
|
||||||
}));
|
}));
|
||||||
};
|
};
|
||||||
@ -603,23 +492,23 @@ function initiateRefresh() {
|
|||||||
//Forward incoming to next node
|
//Forward incoming to next node
|
||||||
function forwardToNextNode(mode, data) {
|
function forwardToNextNode(mode, data) {
|
||||||
var modeMap = {
|
var modeMap = {
|
||||||
'TAG': TAG_BACKUP_DATA,
|
'TAG': TYPE_.TAG_BACKUP_DATA,
|
||||||
'NOTE': NOTE_BACKUP_DATA,
|
'NOTE': TYPE_.NOTE_BACKUP_DATA,
|
||||||
'DATA': STORE_BACKUP_DATA
|
'DATA': TYPE_.STORE_BACKUP_DATA
|
||||||
};
|
};
|
||||||
if (mode in modeMap && _nextNode.id)
|
if (mode in modeMap && _nextNode.id)
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: modeMap[mode],
|
type_: modeMap[mode],
|
||||||
data: data
|
data: data
|
||||||
}));
|
}));
|
||||||
};
|
};
|
||||||
|
|
||||||
//Data migration processor
|
//Data migration processor
|
||||||
function dataMigration(node_change, flag) {
|
function dataMigration(node_change, flag) {
|
||||||
|
if (flag) dataMigration.intimateAllNodes(); //Initmate All nodes to call refresher
|
||||||
if (!Object.keys(node_change).length)
|
if (!Object.keys(node_change).length)
|
||||||
return;
|
return;
|
||||||
console.log("Node list changed! Data migration required", node_change);
|
console.log("Node list changed! Data migration required", node_change);
|
||||||
if (flag) dataMigration.intimateAllNodes(); //Initmate All nodes to call refresher
|
|
||||||
let new_nodes = [],
|
let new_nodes = [],
|
||||||
del_nodes = [];
|
del_nodes = [];
|
||||||
for (let n in node_change)
|
for (let n in node_change)
|
||||||
@ -670,12 +559,12 @@ dataMigration.process_del = async function (del_nodes, old_kb) {
|
|||||||
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
||||||
if (_nextNode.id)
|
if (_nextNode.id)
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: STORE_BACKUP_DATA,
|
type_: TYPE_.STORE_BACKUP_DATA,
|
||||||
data: d
|
data: d
|
||||||
}));
|
}));
|
||||||
} else
|
} else
|
||||||
ws_connections[closest].send(packet_.construct({
|
ws_connections[closest].send(packet_.construct({
|
||||||
type: STORE_MIGRATED_DATA,
|
type_: TYPE_.STORE_MIGRATED_DATA,
|
||||||
data: d
|
data: d
|
||||||
}));
|
}));
|
||||||
}).then(result => {
|
}).then(result => {
|
||||||
@ -721,18 +610,18 @@ dataMigration.process_new = async function (new_nodes) {
|
|||||||
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
|
||||||
if (_nextNode.id)
|
if (_nextNode.id)
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: STORE_BACKUP_DATA,
|
type_: TYPE_.STORE_BACKUP_DATA,
|
||||||
data: d
|
data: d
|
||||||
}));
|
}));
|
||||||
} else
|
} else
|
||||||
ws_connections[closest].send(packet_.construct({
|
ws_connections[closest].send(packet_.construct({
|
||||||
type: STORE_MIGRATED_DATA,
|
type_: TYPE_.STORE_MIGRATED_DATA,
|
||||||
data: d
|
data: d
|
||||||
}));
|
}));
|
||||||
DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
|
DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
|
||||||
if (_nextNode.id)
|
if (_nextNode.id)
|
||||||
_nextNode.send(packet_.construct({
|
_nextNode.send(packet_.construct({
|
||||||
type: DELETE_MIGRATED_DATA,
|
type_: TYPE_.DELETE_MIGRATED_DATA,
|
||||||
data: {
|
data: {
|
||||||
vectorClock: d.vectorClock,
|
vectorClock: d.vectorClock,
|
||||||
receiverID: d.receiverID,
|
receiverID: d.receiverID,
|
||||||
@ -760,7 +649,7 @@ dataMigration.process_new = async function (new_nodes) {
|
|||||||
dataMigration.intimateAllNodes = function () {
|
dataMigration.intimateAllNodes = function () {
|
||||||
connectToAliveNodes().then(ws_connections => {
|
connectToAliveNodes().then(ws_connections => {
|
||||||
let packet = packet_.construct({
|
let packet = packet_.construct({
|
||||||
type: INITIATE_REFRESH
|
type_: TYPE_.INITIATE_REFRESH
|
||||||
});
|
});
|
||||||
for (let n in ws_connections)
|
for (let n in ws_connections)
|
||||||
if (ws_connections[n]) {
|
if (ws_connections[n]) {
|
||||||
@ -783,9 +672,9 @@ module.exports = {
|
|||||||
logInterval,
|
logInterval,
|
||||||
SUPERNODE_INDICATOR,
|
SUPERNODE_INDICATOR,
|
||||||
_list,
|
_list,
|
||||||
set DB(db) {
|
_nextNode,
|
||||||
DB = db;
|
_prevNode,
|
||||||
},
|
packet_,
|
||||||
set refresher(r) {
|
set refresher(r) {
|
||||||
refresher = r;
|
refresher = r;
|
||||||
}
|
}
|
||||||
27
src/backup/message_types.json
Normal file
27
src/backup/message_types.json
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
{
|
||||||
|
"REQ_HASH": "req_hash",
|
||||||
|
"RES_HASH": "block_hash",
|
||||||
|
"REQ_BLOCK": "req_block",
|
||||||
|
"VAL_LAST_BLK": "set_last_block",
|
||||||
|
"INDICATE_BLK": "indicate_block",
|
||||||
|
"STORE_BACKUP_DATA": "store_data",
|
||||||
|
|
||||||
|
"ORDER_BACKUP": "orderBackup",
|
||||||
|
|
||||||
|
"STORE_MIGRATED_DATA": "migratedData",
|
||||||
|
"DELETE_MIGRATED_DATA": "migratedDelete",
|
||||||
|
|
||||||
|
"DELETE_BACKUP_DATA": "backupDelete___NOT_USED",
|
||||||
|
"TAG_BACKUP_DATA": "backupTag",
|
||||||
|
"NOTE_BACKUP_DATA": "backupNote",
|
||||||
|
"EDIT_BACKUP_DATA": "backupEdit___NOT_USED",
|
||||||
|
|
||||||
|
"INITIATE_REFRESH": "initiateRefresh",
|
||||||
|
|
||||||
|
"DATA_REQUEST": "dataRequest",
|
||||||
|
"DATA_SYNC": "dataSync",
|
||||||
|
|
||||||
|
"BACKUP_HANDSHAKE_INIT": "handshakeInitate",
|
||||||
|
"BACKUP_HANDSHAKE_END": "handshakeEnd",
|
||||||
|
"RECONNECT_NEXT_NODE": "reconnectNextNode"
|
||||||
|
}
|
||||||
190
src/backup/sync.js
Normal file
190
src/backup/sync.js
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
const Database = require("../database");
|
||||||
|
const DB = Database.DB;
|
||||||
|
const floGlobals = require("../floGlobals");
|
||||||
|
const { H_struct } = require("../data_structure.json");
|
||||||
|
const { packet_, _nextNode, _list } = require("./intra");
|
||||||
|
const TYPE = require('./message_types.json');
|
||||||
|
|
||||||
|
const queueSync = {
|
||||||
|
list: {},
|
||||||
|
|
||||||
|
init(node_i, ws) {
|
||||||
|
if (node_i in this.list)
|
||||||
|
return console.debug("Another sync is in process for ", node_i);
|
||||||
|
console.init(`START: Data sync for ${node_i}`)
|
||||||
|
this.list[node_i] = { ws, ts: Date.now(), q: [], cur: [], hashes: {} };
|
||||||
|
DB.createTable(node_i)
|
||||||
|
.then(result => ws.send(packet_.construct({ type_: TYPE.REQ_HASH, node_i })))
|
||||||
|
.catch(error => console.error(error));
|
||||||
|
},
|
||||||
|
|
||||||
|
add_block(node_i, block_n, hash) {
|
||||||
|
if (!(node_i in this.list))
|
||||||
|
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
||||||
|
let r = this.list[node_i];
|
||||||
|
r.hashes[block_n] = hash;
|
||||||
|
if (r.cur.length) //atleast a block is already in syncing process
|
||||||
|
r.q.push(block_n);
|
||||||
|
else {
|
||||||
|
console.debug(`Queue-Sync: Started block sync for ${node_i}#${block_n}`);
|
||||||
|
r.cur.push(block_n);
|
||||||
|
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n }))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
end_block(node_i, block_n) {
|
||||||
|
if (!(node_i in this.list))
|
||||||
|
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
||||||
|
|
||||||
|
let r = this.list[node_i];
|
||||||
|
let p = r.cur.indexOf(block_n);
|
||||||
|
if (p == -1)
|
||||||
|
return console.warn(`Queue-Sync: Block ${block_n} not currently syncing`);
|
||||||
|
r.cur.splice(p, 1);
|
||||||
|
console.debug(`Queue-Sync: Finished block sync for ${node_i}#${block_n}`);
|
||||||
|
|
||||||
|
if (r.last_block && block_n === r.last_block)
|
||||||
|
r.last_block_done = true;
|
||||||
|
|
||||||
|
if (!r.cur.length) {
|
||||||
|
if (r.q.length) {
|
||||||
|
//request next block
|
||||||
|
let n = r.q.shift();
|
||||||
|
console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`);
|
||||||
|
r.cur.push(n);
|
||||||
|
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n }))
|
||||||
|
} else if (r.last_block_done) {
|
||||||
|
//last block is synced and queue is empty. close the sync
|
||||||
|
delete this.list[node_i];
|
||||||
|
//indicate next node for ordering
|
||||||
|
_nextNode.send(packet_.construct({
|
||||||
|
type_: TYPE_.ORDER_BACKUP,
|
||||||
|
order: _list.get([node_i])
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
last_block(node_i, block_n) {
|
||||||
|
if (!(node_i in this.list))
|
||||||
|
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
||||||
|
this.list[node_i].last_block = block_n;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const _ = {
|
||||||
|
get block_calc_sql() {
|
||||||
|
return `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n}))`
|
||||||
|
},
|
||||||
|
t_name(node_i) {
|
||||||
|
return '_' + node_i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getColumns(t_name) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
Database.query("SHOW COLUMNS FROM " + t_name)
|
||||||
|
.then(result => resolve(result.map(r => r["Field"]).sort()))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
//R: request hashes for node_i
|
||||||
|
function requestDataSync(node_i, ws) {
|
||||||
|
queueSync.init(node_i, ws);
|
||||||
|
}
|
||||||
|
|
||||||
|
//S: send hashes for node_i
|
||||||
|
function sendBlockHashes(node_i, ws) {
|
||||||
|
let t_name = _.t_name(node_i);
|
||||||
|
getColumns(t_name).then(columns => {
|
||||||
|
let statement = `SELECT ${_.block_calc_sql} AS block_n,`
|
||||||
|
+ `MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash`
|
||||||
|
+ `FROM ${t_name} GROUP BY block_n ORDER BY block_n`;
|
||||||
|
let last_block;
|
||||||
|
Database.query_stream(statement, r => {
|
||||||
|
ws.send(packet_.construct({
|
||||||
|
type_: TYPE.RES_HASH,
|
||||||
|
node_i: node_i,
|
||||||
|
block_n: r.block_n,
|
||||||
|
hash: r.hash
|
||||||
|
}));
|
||||||
|
last_block = r.block_n;
|
||||||
|
}).then(result => {
|
||||||
|
console.debug(`Send ${result} block hashes`);
|
||||||
|
ws.send(packet_.construct({
|
||||||
|
type_: TYPE.VAL_LAST_BLK,
|
||||||
|
node_i: node_i,
|
||||||
|
block_n: last_block,
|
||||||
|
}));
|
||||||
|
}).catch(error => console.error(error))
|
||||||
|
}).catch(error => console.error(error));
|
||||||
|
}
|
||||||
|
|
||||||
|
//R: check hash and request data if hash mismatch
|
||||||
|
function checkBlockHash(node_i, block_n, hash) {
|
||||||
|
let t_name = _.t_name(node_i);
|
||||||
|
getColumns(t_name).then(columns => {
|
||||||
|
let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash`
|
||||||
|
+ `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`;
|
||||||
|
Database.query(statement, [block_n]).then(result => {
|
||||||
|
if (!result.length || result[0].hash != hash) {
|
||||||
|
//ReSync Block
|
||||||
|
let clear_statement = `DELETE FROM ${t_name} WHERE `
|
||||||
|
+ `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`;
|
||||||
|
Database.query(clear_statement, [block_n])
|
||||||
|
.then(_ => queueSync.add_block(node_i, block_n, hash))
|
||||||
|
.catch(error => console.error(error))
|
||||||
|
}
|
||||||
|
}).catch(error => console.error(error))
|
||||||
|
}).catch(error => console.error(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
//R: set last block number
|
||||||
|
function setLastBlock(node_i, block_n) {
|
||||||
|
queueSync.last_block(node_i, block_n);
|
||||||
|
}
|
||||||
|
|
||||||
|
//S: send data for block
|
||||||
|
function sendBlockData(node_i, block_n, ws) {
|
||||||
|
let t_name = _.t_name(node_i);
|
||||||
|
ws.send(packet_.construct({
|
||||||
|
type_: TYPE.INDICATE_BLK,
|
||||||
|
node_i, block_n,
|
||||||
|
status: true
|
||||||
|
}))
|
||||||
|
console.debug(`START: Sync-send ${node_i}#${block_n} to ${ws.id}`);
|
||||||
|
let statement = `SELECT * FROM ${t_name} WHERE ${_.block_calc_sql} = ?`;
|
||||||
|
Database.query_stream(statement, [block_n], d => ws.send(packet_.construct({
|
||||||
|
type_: TYPE.STORE_BACKUP_DATA,
|
||||||
|
data: d
|
||||||
|
}))).then(result => console.debug(`END: Sync-send ${node_i}#${block_n} to ${ws.id} (${result} records)`))
|
||||||
|
.catch(error => {
|
||||||
|
console.debug(`ERROR: Sync-send ${node_i}#${block_n} to ${ws.id}`)
|
||||||
|
console.error(error);
|
||||||
|
}).finally(_ => ws.send(packet_.construct({
|
||||||
|
type_: TYPE.INDICATE_BLK,
|
||||||
|
node_i, block_n,
|
||||||
|
status: false
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
//R: end block
|
||||||
|
function endBlockSync(node_i, block_n) {
|
||||||
|
queueSync.end_block(node_i, block_n);
|
||||||
|
}
|
||||||
|
|
||||||
|
function syncIndicator(node_i, block_n, status, from) {
|
||||||
|
console.debug(`${status ? 'START' : 'END'}: Sync-receive ${node_i}#${block_n} from ${from}`);
|
||||||
|
if (!status)
|
||||||
|
endBlockSync(node_i, block_n);
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
requestDataSync,
|
||||||
|
sendBlockHashes,
|
||||||
|
checkBlockHash,
|
||||||
|
setLastBlock,
|
||||||
|
sendBlockData,
|
||||||
|
syncIndicator
|
||||||
|
}
|
||||||
@ -1,5 +1,5 @@
|
|||||||
const { DB } = require("./database");
|
const { DB } = require("./database");
|
||||||
const { _list } = require("./intra");
|
const { _list } = require("./backup/intra");
|
||||||
|
|
||||||
global.INVALID = function (message) {
|
global.INVALID = function (message) {
|
||||||
if (!(this instanceof INVALID))
|
if (!(this instanceof INVALID))
|
||||||
|
|||||||
@ -5,7 +5,7 @@ global.cloud = require('./cloud');
|
|||||||
global.floCrypto = require('./floCrypto');
|
global.floCrypto = require('./floCrypto');
|
||||||
global.floBlockchainAPI = require('./floBlockchainAPI');
|
global.floBlockchainAPI = require('./floBlockchainAPI');
|
||||||
const Database = require("./database");
|
const Database = require("./database");
|
||||||
const intra = require('./intra');
|
const intra = require('./backup/intra');
|
||||||
const Server = require('./server');
|
const Server = require('./server');
|
||||||
const keys = require("./keys");
|
const keys = require("./keys");
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
const http = require('http');
|
const http = require('http');
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
const url = require('url');
|
const url = require('url');
|
||||||
const intra = require('./intra');
|
const intra = require('./backup/intra');
|
||||||
const client = require('./client');
|
const client = require('./client');
|
||||||
|
|
||||||
const INVALID_E_CODE = 400,
|
const INVALID_E_CODE = 400,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user