Compare commits

...

2 commits

Author SHA1 Message Date
06b289d7a3 Initial commit - testing version 2025-08-08 16:53:33 +02:00
cf16481324 version 2025-08-08; refactoring 2025-08-08 16:29:39 +02:00
11 changed files with 6636 additions and 3818 deletions

View file

@ -9,10 +9,10 @@ exports.output = 2;
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" }; exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
exports.html = `<div class="padding"> exports.html = `<div class="padding">
<div class="row"> <div class="row">
<div class="col-md-6"> <div class="col-md-6">
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div> <div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
</div> </div>
<div class="col-md-6"> <div class="col-md-6">
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div> <div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
</div> </div>
@ -27,16 +27,14 @@ exports.html = `<div class="padding">
</div> </div>
</div>`; </div>`;
const { promisifyBuilder } = require('./helper/db_helper'); const { promisifyBuilder } = require('./helper/db_helper');
const fs = require('fs'); const fs = require('fs');
const mqtt = require('mqtt'); const mqtt = require('mqtt');
const nosql = NOSQL('tbdatacloud'); const nosql = NOSQL('tbdatacloud');
const SEND_TO = { const SEND_TO = {
debug: 0, debug: 0,
rpcCall: 1, rpcCall: 1,
} }
//CONFIG //CONFIG
@ -56,319 +54,295 @@ let lastRestoreTime = 0;
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable // if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
let sendClientError = true; let sendClientError = true;
exports.install = function(instance) { exports.install = function(instance) {
var client; var client;
var opts; var opts;
var clientReady = false; var clientReady = false;
let o = null; //options let o = null; //options
function main() function main() {
{ loadSettings();
loadSettings(); }
}
//set opts according to db settings
//set opts according to db settings function loadSettings() {
function loadSettings()
{ o = instance.options;
if (!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic;
o = instance.options;
if(!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic; opts = {
host: o.host,
opts = { port: o.port,
host: o.host, clientId: o.clientid,
port: o.port, username: o.username,
clientId: o.clientid, rejectUnauthorized: false,
username: o.username, resubscribe: false
rejectUnauthorized: false, };
resubscribe: false
}; console.log("wsmqttpublich -> loadSettings from instance.options", o);
console.log("wsmqttpublich -> loadSettings from instance.options",o); connectToTbServer();
}
connectToTbServer();
} function connectToTbServer() {
var url = "mqtt://" + opts.host + ":" + opts.port;
function connectToTbServer() console.log("MQTT URL: ", url);
{
var url = "mqtt://" + opts.host + ":" + opts.port; client = mqtt.connect(url, opts);
console.log("MQTT URL: ", url);
client.on('connect', function() {
client = mqtt.connect(url, opts); client.subscribe(`${o.topic}_backward`, (err) => {
if (!err) {
client.on('connect', function() { console.log("MQTT subscribed");
client.subscribe(`${o.topic}_backward`, (err) => { }
if (!err) { });
console.log("MQTT subscribed"); instance.status("Connected", "green");
} clientReady = true;
}); sendClientError = true;
instance.status("Connected", "green"); });
clientReady = true;
sendClientError = true; client.on('reconnect', function() {
}); instance.status("Reconnecting", "yellow");
clientReady = false;
client.on('reconnect', function() { });
instance.status("Reconnecting", "yellow");
clientReady = false; client.on('message', function(topic, message) {
}); // message is type of buffer
message = message.toString();
client.on('message', function(topic, message) { if (message[0] === '{') {
// message is type of buffer
message = message.toString();
if (message[0] === '{') { try {
TRY(function() { message = JSON.parse(message);
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
message = JSON.parse(message); client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, {qos:1}); }
instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}}); } catch (e) { instance.debug('MQTT: Error parsing data', message) }
}
instance.send(SEND_TO.rpcCall, { "topic": o.topic, "content": message });
}, () => instance.debug('MQTT: Error parsing data', message)); }
} });
instance.send(SEND_TO.rpcCall, {"topic":o.topic, "content":message }); client.on('close', function() {
}); clientReady = false;
client.on('close', function() { instance.status("Disconnected", "red");
clientReady = false; instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
});
instance.status("Disconnected", "red");
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !"});
});
client.on('error', function(err) {
instance.status("Err: "+ err.code, "red");
instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
if(sendClientError) {
console.log('MQTT client error', err);
sendClientError = false;
}
clientReady = false;
});
}
instance.on('0', function(data) {
if(clientReady)
{
//do we have some data in backup file? if any, process data from database
if(saveTelemetryOnError)
{
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
}
let stringifiedJson = JSON.stringify(data.data)
client.publish(`${o.topic}_forward`, stringifiedJson, {qos: 1});
}
else
{
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
//write to tb
data.data.id = UID();
nosql.insert(data.data);
}
}
});
instance.on("1", _ => {
main();
})
instance.close = function(done) {
if(clientReady){
client.end();
}
};
function getDbBackupFileCounter(type)
{
var files = fs.readdirSync(__dirname + "/../databases");
let counter = 0;
for(var i = 0; i < files.length; i++)
{
if(files[i] == "tbdatacloud.nosql") continue;
if(files[i].endsWith(".nosql"))
{
let pos = files[i].indexOf(".");
if(pos > -1)
{
let fileCounter = counter;
let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit);
if(isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if(type == "max")
{
if(fileCounter > counter)
{
counter = fileCounter;
}
}
else if(type == "min")
{
if(counter == 0) counter = fileCounter;
if(fileCounter < counter)
{
counter = fileCounter;
}
}
}
}
}
if(type == "max") counter++;
return counter;
}
const makeBackupFromDbFile = async () => {
if(!saveTelemetryOnError) return;
//to avoid large file: tbdata.nosql
//init value is 0!
if(insertNoSqlCounter > 0)
{
--insertNoSqlCounter;
return;
}
insertNoSqlCounter = 100;
let source = __dirname + "/../databases/tbdatacloud.nosql";
var stats = fs.statSync(source);
var fileSizeInBytes = stats.size;
if(fileSizeInBytes > noSqlFileSizeLimit)
{
let counter = 1;
counter = getDbBackupFileCounter("max");
let destination = __dirname + "/../databases/" + counter + "." + "tbdatacloud.nosql";
//make backup file
fs.copyFileSync(source, destination);
//fs.renameSync(p, p + "." + counter);
//clear tbdata.nosql
fs.writeFileSync(source, "");
fs.truncateSync(source, 0);
}
}
const processDataFromDatabase = async () => {
if(restore_from_backup <= 0) return;
//calculate diff
const now = new Date();
let currentTime = now.getTime();
let diff = currentTime - lastRestoreTime;
if( (diff / 1000) < restore_backup_wait)
{
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
return;
}
processingData = true;
//get filename to process
let counter = getDbBackupFileCounter("min");
//we have some backup files
let dataBase = 'tbdata';
var nosql;
if(counter == 0) dataBase = 'tbdatacloud';
else dataBase = counter + "." + 'tbdatacloud';
nosql = NOSQL(dataBase);
//select all data - use limit restore_from_backup
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
for(let i = 0; i < records.length; i++)
{
if(clientReady) {
let item = records[i];
let id = item.id;
if(id !== undefined)
{
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
try {
let message = JSON.parse(JSON.stringify(item)); client.on('error', function(err) {
delete message.id; instance.status("Err: " + err.code, "red");
client.publish(`${o.topic}_forward`, JSON.stringify(message), {qos:1}); instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
if (sendClientError) {
//remove from database console.log('MQTT client error', err);
await promisifyBuilder(nosql.remove().where("id", id)); sendClientError = false;
}
clientReady = false;
});
} catch(error) { }
//process error
console.log("processDataFromDatabase", error);
}
}
}
else
{
processingData = false;
return;
}
}
if(records.length > 0) instance.on('0', function(data) {
{
//clean backup file
if(counter > 0) nosql.clean();
}
//no data in db, remove if (clientReady) {
if(records.length == 0) //do we have some data in backup file? if any, process data from database
{ if (saveTelemetryOnError) {
if(counter > 0) nosql.drop(); //read telemetry data and send back to server
} if (!processingData) processDataFromDatabase();
}
const d = new Date();
lastRestoreTime = d.getTime(); let stringifiedJson = JSON.stringify(data.data)
client.publish(`${o.topic}_forward`, stringifiedJson, { qos: 1 });
processingData = false; }
else {
} //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
instance.on('options', main);
if (saveTelemetryOnError) {
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
//write to tb
data.data.id = UID();
nosql.insert(data.data);
}
}
});
instance.on("1", _ => {
main();
})
instance.close = function(done) {
if (clientReady) {
client.end();
}
};
function getDbBackupFileCounter(type) {
var files = fs.readdirSync(__dirname + "/../databases");
let counter = 0;
for (var i = 0; i < files.length; i++) {
if (files[i] == "tbdatacloud.nosql") continue;
if (files[i].endsWith(".nosql")) {
let pos = files[i].indexOf(".");
if (pos > -1) {
let fileCounter = counter;
let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit);
if (isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if (type == "max") {
if (fileCounter > counter) {
counter = fileCounter;
}
}
else if (type == "min") {
if (counter == 0) counter = fileCounter;
if (fileCounter < counter) {
counter = fileCounter;
}
}
}
}
}
if (type == "max") counter++;
return counter;
}
const makeBackupFromDbFile = async () => {
if (!saveTelemetryOnError) return;
//to avoid large file: tbdata.nosql
//init value is 0!
if (insertNoSqlCounter > 0) {
--insertNoSqlCounter;
return;
}
insertNoSqlCounter = 100;
let source = __dirname + "/../databases/tbdatacloud.nosql";
var stats = fs.statSync(source);
var fileSizeInBytes = stats.size;
if (fileSizeInBytes > noSqlFileSizeLimit) {
let counter = 1;
counter = getDbBackupFileCounter("max");
let destination = __dirname + "/../databases/" + counter + "." + "tbdatacloud.nosql";
//make backup file
fs.copyFileSync(source, destination);
//fs.renameSync(p, p + "." + counter);
//clear tbdata.nosql
fs.writeFileSync(source, "");
fs.truncateSync(source, 0);
}
}
const processDataFromDatabase = async () => {
if (restore_from_backup <= 0) return;
//calculate diff
const now = new Date();
let currentTime = now.getTime();
let diff = currentTime - lastRestoreTime;
if ((diff / 1000) < restore_backup_wait) {
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
return;
}
processingData = true;
//get filename to process
let counter = getDbBackupFileCounter("min");
//we have some backup files
let dataBase = 'tbdata';
var nosql;
if (counter == 0) dataBase = 'tbdatacloud';
else dataBase = counter + "." + 'tbdatacloud';
nosql = NOSQL(dataBase);
//select all data - use limit restore_from_backup
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
for (let i = 0; i < records.length; i++) {
if (clientReady) {
let item = records[i];
let id = item.id;
if (id !== undefined) {
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
try {
let message = JSON.parse(JSON.stringify(item));
delete message.id;
client.publish(`${o.topic}_forward`, JSON.stringify(message), { qos: 1 });
//remove from database
await promisifyBuilder(nosql.remove().where("id", id));
} catch (error) {
//process error
console.log("processDataFromDatabase", error);
}
}
}
else {
processingData = false;
return;
}
}
if (records.length > 0) {
//clean backup file
if (counter > 0) nosql.clean();
}
//no data in db, remove
if (records.length == 0) {
if (counter > 0) nosql.drop();
}
const d = new Date();
lastRestoreTime = d.getTime();
processingData = false;
}
instance.on('options', main);
}; };

File diff suppressed because it is too large Load diff

View file

@ -6,14 +6,33 @@ exports.version = '1.0.2';
exports.icon = 'sign-out'; exports.icon = 'sign-out';
exports.output = 2; exports.output = 2;
exports.html = `<div class="padding">
<div class="row">
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
</div>
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
</div>
</div>
<div class="row">
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
</div>
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
</div>
</div>
</div>`;
exports.readme = ` exports.readme = `
# DB initialization # DB initialization
`; `;
const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js'); const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
const { initNotification } = require('./helper/notification_reporter'); const { initNotification } = require('./helper/notification_reporter');
const errorHandler = require('./helper/ErrorToServiceHandler'); const errorHandler = require('./helper/ErrorToServiceHandler');
const total_energy = require('../databases/total_energy');
const SEND_TO = { const SEND_TO = {
db_init: 0, db_init: 0,
@ -22,6 +41,7 @@ const SEND_TO = {
exports.install = async function(instance) { exports.install = async function(instance) {
const dbNodes = TABLE("nodes"); const dbNodes = TABLE("nodes");
const dbRelays = TABLE("relays"); const dbRelays = TABLE("relays");
const dbSettings = TABLE("settings"); const dbSettings = TABLE("settings");
@ -50,7 +70,7 @@ exports.install = async function(instance) {
Object.keys(dbs.nodesData).forEach(node => dbs.nodesData[node].readout = {}) Object.keys(dbs.nodesData).forEach(node => dbs.nodesData[node].readout = {})
dbs.settings = { dbs.settings = {
edge_fw_version: "2025-07-08", //rok-mesiac-den edge_fw_version: "2025-04-24", //rok-mesiac-den
language: responseSettings[0]["lang"], language: responseSettings[0]["lang"],
rvo_name: responseSettings[0]["rvo_name"], rvo_name: responseSettings[0]["rvo_name"],
project_id: responseSettings[0]["project_id"], project_id: responseSettings[0]["project_id"],
@ -78,11 +98,6 @@ exports.install = async function(instance) {
maintenance_mode: false, maintenance_mode: false,
} }
let rvo_number = responseSettings[0]["rvo_name"].match(/\D+(\d{1,2})_/)[1];
dbs.settings.energy_to_switch_lamps = total_energy[rvo_number];
if (dbs.settings.energy_to_switch_lamps === undefined) console.log('=============== db_init.js: energy_to_switch_lamps is undefined');
FLOW.dbLoaded = true; FLOW.dbLoaded = true;
errorHandler.setProjectId(dbs.settings.project_id); errorHandler.setProjectId(dbs.settings.project_id);
initNotification(); initNotification();

File diff suppressed because it is too large Load diff

2775
flow/designer.json_orig Normal file

File diff suppressed because it is too large Load diff

View file

@ -364,7 +364,7 @@ exports.install = function(instance) {
data.map(item => { data.map(item => {
let value = item['value']; let value = item['value'];
let pin = item["dev"] + item["circuit"]; // for example "relay1_03" or "input1_01" let pin = item["dev"] + item["circuit"]; // for example "ro1_03" or "di1_01"
if (pin == undefined) return; if (pin == undefined) return;
switchLogic(pin, value); switchLogic(pin, value);
@ -516,9 +516,9 @@ exports.install = function(instance) {
} }
else if (ws) { else if (ws) {
//pin = "relay1_03" or "input1_01" ... we must make just "1_01" with slice method //pin = "ro1_03" or "di1_01" ... we must make just "1_01" with slice method
monitor.info(`Dido: turnLine ${onOrOff} - (line, pin, force)`, line, pin, force, info); monitor.info(`Dido: turnLine ${onOrOff} - (line, pin, force)`, line, pin, force, info);
let cmd = { "cmd": "set", "dev": "relay", "circuit": pin.slice(5), "value": value }; let cmd = { "cmd": "set", "dev": "relay", "circuit": pin.slice(2), "value": value };
ws.send(JSON.stringify(cmd)); ws.send(JSON.stringify(cmd));
} }
@ -754,9 +754,9 @@ exports.install = function(instance) {
pins = [4, 6]; pins = [4, 6];
} }
} else if (controllerType === "unipi") { } else if (controllerType === "unipi") {
pins = ["input1_01", "input1_04", "input1_05"]; pins = ["di1_01", "di1_04", "di1_05"];
if (hasMainSwitch === 1) { if (hasMainSwitch === 1) {
pins = ["input1_01", "input1_04"]; pins = ["di1_01", "di1_04"];
} }
} }
@ -781,7 +781,7 @@ exports.install = function(instance) {
for (const pinIndex of pinIndexes) { for (const pinIndex of pinIndexes) {
if (previousValues[pinIndex] === 0) { if (previousValues[pinIndex] === 0) {
if ((pinIndex === 6 || pinIndex === 'input1_01' || pinIndex === 'input1_05') && SETTINGS.maintenance_mode) continue; if ((pinIndex === 6 || pinIndex === 'di1_01' || pinIndex === 'di1_05') && SETTINGS.maintenance_mode) continue;
status = "NOK"; status = "NOK";
break; break;
} }
@ -798,7 +798,7 @@ exports.install = function(instance) {
// we pass array to function in case of rsPort ==> switchLogic([55,3,0,1]) ==> [[55,3,0,1]] // we pass array to function in case of rsPort ==> switchLogic([55,3,0,1]) ==> [[55,3,0,1]]
// we pass two values in case of websocket ==> switchLogic("relay1_03",1) ==> ["relay1_03",1] // we pass two values in case of websocket ==> switchLogic("ro1_03",1) ==> ["ro1_03",1]
const switchLogic = (...args) => { const switchLogic = (...args) => {
let values = {}; let values = {};
@ -849,18 +849,18 @@ exports.install = function(instance) {
else if (type == "rotary_switch_state") { else if (type == "rotary_switch_state") {
// combination of these two pins required to get result // combination of these two pins required to get result
let pin2, pin3; let pin2, pin3;
if (pinIndex == 2 || pinIndex == "input1_02") { if (pinIndex == 2 || pinIndex == "di1_02") {
pin2 = newPinValue; pin2 = newPinValue;
pin3 = previousValues[3] || previousValues["input1_03"]; pin3 = previousValues[3] || previousValues["di1_03"];
if (pin3 == undefined) { if (pin3 == undefined) {
previousValues[pinIndex] = newPinValue; previousValues[pinIndex] = newPinValue;
return; return;
} }
} }
else if (pinIndex == 3 || pinIndex == "input1_03") { else if (pinIndex == 3 || pinIndex == "di1_03") {
pin3 = newPinValue; pin3 = newPinValue;
pin2 = previousValues[2] || previousValues["input1_02"]; pin2 = previousValues[2] || previousValues["di1_02"];
if (pin2 == undefined) { if (pin2 == undefined) {
previousValues[pinIndex] = newPinValue; previousValues[pinIndex] = newPinValue;
@ -913,7 +913,7 @@ exports.install = function(instance) {
} }
//Dverovy kontakt - pin 6 //Dverovy kontakt - pin 6
//! Ak je rvo s dvoma dverovymi kontaktami, ked pride z evoku signal z input1_05, co bol predytm "state_of_main switch" handlujeme ho teraz ako 'door_condition' //! Ak je rvo s dvoma dverovymi kontaktami, ked pride z evoku signal z di1_05, co bol predytm "state_of_main switch" handlujeme ho teraz ako 'door_condition'
else if (type == "door_condition" || type === "state_of_main_switch") { else if (type == "door_condition" || type === "state_of_main_switch") {
newPinValue === 0 ? value = "open" : value = "closed"; newPinValue === 0 ? value = "open" : value = "closed";
@ -1369,60 +1369,60 @@ exports.install = function(instance) {
//! pins.table --> from UNIPI //! pins.table --> from UNIPI
// pin:string|type:string|line:number // pin:string|type:string|line:number
// *|input1_01|state_of_main_switch|0|........... // *|di1_01|state_of_main_switch|0|...........
// *|input1_02|rotary_switch_state|0|........... // *|di1_02|rotary_switch_state|0|...........
// *|input1_03|rotary_switch_state|0|........... // *|di1_03|rotary_switch_state|0|...........
// *|intut1_04|power_supply|0|........... // *|intut1_04|power_supply|0|...........
// *|input1_05|door_condition|0|........... // *|di1_05|door_condition|0|...........
// *|input1_06|state_of_breaker|1|........... // *|di1_06|state_of_breaker|1|...........
// *|input1_07|state_of_breaker|2|........... // *|di1_07|state_of_breaker|2|...........
// *|input1_08|state_of_breaker|3|........... // *|di1_08|state_of_breaker|3|...........
// *|relay1_02|state_of_contactor|1|........... // *|ro1_02|state_of_contactor|1|...........
// *|relay1_03|state_of_contactor|2|........... // *|ro1_03|state_of_contactor|2|...........
// *|relay1_04|state_of_contactor|3|........... // *|ro1_04|state_of_contactor|3|...........
// *|287D8776E0013CE9|temperature|0|........... // *|287D8776E0013CE9|temperature|0|...........
//! pins_data --> from UNIPI //! pins_data --> from UNIPI
// { // {
// input1_01: { // di1_01: {
// pin: 'input1_01', // pin: 'di1_01',
// type: 'door_condition', // type: 'door_condition',
// line: 0, // line: 0,
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8' // tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
// }, // },
// input1_02: { // di1_02: {
// pin: 'input1_02', // pin: 'di1_02',
// type: 'rotary_switch_state', // type: 'rotary_switch_state',
// line: 0, // line: 0,
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8' // tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
// }, // },
// input1_03: { // di1_03: {
// pin: 'input1_03', // pin: 'di1_03',
// type: 'rotary_switch_state', // type: 'rotary_switch_state',
// line: 0, // line: 0,
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8' // tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
// }, // },
// input1_04: { // di1_04: {
// pin: 'input1_04', // pin: 'di1_04',
// type: 'power_supply', // type: 'power_supply',
// line: 0, // line: 0,
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8' // tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
// }, // },
// input1_05: { // di1_05: {
// pin: 'input1_05', // pin: 'di1_05',
// type: 'state_of_main_switch', // type: 'state_of_main_switch',
// line: 0, // line: 0,
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8' // tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
// }, // },
// input1_06: { // di1_06: {
// pin: 'input1_06', // pin: 'di1_06',
// type: 'state_of_breaker', // type: 'state_of_breaker',
// line: 1, // line: 1,
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo' // tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'
// }, // },
// relay1_02: { // ro1_02: {
// pin: 'relay1_02', // pin: 'ro1_02',
// type: 'state_of_contactor', // type: 'state_of_contactor',
// line: 1, // line: 1,
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo' // tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'

View file

@ -1,186 +1,186 @@
class DataToTbHandler { class DataToTbHandler {
constructor(index) { constructor(index) {
this.index = index; this.index = index;
// time, after new value for the given key will be resend to tb (e.g. {status: "OK"}) // time, after new value for the given key will be resend to tb (e.g. {status: "OK"})
this.timeToHoldTbValue = 30 * 60; //30 minutes this.timeToHoldTbValue = 30 * 60; //30 minutes
this.previousValues = {}; this.previousValues = {};
this.debug = false; this.debug = false;
this.messageCounter = 0; this.messageCounter = 0;
this.itIsNodeReadout = false; this.itIsNodeReadout = false;
this.sender = ""; this.sender = "";
// if attribute change difference is less than limit value, we do not send to tb. // if attribute change difference is less than limit value, we do not send to tb.
this.attributeChangeLimit = { this.attributeChangeLimit = {
temperature: 0.5, temperature: 0.5,
Phase_1_voltage: 2, Phase_1_voltage: 2,
Phase_2_voltage: 2, Phase_2_voltage: 2,
Phase_3_voltage: 2, Phase_3_voltage: 2,
Phase_1_current: 0.1, Phase_1_current: 0.1,
Phase_2_current: 0.1, Phase_2_current: 0.1,
Phase_3_current: 0.1, Phase_3_current: 0.1,
Phase_1_power: 2, Phase_1_power: 2,
Phase_2_power: 2, Phase_2_power: 2,
Phase_3_power: 2, Phase_3_power: 2,
total_power: 2, total_power: 2,
total_energy: 1, total_energy: 1,
Phase_1_pow_factor: 0.1, Phase_1_pow_factor: 0.1,
Phase_2_pow_factor: 0.1, Phase_2_pow_factor: 0.1,
Phase_3_pow_factor: 0.1, Phase_3_pow_factor: 0.1,
power_factor: 0.1, power_factor: 0.1,
lifetime: 2, lifetime: 2,
voltage: 2, voltage: 2,
power: 2, power: 2,
frequency: 3, frequency: 3,
energy: 0.1, energy: 0.1,
current: 2, current: 2,
inclination_x: 10, inclination_x: 10,
inclination_y: 10, inclination_y: 10,
inclination_z: 10 inclination_z: 10
}; };
} }
dump() { dump() {
console.log("----------------------------"); console.log("----------------------------");
console.log("previousValues", this.previousValues); console.log("previousValues", this.previousValues);
console.log("----------------------------"); console.log("----------------------------");
} }
setSender(sender) { setSender(sender) {
this.sender = sender; this.sender = sender;
} }
isEmptyObject(obj) { isEmptyObject(obj) {
for (var _ in obj) { for (var _ in obj) {
return false; return false;
} }
return true; return true;
} }
sendToTb(data, instance) { sendToTb(data, instance) {
//not to modify data object, we do deep copy: //not to modify data object, we do deep copy:
let dataCopy = JSON.parse(JSON.stringify(data)); let dataCopy = JSON.parse(JSON.stringify(data));
let keys = Object.keys(dataCopy); let keys = Object.keys(dataCopy);
if (keys.length == 0) { if (keys.length == 0) {
if (this.debug) console.log("sendToTb received empty object", dataCopy); if (this.debug) console.log("sendToTb received empty object", dataCopy);
return; return;
} }
let tbname = keys[0]; let tbname = keys[0];
let ts; let ts;
let arrayOfValues = dataCopy[tbname]; let arrayOfValues = dataCopy[tbname];
let arrayOfValuesToSend = []; let arrayOfValuesToSend = [];
for (let i = 0; i < arrayOfValues.length; i++) { for (let i = 0; i < arrayOfValues.length; i++) {
ts = arrayOfValues[i].ts; ts = arrayOfValues[i].ts;
let values = this.prepareValuesForTb(tbname, ts, arrayOfValues[i].values); let values = this.prepareValuesForTb(tbname, ts, arrayOfValues[i].values);
if (!this.isEmptyObject(values)) { if (!this.isEmptyObject(values)) {
arrayOfValuesToSend.push({ ts: ts, values: values }); arrayOfValuesToSend.push({ ts: ts, values: values });
} }
} }
if (arrayOfValuesToSend.length == 0) { if (arrayOfValuesToSend.length == 0) {
//if(this.debug) console.log("data not sent - empty array"); //if(this.debug) console.log("data not sent - empty array");
return; return;
} }
this.messageCounter++; this.messageCounter++;
let dataToTbModified = { let dataToTbModified = {
[tbname]: arrayOfValuesToSend [tbname]: arrayOfValuesToSend
} }
//console.log(this.sender + " DATA SEND TO TB ", tbname, this.messageCounter, new Date(ts), dataToTbModified[tbname][0].values, this.instance); //console.log(this.sender + " DATA SEND TO TB ", tbname, this.messageCounter, new Date(ts), dataToTbModified[tbname][0].values, this.instance);
//if(this.debug) console.log(this.sender + " DATA SEND TO TB ", this.index, tbname, arrayOfValuesToSend); //if(this.debug) console.log(this.sender + " DATA SEND TO TB ", this.index, tbname, arrayOfValuesToSend);
instance.send(this.index, dataToTbModified); instance.send(this.index, dataToTbModified);
} }
getDiffTimestamp(key) { getDiffTimestamp(key) {
//TODO set different value for given key!!! //TODO set different value for given key!!!
//if(key == "status") this.timeToHoldTbValue = 2*60*60;//2h //if(key == "status") this.timeToHoldTbValue = 2*60*60;//2h
return this.timeToHoldTbValue * 1000; return this.timeToHoldTbValue * 1000;
} }
prepareValuesForTb(tbname, timestamp, values) { prepareValuesForTb(tbname, timestamp, values) {
let keys = Object.keys(values); let keys = Object.keys(values);
if (keys.includes("lifetime")) this.itIsNodeReadout = true; if (keys.includes("lifetime")) this.itIsNodeReadout = true;
if (!this.previousValues.hasOwnProperty(tbname)) { if (!this.previousValues.hasOwnProperty(tbname)) {
this.previousValues[tbname] = {}; this.previousValues[tbname] = {};
} }
//if(this.debug) console.log("prepareValuesForTb", tbname, timestamp, values); //if(this.debug) console.log("prepareValuesForTb", tbname, timestamp, values);
for (let i = 0; i < keys.length; i++) { for (let i = 0; i < keys.length; i++) {
let key = keys[i]; let key = keys[i];
let value = values[key]; let value = values[key];
if (!this.previousValues[tbname].hasOwnProperty(key)) { if (!this.previousValues[tbname].hasOwnProperty(key)) {
this.previousValues[tbname][key] = { ts: timestamp, value: value }; this.previousValues[tbname][key] = { ts: timestamp, value: value };
continue; continue;
} }
// attributeData ==> {voltage: {ts:333333, value:5}} // attributeData ==> {voltage: {ts:333333, value:5}}
let attributeData = this.previousValues[tbname][key]; let attributeData = this.previousValues[tbname][key];
let attributeToChange = false; let attributeToChange = false;
if (key in this.attributeChangeLimit) attributeToChange = true; if (key in this.attributeChangeLimit) attributeToChange = true;
let limit = this.attributeChangeLimit[key]; let limit = this.attributeChangeLimit[key];
let timestampDiffToRemoveKey; let timestampDiffToRemoveKey;
//this will ensure "node statecode" will be sent just once an hour //this will ensure "node statecode" will be sent just once an hour
if (this.itIsNodeReadout && key === "statecode") { if (this.itIsNodeReadout && key === "statecode") {
attributeData.value = value; attributeData.value = value;
this.itIsNodeReadout = false; this.itIsNodeReadout = false;
timestampDiffToRemoveKey = 1 * 60 * 60 * 1000; // 1 hour timestampDiffToRemoveKey = 1 * 60 * 60 * 1000; // 1 hour
} }
if (key === "twilight_sensor" && value > 100) { if (key === "twilight_sensor" && value > 100) {
attributeData.value = value; attributeData.value = value;
} }
//if edge, master or node version do not change, send just once a day: //if edge, master or node version do not change, send just once a day:
if (["edge_fw_version", "master_node_version", "fw_version"].includes(key)) { if (["edge_fw_version", "master_node_version", "fw_version"].includes(key)) {
timestampDiffToRemoveKey = 24 * 60 * 60 * 1000; timestampDiffToRemoveKey = 24 * 60 * 60 * 1000;
} }
if (attributeData.value === value || attributeToChange && Math.abs(attributeData.value - value) < limit) { if (attributeData.value === value || attributeToChange && Math.abs(attributeData.value - value) < limit) {
let diff = timestamp - attributeData.ts; let diff = timestamp - attributeData.ts;
if (!timestampDiffToRemoveKey) timestampDiffToRemoveKey = this.getDiffTimestamp(key); if (!timestampDiffToRemoveKey) timestampDiffToRemoveKey = this.getDiffTimestamp(key);
if (diff > timestampDiffToRemoveKey) { if (diff > timestampDiffToRemoveKey) {
attributeData.ts = Date.now(); attributeData.ts = Date.now();
//if(this.debug) console.log(this.sender + ": update ts for key", key, "diff is", diff, "messageCounter", this.messageCounter); //if(this.debug) console.log(this.sender + ": update ts for key", key, "diff is", diff, "messageCounter", this.messageCounter);
} }
else { else {
delete values[key]; delete values[key];
//if(this.debug) console.log(this.sender + ": delete key", key, "diff is", diff, "messageCounter", this.messageCounter, timestampDiffToRemoveKey); //if(this.debug) console.log(this.sender + ": delete key", key, "diff is", diff, "messageCounter", this.messageCounter, timestampDiffToRemoveKey);
} }
} }
else { else {
attributeData.value = value; attributeData.value = value;
attributeData.ts = timestamp; attributeData.ts = timestamp;
} }
} }
return values; return values;
} }
} }
module.exports = DataToTbHandler; module.exports = DataToTbHandler;

View file

@ -1,112 +1,170 @@
function bytesToInt(bytes, numberOfBytes) { function bytesToInt_orig(bytes, numberOfBytes) {
let buffer = [];
if (Array.isArray(bytes)) { let buffer = [];
buffer = bytes.slice(0); if (Array.isArray(bytes)) {
if (numberOfBytes != undefined) { buffer = bytes.slice(0);
buffer = bytes.slice(bytes.length - numberOfBytes); if (numberOfBytes != undefined) {
} buffer = bytes.slice(bytes.length - numberOfBytes);
} }
else buffer.push(bytes); }
else buffer.push(bytes);
let result = 0; //var decimal = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3];
for (let i = 0; i < buffer.length; i++) {
result = (result << 8) | buffer[i]; let l = (buffer.length - 1) * 8;
} let decimal = 0;
for (let i = 0; i < buffer.length; i++) {
return result >>> 0; //ensure it's an unsigned 32-bit number var s = buffer[i] << l;
} if (l < 8) s = buffer[i]
decimal = decimal + s;
function resizeArray(arr, newSize, defaultValue) { l = l - 8;
while (newSize > arr.length) }
arr.push(defaultValue); // console.log("decimal utils.js: ", decimal);
arr.length = newSize;
} let decimal1 = 0n;
for (let i = 0; i < buffer.length; i++) {
longToByteArray = function(/*long*/long) { decimal1 += BigInt(buffer[i]) * (2n ** BigInt((buffer.length - 1 - i) * 8));
// we want to represent the input as a 8-bytes array }
var byteArray = [0, 0, 0, 0, 0, 0, 0, 0]; // console.log("decimal biging utils.js: ", decimal1);
return decimal;
for (var index = 0; index < byteArray.length; index++) { }
var byte = long & 0xff;
byteArray[index] = byte; //bytestouintBE
long = (long - byte) / 256; function bytesToInt(bytes, numberOfBytes) {
}
let buffer = [];
return byteArray; if (Array.isArray(bytes)) {
}; buffer = bytes.slice(0);
if (numberOfBytes != undefined) {
function addDays(date, days) { buffer = bytes.slice(bytes.length - numberOfBytes);
var result = new Date(date); }
result.setDate(result.getDate() + days); }
return result; else buffer.push(bytes);
}
console.log(bytes, buffer);
/*
sleep(2000).then(() => { let result = 0;
// Do something after the sleep! for (let i = 0; i < buffer.length; i++) {
result = (result << 8) | bytes[i];
}
}); // console.log("decimal biging utils.js: ", decimal1);
*/
console.log("originall: ", bytesToInt_orig(buffer));
function sleep(time) { console.log("uint little endian: ", bytesToUintLE(buffer));
return new Promise((resolve) => setTimeout(resolve, time)); console.log('neww: ', result >>> 0);
} return result >>> 0;
}
function isEmptyObject(obj) {
for (var name in obj) { function bytesToUintLE(bytes, numberOfBytes) {
return false;
} let buffer = [];
return true; if (Array.isArray(bytes)) {
} buffer = bytes.slice(0);
if (numberOfBytes != undefined) {
function convertUTCDateToLocalDate(date) { buffer = bytes.slice(bytes.length - numberOfBytes);
var newDate = new Date(date); }
newDate.setMinutes(date.getMinutes() + date.getTimezoneOffset()); }
return newDate; else buffer.push(bytes);
} //var decimal = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3];
function addZeroBefore(n) { let result = 0;
return (n < 10 ? '0' : '') + n; for (let i = buffer.length - 1; i <= 0; i--) {
} result = (result << 8) | bytes[i];
}
var convertBase = function() { return result >>> 0;
}
function convertBase(baseFrom, baseTo) {
return function(num) {
return parseInt(num, baseFrom).toString(baseTo); function resizeArray(arr, newSize, defaultValue) {
while (newSize > arr.length)
}; arr.push(defaultValue);
} arr.length = newSize;
}
// binary to decimal
convertBase.bin2dec = convertBase(2, 10); longToByteArray = function(/*long*/long) {
// we want to represent the input as a 8-bytes array
// binary to hexadecimal var byteArray = [0, 0, 0, 0, 0, 0, 0, 0];
convertBase.bin2hex = convertBase(2, 16);
for (var index = 0; index < byteArray.length; index++) {
// decimal to binary var byte = long & 0xff;
convertBase.dec2bin = convertBase(10, 2); byteArray[index] = byte;
long = (long - byte) / 256;
// decimal to hexadecimal }
convertBase.dec2hex = convertBase(10, 16);
return byteArray;
// hexadecimal to binary };
convertBase.hex2bin = convertBase(16, 2);
function addDays(date, days) {
// hexadecimal to decimal var result = new Date(date);
convertBase.hex2dec = convertBase(16, 10); result.setDate(result.getDate() + days);
return result;
return convertBase; }
}();
/*
module.exports = { sleep(2000).then(() => {
bytesToInt, // Do something after the sleep!
longToByteArray,
addDays,
addZeroBefore, });
resizeArray, */
isEmptyObject,
sleep, function sleep(time) {
convertUTCDateToLocalDate return new Promise((resolve) => setTimeout(resolve, time));
} }
function isEmptyObject(obj) {
for (var name in obj) {
return false;
}
return true;
}
function convertUTCDateToLocalDate(date) {
var newDate = new Date(date);
newDate.setMinutes(date.getMinutes() + date.getTimezoneOffset());
return newDate;
}
function addZeroBefore(n) {
return (n < 10 ? '0' : '') + n;
}
var convertBase = function() {
function convertBase(baseFrom, baseTo) {
return function(num) {
return parseInt(num, baseFrom).toString(baseTo);
};
}
// binary to decimal
convertBase.bin2dec = convertBase(2, 10);
// binary to hexadecimal
convertBase.bin2hex = convertBase(2, 16);
// decimal to binary
convertBase.dec2bin = convertBase(10, 2);
// decimal to hexadecimal
convertBase.dec2hex = convertBase(10, 16);
// hexadecimal to binary
convertBase.hex2bin = convertBase(16, 2);
// hexadecimal to decimal
convertBase.hex2dec = convertBase(16, 10);
return convertBase;
}();
module.exports = {
bytesToInt,
longToByteArray,
addDays,
addZeroBefore,
resizeArray,
isEmptyObject,
sleep,
convertUTCDateToLocalDate
}

View file

@ -16,7 +16,7 @@ exports.readme = `
`; `;
const modbus = require('jsmodbus'); const modbus = require('jsmodbus');
const SerialPort = require('serialport'); const {SerialPort} = require('serialport');
const { timeoutInterval, deviceConfig } = require("../databases/modbus_config"); const { timeoutInterval, deviceConfig } = require("../databases/modbus_config");
const { sendNotification } = require('./helper/notification_reporter'); const { sendNotification } = require('./helper/notification_reporter');
@ -36,7 +36,6 @@ let mainSocket;
let phases; let phases;
//phases where voltage is 0 (set) //phases where voltage is 0 (set)
let noVoltage; let noVoltage;
let energyToSwitchLamps;
exports.install = function(instance) { exports.install = function(instance) {
@ -77,13 +76,8 @@ exports.install = function(instance) {
let obj = this; let obj = this;
if (this.socket) { this.socket = new SerialPort({path: "/dev/ttymxc0",
this.socket.removeAllListeners(); baudRate: 9600
this.socket = null;
}
this.socket = new SerialPort("/dev/ttymxc0", {
baudRate: 9600,
}) })
// we create a client for every deviceAddress ( = address) in list and push them into dictionary // we create a client for every deviceAddress ( = address) in list and push them into dictionary
@ -92,11 +86,15 @@ exports.install = function(instance) {
} }
this.socket.on('error', function(e) { this.socket.on('error', function(e) {
console.log('Modbus_reader: Socket connection error', e); //'ECONNREFUSED' or 'ECONNRESET' ?? console.log('socket connection error', e);
if (e.code == 'ECONNREFUSED' || e.code == 'ECONNRESET') {
console.log(exports.title + ' Waiting 10 seconds before trying to connect again');
setTimeout(obj.startSocket, 10000);
}
}); });
this.socket.on('close', function() { this.socket.on('close', function() {
console.log('Modbus_reader: Socket connection closed - Waiting 10 seconds before connecting again'); console.log('Socket connection closed ' + exports.title + ' Waiting 10 seconds before trying to connect again');
setTimeout(obj.startSocket, 10000); setTimeout(obj.startSocket, 10000);
}); });
@ -117,8 +115,7 @@ exports.install = function(instance) {
this.deviceAddress = dev.deviceAddress; // 1 or 2 or any number this.deviceAddress = dev.deviceAddress; // 1 or 2 or any number
this.device = dev.device; //em340, twilight_sensor this.device = dev.device; //em340, twilight_sensor
//if we just start to loop devices from the beginning, or there is just 1 device in config, we wait whole timeoutInterval if (this.indexInDeviceConfig == 0) setTimeout(this.readRegisters, this.timeoutInterval);
if (this.indexInDeviceConfig == 0 || deviceConfig.length === 1) setTimeout(this.readRegisters, this.timeoutInterval);
else setTimeout(this.readRegisters, DELAY_BETWEEN_DEVICES); else setTimeout(this.readRegisters, DELAY_BETWEEN_DEVICES);
} }
@ -307,12 +304,15 @@ exports.install = function(instance) {
const actualTotalPower = values.total_power; const actualTotalPower = values.total_power;
if (actualTotalPower > energyToSwitchLamps && this.onNotificationSent == false) { const numberOfNodes = Object.keys(FLOW.GLOBALS.nodesData).length;
if (numberOfNodes == 0) numberOfNodes = 20; // to make sure, we send notification if totalPower is more than 300
if (actualTotalPower > numberOfNodes * 15 && this.onNotificationSent == false) {
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_on", {}, "", SEND_TO.tb, instance); sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_on", {}, "", SEND_TO.tb, instance);
this.onNotificationSent = true; this.onNotificationSent = true;
this.offNotificationSent = false; this.offNotificationSent = false;
} }
else if (actualTotalPower <= energyToSwitchLamps && this.offNotificationSent == false) { else if (actualTotalPower <= numberOfNodes * 15 && this.offNotificationSent == false) {
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_off", {}, "", SEND_TO.tb, instance); sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_off", {}, "", SEND_TO.tb, instance);
this.onNotificationSent = false; this.onNotificationSent = false;
this.offNotificationSent = true; this.offNotificationSent = true;
@ -330,9 +330,9 @@ exports.install = function(instance) {
phases = FLOW.GLOBALS.settings.phases; phases = FLOW.GLOBALS.settings.phases;
tbName = FLOW.GLOBALS.settings.rvoTbName; tbName = FLOW.GLOBALS.settings.rvoTbName;
noVoltage = FLOW.GLOBALS.settings.no_voltage; noVoltage = FLOW.GLOBALS.settings.no_voltage;
energyToSwitchLamps = FLOW.GLOBALS.settings.energy_to_switch_lamps / 2.5; //half value is enought to show if lamps are turned on or off mainSocket = new SocketWithClients();
if (deviceConfig.length) mainSocket = new SocketWithClients();
else console.log("Modbus_reader: no modbus device in configuration"); console.log("novoltage: ", noVoltage, typeof noVoltage);
// this notification is to show, that flow (unipi) has been restarted // this notification is to show, that flow (unipi) has been restarted
sendNotification("modbus_reader", tbName, "flow_restart", {}, "", SEND_TO.slack, instance); sendNotification("modbus_reader", tbName, "flow_restart", {}, "", SEND_TO.slack, instance);

0
flow/variables.txt Normal file
View file

View file

@ -44,9 +44,9 @@ const fs = require('fs');
const mqtt = require('mqtt'); const mqtt = require('mqtt');
const SEND_TO = { const SEND_TO = {
debug: 0, debug: 0,
rpcCall: 1, rpcCall: 1,
services: 2 services: 2
} }
//CONFIG //CONFIG
@ -72,13 +72,13 @@ let sendClientError = true;
process.on('uncaughtException', function(err) { process.on('uncaughtException', function(err) {
errLogger.error('uncaughtException:', err.message) errLogger.error('uncaughtException:', err.message)
errLogger.error(err.stack); errLogger.error(err.stack);
//TODO //TODO
//send to service //send to service
//process.exit(1); //process.exit(1);
}) })
const nosql = NOSQL('tbdata'); const nosql = NOSQL('tbdata');
@ -87,362 +87,364 @@ const nosqlBackup = NOSQL('/backup/tbdata');
exports.install = function(instance) { exports.install = function(instance) {
var client; var client;
var opts; var opts;
var clientReady = false; var clientReady = false;
// wsmqtt status for notification purposes on projects.worksys.io database // wsmqtt status for notification purposes on projects.worksys.io database
let wsmqttName = null; let wsmqttName = null;
let sendWsStatusVar = null; let sendWsStatusVar = null;
let wsmqtt_status = 'disconnected'; let wsmqtt_status = 'disconnected';
function getWsmqttName(host) { function getWsmqttName(host) {
if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo'; if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01'; else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01'; else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
} }
function sendWsStatus() { function sendWsStatus() {
instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status }); instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status });
} }
function main() { function main() {
if (!FLOW.dbLoaded) return; if (!FLOW.dbLoaded) return;
loadSettings(); loadSettings();
clearInterval(sendWsStatus); clearInterval(sendWsStatus);
sendWsStatusVar = setInterval(sendWsStatus, 180000); sendWsStatusVar = setInterval(sendWsStatus, 180000);
} }
//set opts according to db settings //set opts according to db settings
function loadSettings() { function loadSettings() {
if (instance.options.host !== "") { if (instance.options.host !== "") {
//override settings from database //override settings from database
var o = instance.options; var o = instance.options;
opts = { opts = {
host: o.host, host: o.host,
port: o.port, port: o.port,
clientId: o.clientid, clientId: o.clientid,
username: o.username, username: o.username,
rejectUnauthorized: false, rejectUnauthorized: false,
resubscribe: false resubscribe: false
}; };
wsmqttName = getWsmqttName(o.host); wsmqttName = getWsmqttName(o.host);
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options); console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
} }
else { else {
const SETTINGS = FLOW.GLOBALS.settings; const SETTINGS = FLOW.GLOBALS.settings;
backup_on_failure = SETTINGS.backup_on_failure; backup_on_failure = SETTINGS.backup_on_failure;
saveTelemetryOnError = backup_on_failure; saveTelemetryOnError = backup_on_failure;
restore_from_backup = SETTINGS.restore_from_backup; restore_from_backup = SETTINGS.restore_from_backup;
restore_backup_wait = SETTINGS.restore_backup_wait; restore_backup_wait = SETTINGS.restore_backup_wait;
let mqtt_host = SETTINGS.mqtt_host; let mqtt_host = SETTINGS.mqtt_host;
let mqtt_clientid = SETTINGS.mqtt_clientid; let mqtt_clientid = SETTINGS.mqtt_clientid;
let mqtt_username = SETTINGS.mqtt_username; let mqtt_username = SETTINGS.mqtt_username;
let mqtt_port = SETTINGS.mqtt_port; let mqtt_port = SETTINGS.mqtt_port;
opts = { opts = {
host: mqtt_host, host: mqtt_host,
port: mqtt_port, port: mqtt_port,
keepalive: 10, keepalive: 10,
clientId: mqtt_clientid, clientId: mqtt_clientid,
username: mqtt_username, username: mqtt_username,
rejectUnauthorized: false, rejectUnauthorized: false,
resubscribe: false resubscribe: false,
}; };
wsmqttName = getWsmqttName(mqtt_host); wsmqttName = getWsmqttName(mqtt_host);
} }
connectToTbServer(); connectToTbServer();
} }
function connectToTbServer() { function connectToTbServer() {
var url = "mqtt://" + opts.host + ":" + opts.port; var url = "mqtt://" + opts.host + ":" + opts.port;
console.log("MQTT URL: ", url); console.log("MQTT URL: ", url);
client = mqtt.connect(url, opts); client = mqtt.connect(url, opts);
client.on('connect', function() { client.on('connect', function() {
instance.status("Connected", "green"); instance.status("Connected", "green");
//monitor.info("MQTT client connected"); //monitor.info("MQTT client connected");
sendClientError = true; sendClientError = true;
clientReady = true; clientReady = true;
wsmqtt_status = 'connected'; wsmqtt_status = 'connected';
}); });
client.on('reconnect', function() { client.on('reconnect', function() {
instance.status("Reconnecting", "yellow"); instance.status("Reconnecting", "yellow");
clientReady = false; clientReady = false;
}); });
client.on('message', function(topic, message) { client.on('message', function(topic, message) {
// message is type of buffer // message is type of buffer
message = message.toString(); message = message.toString();
if (message[0] === '{') { if (message[0] === '{') {
TRY(function() {
try {
message = JSON.parse(message); message = JSON.parse(message);
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 }); client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } }); instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
} }
}, () => instance.debug('MQTT: Error parsing data', message)); } catch (e) {
} console.log('MQTT: Error parsing data', e);
}
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message }); }
});
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message });
client.on('close', function() { });
clientReady = false;
wsmqtt_status = 'disconnected'; client.on('close', function() {
clientReady = false;
instance.status("Disconnected", "red"); wsmqtt_status = 'disconnected';
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
}); instance.status("Disconnected", "red");
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
client.on('error', function(err) { });
instance.status("Err: " + err.code, "red");
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts }); client.on('error', function(err) {
if (sendClientError) { instance.status("Err: " + err.code, "red");
monitor.info('MQTT client error', err); instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
sendClientError = false; if (sendClientError) {
} monitor.info('MQTT client error', err);
clientReady = false; sendClientError = false;
wsmqtt_status = 'disconnected'; }
}); clientReady = false;
wsmqtt_status = 'disconnected';
} });
}
instance.on("0", _ => {
main();
}) instance.on("0", _ => {
main();
})
instance.on('1', function(data) { instance.on('1', function(data) {
if (clientReady) { if (clientReady) {
//do we have some data in backup file? if any, process data from database //do we have some data in backup file? if any, process data from database
if (saveTelemetryOnError) { if (saveTelemetryOnError) {
//read telemetry data and send back to server //read telemetry data and send back to server
if (!processingData) processDataFromDatabase(); if (!processingData) processDataFromDatabase();
} }
let stringifiedJson = JSON.stringify(data.data); let stringifiedJson = JSON.stringify(data.data);
client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 }); client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 });
//backup telemetry //backup telemetry
if (createTelemetryBackup) { if (createTelemetryBackup) {
data.data.id = UID(); data.data.id = UID();
nosqlBackup.insert(data.data); nosqlBackup.insert(data.data);
insertBackupNoSqlCounter++; insertBackupNoSqlCounter++;
if (insertBackupNoSqlCounter > 150) { if (insertBackupNoSqlCounter > 150) {
let options = { compress: true }; let options = { compress: true };
let path = __dirname + "/../databases/backup/tbdata.nosql"; let path = __dirname + "/../databases/backup/tbdata.nosql";
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options); var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
stream.write(""); stream.write("");
stream.end(); stream.end();
insertBackupNoSqlCounter = 0; insertBackupNoSqlCounter = 0;
} }
} }
} }
else { else {
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data)); //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data }); instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
if (saveTelemetryOnError) { if (saveTelemetryOnError) {
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql //create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile(); makeBackupFromDbFile();
//write to tb //write to tb
data.data.id = UID(); data.data.id = UID();
nosql.insert(data.data); nosql.insert(data.data);
} }
} }
}); });
instance.close = function(done) { instance.close = function(done) {
if (clientReady) { if (clientReady) {
client.end(); client.end();
clearInterval(sendWsStatusVar); clearInterval(sendWsStatusVar);
} }
}; };
function getDbBackupFileCounter(type) { function getDbBackupFileCounter(type) {
var files = fs.readdirSync(__dirname + "/../databases"); var files = fs.readdirSync(__dirname + "/../databases");
let counter = 0; let counter = 0;
for (var i = 0; i < files.length; i++) { for (var i = 0; i < files.length; i++) {
if (files[i] == "tbdata.nosql") continue; if (files[i] == "tbdata.nosql") continue;
if (files[i].endsWith(".nosql")) { if (files[i].endsWith(".nosql")) {
let pos = files[i].indexOf("."); let pos = files[i].indexOf(".");
if (pos > -1) { if (pos > -1) {
let fileCounter = counter; let fileCounter = counter;
let firstDigit = files[i].slice(0, pos); let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit); fileCounter = parseInt(firstDigit);
if (isNaN(fileCounter)) fileCounter = 0; if (isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type); //console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if (type == "max") { if (type == "max") {
if (fileCounter > counter) { if (fileCounter > counter) {
counter = fileCounter; counter = fileCounter;
} }
} }
else if (type == "min") { else if (type == "min") {
if (counter == 0) counter = fileCounter; if (counter == 0) counter = fileCounter;
if (fileCounter < counter) { if (fileCounter < counter) {
counter = fileCounter; counter = fileCounter;
} }
} }
} }
} }
} }
if (type == "max") counter++; if (type == "max") counter++;
return counter; return counter;
} }
const makeBackupFromDbFile = async () => { const makeBackupFromDbFile = async () => {
if (!saveTelemetryOnError) return; if (!saveTelemetryOnError) return;
//to avoid large file: tbdata.nosql //to avoid large file: tbdata.nosql
//init value is 0! //init value is 0!
if (insertNoSqlCounter > 0) { if (insertNoSqlCounter > 0) {
--insertNoSqlCounter; --insertNoSqlCounter;
return; return;
} }
insertNoSqlCounter = 100; insertNoSqlCounter = 100;
let source = __dirname + "/../databases/tbdata.nosql"; let source = __dirname + "/../databases/tbdata.nosql";
var stats = fs.statSync(source); var stats = fs.statSync(source);
var fileSizeInBytes = stats.size; var fileSizeInBytes = stats.size;
if (fileSizeInBytes > noSqlFileSizeLimit) { if (fileSizeInBytes > noSqlFileSizeLimit) {
let counter = 1; let counter = 1;
counter = getDbBackupFileCounter("max"); counter = getDbBackupFileCounter("max");
let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql"; let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql";
//make backup file //make backup file
fs.copyFileSync(source, destination); fs.copyFileSync(source, destination);
//fs.renameSync(p, p + "." + counter); //fs.renameSync(p, p + "." + counter);
//clear tbdata.nosql //clear tbdata.nosql
fs.writeFileSync(source, ""); fs.writeFileSync(source, "");
fs.truncateSync(source, 0); fs.truncateSync(source, 0);
} }
} }
const processDataFromDatabase = async () => { const processDataFromDatabase = async () => {
if (restore_from_backup <= 0) return; if (restore_from_backup <= 0) return;
//calculate diff //calculate diff
const now = new Date(); const now = new Date();
let currentTime = now.getTime(); let currentTime = now.getTime();
let diff = currentTime - lastRestoreTime; let diff = currentTime - lastRestoreTime;
if ((diff / 1000) < restore_backup_wait) { if ((diff / 1000) < restore_backup_wait) {
//console.log("*********restore_backup_wait", diff, restore_backup_wait); //console.log("*********restore_backup_wait", diff, restore_backup_wait);
return; return;
} }
processingData = true; processingData = true;
//get filename to process //get filename to process
let counter = getDbBackupFileCounter("min"); let counter = getDbBackupFileCounter("min");
//we have some backup files //we have some backup files
let dataBase = 'tbdata'; let dataBase = 'tbdata';
var nosql; var nosql;
if (counter == 0) dataBase = 'tbdata'; if (counter == 0) dataBase = 'tbdata';
else dataBase = counter + "." + 'tbdata'; else dataBase = counter + "." + 'tbdata';
nosql = NOSQL(dataBase); nosql = NOSQL(dataBase);
//select all data - use limit restore_from_backup //select all data - use limit restore_from_backup
let records = await promisifyBuilder(nosql.find().take(restore_from_backup)); let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
for (let i = 0; i < records.length; i++) { for (let i = 0; i < records.length; i++) {
if (clientReady) { if (clientReady) {
let item = records[i]; let item = records[i];
let id = item.id; let id = item.id;
if (id !== undefined) { if (id !== undefined) {
//console.log("------------processDataFromDatabase - remove", id, dataBase, i); //console.log("------------processDataFromDatabase - remove", id, dataBase, i);
try { try {
let message = JSON.parse(JSON.stringify(item)); let message = JSON.parse(JSON.stringify(item));
delete message.id; delete message.id;
client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 }); client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 });
//remove from database //remove from database
await promisifyBuilder(nosql.remove().where("id", id)); await promisifyBuilder(nosql.remove().where("id", id));
} catch (error) { } catch (error) {
//process error //process error
console.log("processDataFromDatabase", error); console.log("processDataFromDatabase", error);
} }
} }
} }
else { else {
processingData = false; processingData = false;
return; return;
} }
} }
if (records.length > 0) { if (records.length > 0) {
//clean backup file //clean backup file
if (counter > 0) nosql.clean(); if (counter > 0) nosql.clean();
} }
//no data in db, remove //no data in db, remove
if (records.length == 0) { if (records.length == 0) {
if (counter > 0) nosql.drop(); if (counter > 0) nosql.drop();
} }
const d = new Date(); const d = new Date();
lastRestoreTime = d.getTime(); lastRestoreTime = d.getTime();
processingData = false; processingData = false;
} }
instance.on('options', main); instance.on('options', main);
//instance.reconfigure(); //instance.reconfigure();
}; };