-
-
@(Name of this server)
-
@(Slack channel to receive the alerts)
@@ -170,11 +167,12 @@ exports.install = function(instance) {
FLOW["savedSlackMessages"] = [];
}
+ instance.options.name = FLOW.GLOBALS.settings.rvo_name;
if (instance.options.name) {
instance.status('Running');
running = true;
} else {
- instance.status('Please enter name', 'red');
+ instance.status('Please run options again', 'red');
running = false;
}
} catch (e) {
@@ -183,5 +181,7 @@ exports.install = function(instance) {
};
instance.on('options', instance.reconfigure);
- instance.reconfigure();
-};
\ No newline at end of file
+ setTimeout(instance.reconfigure, 10000);
+
+
+};
diff --git a/flow/thermometer.js b/flow/thermometer.js
index b5ee420..b3bb350 100644
--- a/flow/thermometer.js
+++ b/flow/thermometer.js
@@ -2,6 +2,7 @@ exports.id = 'thermometer';
exports.title = 'Thermometer';
exports.group = 'Worksys';
exports.color = '#5CB36D';
+exports.input = 1;
exports.version = '1.0.3';
exports.output = ["red", "white", "blue"];
exports.author = 'Rastislav Kovac';
@@ -9,7 +10,9 @@ exports.icon = 'thermometer-three-quarters';
exports.readme = `# Getting temperature values for RVO. In case of LM, you need device address. In case of unipi, evok sends values, in case thermometer is installed`;
-const instanceSendTo = {
+const { errLogger, logger, monitor } = require('./helper/logger');
+
+const SEND_TO = {
debug: 0,
tb: 1,
dido_controller: 2
@@ -18,56 +21,16 @@ const instanceSendTo = {
//read temperature - frequency
let timeoutMin = 5;//minutes
-var path = require('path');
-var log4js = require("log4js");
-
-log4js.configure({
- appenders: {
- errLogs: { type: 'file', filename: path.join(__dirname + "/../", 'err.txt') },
- monitorLogs: { type: 'file', compress:true, daysToKeep: 2, maxLogSize: 1048576, backups: 1, keepFileExt: true, filename: path.join(__dirname + "/../", 'monitor.txt') },
- console: { type: 'console' }
- },
- categories: {
- errLogs: { appenders: ['console', 'errLogs'], level: 'error' },
- monitorLogs: { appenders: ['console', 'monitorLogs'], level: 'trace' },
- //another: { appenders: ['console'], level: 'trace' },
- default: { appenders: ['console'], level: 'trace' }
- }
-});
-
-const errLogger = log4js.getLogger("errLogs");
-const logger = log4js.getLogger();
-const monitor = log4js.getLogger("monitorLogs");
-
-//logger.debug("text")
-//monitor.info('info');
-//errLogger.error("some error");
-
-const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper');
-const dbSettings = TABLE("settings");
-let temperatureAddress = "";
-
-async function loadSettings()
-{
- //todo global FLOW.OMS_edgeName is making problem, so we load it here as well, it should not be
- let responseSettings = await promisifyBuilder(dbSettings.find());
- temperatureAddress = responseSettings[0]["temperature_adress"];
-}
-
-loadSettings();
-
exports.install = function(instance) {
const { exec } = require('child_process');
- const { sendNotification, ERRWEIGHT } = require('./helper/notification_reporter');
+ const { sendNotification } = require('./helper/notification_reporter');
let startRead;
- let dataToTb;
let counter = 0;
-
- let edgeName = "";
-
+ let rvoTbName = "";
+ let temperatureAddress = "";
logger.debug(exports.title, "installed");
@@ -76,11 +39,11 @@ exports.install = function(instance) {
})
- const start = function() {
+ const main = function() {
try {
- if(FLOW.OMS_controller_type === "unipi")
+ if(FLOW.GLOBALS.settings.controller_type === "unipi")
{
clearInterval(startRead);
return;
@@ -88,71 +51,23 @@ exports.install = function(instance) {
if(temperatureAddress === "") throw "gettemperature: temperatureAddress is not defined";
- logger.debug("FLOW.OMS_temperature_adress", FLOW.OMS_temperature_adress);
-
exec(`owread -C ${temperatureAddress}/temperature`, (error, stdout, stderr) => {
- edgeName = FLOW.OMS_edgeName;
-
- if(edgeName !== "")
+ if(!error)
{
- if(error)
- {
-
- if(FLOW.OMS_brokerready == undefined)
- {
- logger.debug("gettemparature - FLOW.OMS_brokerready is undefined");
-
- setTimeout(function(){
- start();
- }, 3000);
-
- return;
- }
-
- if(FLOW.OMS_brokerready)
- {
- //sendNotification("start", edgeName, ERRWEIGHT.WARNING, "Thermometer is not responding", {"Error": error}, instanceSendTo.tb, instance, "thermometer");
- sendNotification("start", edgeName, "thermometer_is_not_responding", {}, {"Error": error}, instanceSendTo.tb, instance, "thermometer");
- }
-
- let status = "NOK";
- dataToTb = {
- [edgeName]: [
- {
- "ts": Date.now(),
- "values": {
- "status": status
- }
- }
- ]
- }
-
- monitor.info("Thermometer is not responding", error, FLOW.OMS_brokerready);
-
- // instance.send(instanceSendTo.tb, dataToTb); // poslat stav nok do tb, ak to handluje dido_controller ??
- instance.send(instanceSendTo.dido_controller, {status: "NOK-thermometer"});
- }
- else parseData(stdout);
- }
- else
- {
- monitor.info("gettemperature: edgeName is not defined", FLOW.OMS_edgeName);
-
- setTimeout(function(){
- start();
- }, 3000);
-
+ parseData(stdout)
return;
}
-
- //instance.send({"Temp":stdout,"stderr":stderr,"err":error});
+ sendNotification("main", rvoTbName, "thermometer_is_not_responding", {}, {"Error": error}, SEND_TO.tb, instance, "thermometer");
+ monitor.info("Thermometer is not responding", error);
+ instance.send(SEND_TO.dido_controller, {status: "NOK-thermometer"});
});
}
catch(err) {
errLogger.error(exports.title, err);
+ clearInterval(startRead);
}
}
@@ -166,30 +81,17 @@ exports.install = function(instance) {
if(counter > 290)
{
- instance.send(instanceSendTo.debug, "[Get temperature component] - temperature data are comming again from RVO after more than 1 day break");
-
- //sendNotification("parseData", edgeName, ERRWEIGHT.NOTICE, "Thermometer is working again", "", instanceSendTo.tb, instance, "thermometer");
- if(FLOW.OMS_brokerready) sendNotification("parseData", edgeName, "thermometer_is_responding_again", {}, "", instanceSendTo.tb, instance, "thermometer");
+ instance.send(SEND_TO.debug, "[Get temperature component] - temperature data are comming again from RVO after more than 1 day break");
+ sendNotification("parseData", rvoTbName, "thermometer_is_responding_again", {}, "", SEND_TO.tb, instance, "thermometer");
}
logger.debug("gettemperature", data);
+
const values = {
"temperature": Number(data.toFixed(2)),
- "status": "OK"
}
- dataToTb = {
- [edgeName]: [
- {
- "ts": Date.now(),
- "values":values
- }
- ]
- }
-
- instance.send(instanceSendTo.tb, dataToTb);
- instance.send(instanceSendTo.dido_controller, values);
-
+ instance.send(SEND_TO.dido_controller, {values: values});
counter = 0;
} else {
@@ -200,24 +102,21 @@ exports.install = function(instance) {
//ked je problem 1 den
let day = 24 * 60 / timeoutMin;
if ( counter > day && counter < day + 2 ) {
- //sendNotification("parseData", edgeName, ERRWEIGHT.WARNING, "Thermometer receives invalid data", "", instanceSendTo.tb, instance, "thermometer");
- sendNotification("parseData", edgeName, "thermometer_sends_invalid_data", {}, "", instanceSendTo.tb, instance, "thermometer");
+ //sendNotification("parseData", rvoTbName, ERRWEIGHT.WARNING, "Thermometer receives invalid data", "", SEND_TO.tb, instance, "thermometer");
+ sendNotification("parseData", rvoTbName, "thermometer_sends_invalid_data", {}, "", SEND_TO.tb, instance, "thermometer");
- instance.send(instanceSendTo.debug, "[Get temperature component] - no temperature data from RVO for more than 1 day");
- instance.send(instanceSendTo.dido_controller, {status: "NOK-thermometer"});
+ instance.send(SEND_TO.debug, "[Get temperature component] - no temperature data from RVO for more than 1 day");
+ instance.send(SEND_TO.dido_controller, {status: "NOK-thermometer"});
}
}
}
- setTimeout(function(){
- start();
- }, 10000);
-
- startRead = setInterval(start, timeoutMin * 1000 * 60);
-
- //testing
- //setInterval(() => {instance.send(instanceSendTo.dido_controller, {status: "NOK-thermometer"})}, 180000);
-
+ instance.on("data", _ => {
+ temperatureAddress = FLOW.GLOBALS.settings.temperature_address;
+ rvoTbName = FLOW.GLOBALS.settings.rvoTbName;
+ startRead = setInterval(main, timeoutMin * 1000 * 60);
+ main();
+ })
};
\ No newline at end of file
diff --git a/flow/wsmqttpublish.js b/flow/wsmqttpublish.js
index 20f64fa..e11d9eb 100644
--- a/flow/wsmqttpublish.js
+++ b/flow/wsmqttpublish.js
@@ -4,30 +4,27 @@ exports.group = 'MQTT';
exports.color = '#888600';
exports.version = '1.0.2';
exports.icon = 'sign-out';
-exports.input = 1;
-exports.output = ["red", "white", "blue"];
-exports.author = 'Daniel Segeš';
+exports.input = 2;
+exports.output = 4;
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
-exports.npm = ['mqtt'];
-
exports.html = `
-
-
-
Hostname or IP address (if not empty - setting will override db setting)
-
-
-
-
+
+
+
Hostname or IP address (if not empty - setting will override db setting)
+
+
+
+
`;
@@ -41,21 +38,22 @@ Added:
- rpc response
`;
-const instanceSendTo = {
+const { promisifyBuilder } = require('./helper/db_helper');
+const { errLogger, monitor } = require('./helper/logger');
+const fs = require('fs');
+const mqtt = require('mqtt');
+
+const SEND_TO = {
debug: 0,
rpcCall: 1,
services: 2
}
-const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
-
//CONFIG
-let useLog4js = true;
let createTelemetryBackup = true;
let saveTelemetryOnError = true;//backup_on_failure overrides this value
//------------------------
-var fs = require('fs');
let rollers;
if(createTelemetryBackup) rollers = require('streamroller');
@@ -64,56 +62,18 @@ let insertNoSqlCounter = 0;
let insertBackupNoSqlCounter = 0;
let processingData = false;
-let backup_on_failure = false;//== saveTelemetryOnError - create backup broker send failure
+let backup_on_failure = false;//== saveTelemetryOnError - create backup client send failure
let restore_from_backup = 0; //how many rows process at once?
let restore_backup_wait = 0;//wait seconds
let lastRestoreTime = 0;
-let errLogger;
-let logger;
-let monitor;
-
-//TODO brokerready and sendBrokerError seems to be the same. Moreover, we use FLOW_OMS_brokerready variable!!
-//
-// if there is an error in broker connection, flow logs to monitor.txt. Not to log messages every second, we use sendBrokerError variable
-let sendBrokerError = true;
-
-if(useLog4js)
-{
- var path = require('path');
- var log4js = require("log4js");
-
- log4js.configure({
- appenders: {
- errLogs: { type: 'file', filename: path.join(__dirname + "/../", 'err.txt') },
- monitorLogs: { type: 'file', compress:true, daysToKeep: 2, maxLogSize: 1048576, backups: 1, keepFileExt: true, filename: path.join(__dirname + "/../", 'monitor.txt') },
- console: { type: 'console' }
- },
- categories: {
- errLogs: { appenders: ['console', 'errLogs'], level: 'error' },
- monitorLogs: { appenders: ['console', 'monitorLogs'], level: 'trace' },
- //another: { appenders: ['console'], level: 'trace' },
- default: { appenders: ['console'], level: 'trace' }
- }
- });
-
- errLogger = log4js.getLogger("errLogs");
- logger = log4js.getLogger();
- monitor = log4js.getLogger("monitorLogs");
-
- //USAGE
- //logger.debug("text");
- //monitor.info('info');
- //errLogger.error("some error");
-}
+// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
+let sendClientError = true;
process.on('uncaughtException', function (err) {
- if(errLogger)
- {
- errLogger.error('uncaughtException:', err.message)
- errLogger.error(err.stack);
- }
+ errLogger.error('uncaughtException:', err.message)
+ errLogger.error(err.stack);
//TODO
//send to service
@@ -127,13 +87,9 @@ const nosqlBackup = NOSQL('/backup/tbdata');
exports.install = function(instance) {
- var broker;
+ var client;
var opts;
- var brokerready = false;
-
- instance.on('options', loadSettings);
-
- mqtt = require('mqtt');
+ var clientReady = false;
// wsmqtt status for notification purposes on projects.worksys.io database
let wsmqttName = null;
@@ -142,21 +98,28 @@ exports.install = function(instance) {
function getWsmqttName(host)
{
- if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
- else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
- else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
+ if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
+ else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
+ else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
}
function sendWsStatus()
{
- instance.send(instanceSendTo.services, {[wsmqttName]: wsmqtt_status});
+ instance.send(SEND_TO.services, {[wsmqttName]: wsmqtt_status});
}
- sendWsStatusVar = setInterval(sendWsStatus, 180000);
+ function main()
+ {
+ if(!FLOW.dbLoaded) return;
+
+ loadSettings();
+ clearInterval(sendWsStatus);
+ sendWsStatusVar = setInterval(sendWsStatus, 180000);
+ }
//set opts according to db settings
- async function loadSettings()
+ function loadSettings()
{
if(instance.options.host !== "")
@@ -179,21 +142,17 @@ exports.install = function(instance) {
else
{
- const dbSettings = TABLE("settings");
- let responseSettings = await promisifyBuilder(dbSettings.find());
-
- backup_on_failure = responseSettings[0]["backup_on_failure"];
+ const SETTINGS = FLOW.GLOBALS.settings;
+ backup_on_failure = SETTINGS.backup_on_failure;
saveTelemetryOnError = backup_on_failure;
- restore_from_backup = responseSettings[0]["restore_from_backup"];
- restore_backup_wait = responseSettings[0]["restore_backup_wait"];
+ restore_from_backup = SETTINGS.restore_from_backup;
+ restore_backup_wait = SETTINGS.restore_backup_wait;
- let mqtt_host = responseSettings[0]["mqtt_host"];
- let mqtt_clientid = responseSettings[0]["mqtt_clientid"];
- let mqtt_username = responseSettings[0]["mqtt_username"];
- let mqtt_port = responseSettings[0]["mqtt_port"];
-
- console.log("wsmqttpublich -> loadSettings from db", responseSettings[0]);
+ let mqtt_host = SETTINGS.mqtt_host;
+ let mqtt_clientid = SETTINGS.mqtt_clientid;
+ let mqtt_username = SETTINGS.mqtt_username;
+ let mqtt_port = SETTINGS.mqtt_port;
opts = {
host: mqtt_host,
@@ -216,27 +175,23 @@ exports.install = function(instance) {
var url = "mqtt://" + opts.host + ":" + opts.port;
console.log("MQTT URL: ", url);
- broker = mqtt.connect(url, opts);
+ client = mqtt.connect(url, opts);
- broker.on('connect', function() {
+ client.on('connect', function() {
instance.status("Connected", "green");
- monitor.info("MQTT broker connected");
+ monitor.info("MQTT client connected");
- sendBrokerError = true;
-
- brokerready = true;
- FLOW.OMS_brokerready = brokerready;
+ sendClientError = true;
+ clientReady = true;
wsmqtt_status = 'connected';
});
- broker.on('reconnect', function() {
+ client.on('reconnect', function() {
instance.status("Reconnecting", "yellow");
- brokerready = false;
-
- FLOW.OMS_brokerready = brokerready;
+ clientReady = false;
});
- broker.on('message', function(topic, message) {
+ client.on('message', function(topic, message) {
// message is type of buffer
message = message.toString();
if (message[0] === '{') {
@@ -244,50 +199,53 @@ exports.install = function(instance) {
message = JSON.parse(message);
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
- broker.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
- instance.send(instanceSendTo.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
+ client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
+ instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
}
}, () => instance.debug('MQTT: Error parsing data', message));
}
- instance.send(instanceSendTo.rpcCall, {"topic":topic, "content":message });
+ instance.send(SEND_TO.rpcCall, {"topic":topic, "content":message });
});
- broker.on('close', function(err) {
- brokerready = false;
- FLOW.OMS_brokerready = brokerready;
+ client.on('close', function(err) {
+ clientReady = false;
wsmqtt_status = 'disconnected';
if (err && err.toString().indexOf('Error')) {
instance.status("Err: "+err.code, "red");
- instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
+ instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts });
} else {
instance.status("Disconnected", "red");
- instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
+ instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts });
}
- broker.reconnect();
+ client.reconnect();
});
- broker.on('error', function(err) {
+ client.on('error', function(err) {
instance.status("Err: "+ err.code, "red");
- instance.send(instanceSendTo.debug, {"message":"Broker ERROR signal received !", "error":err, "opt":opts });
- if(sendBrokerError) {
- monitor.info('MQTT broker error', err);
- sendBrokerError = false;
+ instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
+ if(sendClientError) {
+ monitor.info('MQTT client error', err);
+ sendClientError = false;
}
- brokerready = false;
- FLOW.OMS_brokerready = brokerready;
+ clientReady = false;
wsmqtt_status = 'disconnected';
-
});
}
- instance.on('data', function(data) {
- if (brokerready)
+ instance.on("0", _ => {
+ main();
+ })
+
+
+ instance.on('1', function(data) {
+
+ if(clientReady)
{
//do we have some data in backup file?
//if any, process data from database
@@ -296,13 +254,14 @@ exports.install = function(instance) {
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
}
-
}
- if (brokerready)
+ if(clientReady)
{
let stringifiedJson = JSON.stringify(data.data);
- broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
+ client.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
+
+ instance.send(3, stringifiedJson);
//backup telemetry
if(createTelemetryBackup)
@@ -327,8 +286,8 @@ exports.install = function(instance) {
else
{
- if(logger) logger.debug("Broker unavailable. Data not sent !", JSON.stringify(data.data));
- instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data });
+ //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
+ instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
@@ -344,9 +303,9 @@ exports.install = function(instance) {
});
- instance.close = function(done) {
- if (brokerready){
- broker.end();
+ instance.close = function(done) {
+ if(clientReady){
+ client.end();
clearInterval(sendWsStatusVar);
}
};
@@ -373,7 +332,7 @@ exports.install = function(instance) {
let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit);
- if (isNaN(fileCounter)) fileCounter = 0;
+ if(isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if(type == "max")
@@ -443,10 +402,7 @@ exports.install = function(instance) {
const processDataFromDatabase = async () => {
- if(restore_from_backup <= 0)
- {
- return;
- }
+ if(restore_from_backup <= 0) return;
//calculate diff
const now = new Date();
@@ -478,7 +434,7 @@ exports.install = function(instance) {
for(let i = 0; i < records.length; i++)
{
- if (brokerready) {
+ if(clientReady) {
let item = records[i];
let id = item.id;
@@ -487,18 +443,19 @@ exports.install = function(instance) {
{
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
- try{
+ try {
let o = JSON.parse(JSON.stringify(item));
delete o.id;
let message = JSON.stringify(o);
- broker.publish("v1/gateway/telemetry", message, {qos:1});
+ client.publish("v1/gateway/telemetry", message, {qos:1});
+ instance.send(3, message);
//remove from database
await promisifyBuilder(nosql.remove().where("id", id));
- } catch (error) {
+ } catch(error) {
//process error
console.log("processDataFromDatabase", error);
}
@@ -533,8 +490,6 @@ exports.install = function(instance) {
}
- loadSettings();
-
- //instance.on('options', instance.reconfigure);
- //instance.reconfigure();
+ instance.on('options', main);
+ //instance.reconfigure();
};