Actual flowserver on Senica rvo

This commit is contained in:
rasta5man 2024-04-13 20:29:31 +02:00
parent 67c503d980
commit 86619fbcff
29 changed files with 5833 additions and 9853 deletions

View file

@ -119,6 +119,7 @@ process.on('uncaughtException', function (err) {
const nosql = NOSQL('tbdata');
const nosqlBackup = NOSQL('/backup/tbdata');
exports.install = function(instance) {
var broker;
@ -186,7 +187,7 @@ exports.install = function(instance) {
let mqtt_clientid = responseSettings[0]["mqtt_clientid"];
let mqtt_username = responseSettings[0]["mqtt_username"];
let mqtt_port = responseSettings[0]["mqtt_port"];
console.log("wsmqttpublich -> loadSettings from db", responseSettings[0]);
opts = {
@ -203,8 +204,7 @@ exports.install = function(instance) {
}
connectToTbServer();
}
}
function connectToTbServer()
{
@ -215,16 +215,18 @@ exports.install = function(instance) {
broker.on('connect', function() {
instance.status("Connected", "green");
brokerready = true;
monitor.info("MQTT broker connected");
brokerready = true;
FLOW.OMS_brokerready = brokerready;
wsmqtt_status = 'connected';
wsmqtt_status = 'connected';
});
broker.on('reconnect', function() {
instance.status("Reconnecting", "yellow");
brokerready = false;
FLOW.OMS_brokerready = brokerready;
FLOW.OMS_brokerready = brokerready;
});
broker.on('message', function(topic, message) {
@ -243,7 +245,6 @@ exports.install = function(instance) {
}
instance.send(instanceSendTo.rpcCall, {"topic":topic, "content":message });
});
broker.on('close', function(err) {
@ -258,11 +259,14 @@ exports.install = function(instance) {
instance.status("Disconnected", "red");
instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
}
broker.reconnect();
});
broker.on('error', function(err) {
instance.status("Err: "+ err.code, "red");
instance.send(instanceSendTo.debug, {"message":"Broker ERROR signal received !", "error":err, "opt":opts });
monitor.info('MQTT broker error', err);
brokerready = false;
FLOW.OMS_brokerready = brokerready;
@ -270,89 +274,66 @@ exports.install = function(instance) {
});
//broker = new Broker(opts);
//MQTT_BROKERS.push(broker);
//instance.status('Ready');
}
//set opts accortding to options
/*
instance.reconfigure = function() {
instance.on('data', function(data) {
var o = instance.options;
opts = {
host: o.host,
port: o.port,
keepalive: 10,
clientId: o.clientid,
username: o.username,
rejectUnauthorized: false,
resubscribe: false
};
//connectToTbServer();
};
*/
instance.on('data', function(data) {
if (brokerready)
if (brokerready)
{
//do we have some data in backup file?
//if any, process data from database
if(saveTelemetryOnError)
{
//do we have some data in backup file?
//if any, process data from database
if(saveTelemetryOnError)
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
}
}
if (brokerready)
{
let stringifiedJson = JSON.stringify(data.data);
broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
//backup telemetry
if(createTelemetryBackup)
{
data.data.id = UID();
nosqlBackup.insert(data.data);
insertBackupNoSqlCounter++;
if(insertBackupNoSqlCounter > 150)
{
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
}
let options = {compress: true};
let path = __dirname + "/../databases/backup/tbdata.nosql";
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
stream.write("");
stream.end();
insertBackupNoSqlCounter = 0;
}
}
}
else
{
if(logger) logger.debug("Broker unavailable. Data not sent !", data.data);
instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
//write to tb
data.data.id = UID();
nosql.insert(data.data);
}
if (brokerready)
{
let stringifiedJson = JSON.stringify(data.data);
broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
}
});
//backup telemetry
if(createTelemetryBackup)
{
data.data.id = UID();
nosqlBackup.insert(data.data);
insertBackupNoSqlCounter++;
if(insertBackupNoSqlCounter > 150)
{
let options = {compress: true};
let path = __dirname + "/../databases/backup/tbdata.nosql";
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
stream.write("");
stream.end();
insertBackupNoSqlCounter = 0;
}
}
}
else
{
if(logger) logger.debug("Broker unavailable. Data not sent !", data.data);
instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
//write to tb
data.data.id = UID();
nosql.insert(data.data);
}
}
});
instance.close = function(done) {
if (brokerready){
@ -449,7 +430,6 @@ exports.install = function(instance) {
fs.truncateSync(source, 0);
}
}
const processDataFromDatabase = async () => {