Add language notifications; Add power door handel

This commit is contained in:
rasta5man 2025-05-31 22:03:23 +02:00
parent d97d90cf95
commit 0876e73c68
12 changed files with 445 additions and 581 deletions

View file

@ -55,7 +55,7 @@ let saveTelemetryOnError = true;//backup_on_failure overrides this value
//------------------------
let rollers;
if(createTelemetryBackup) rollers = require('streamroller');
if (createTelemetryBackup) rollers = require('streamroller');
const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304
let insertNoSqlCounter = 0;
@ -70,14 +70,14 @@ let lastRestoreTime = 0;
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
let sendClientError = true;
process.on('uncaughtException', function (err) {
process.on('uncaughtException', function(err) {
errLogger.error('uncaughtException:', err.message)
errLogger.error(err.stack);
//TODO
//send to service
//process.exit(1);
})
@ -96,22 +96,19 @@ exports.install = function(instance) {
let sendWsStatusVar = null;
let wsmqtt_status = 'disconnected';
function getWsmqttName(host)
{
if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
function getWsmqttName(host) {
if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
}
function sendWsStatus()
{
instance.send(SEND_TO.services, {[wsmqttName]: wsmqtt_status});
function sendWsStatus() {
instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status });
}
function main()
{
if(!FLOW.dbLoaded) return;
function main() {
if (!FLOW.dbLoaded) return;
loadSettings();
clearInterval(sendWsStatus);
@ -119,11 +116,9 @@ exports.install = function(instance) {
}
//set opts according to db settings
function loadSettings()
{
function loadSettings() {
if(instance.options.host !== "")
{
if (instance.options.host !== "") {
//override settings from database
var o = instance.options;
opts = {
@ -139,13 +134,12 @@ exports.install = function(instance) {
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
}
else
{
else {
const SETTINGS = FLOW.GLOBALS.settings;
backup_on_failure = SETTINGS.backup_on_failure;
saveTelemetryOnError = backup_on_failure;
restore_from_backup = SETTINGS.restore_from_backup;
restore_backup_wait = SETTINGS.restore_backup_wait;
@ -170,8 +164,7 @@ exports.install = function(instance) {
connectToTbServer();
}
function connectToTbServer()
{
function connectToTbServer() {
var url = "mqtt://" + opts.host + ":" + opts.port;
console.log("MQTT URL: ", url);
@ -179,7 +172,7 @@ exports.install = function(instance) {
client.on('connect', function() {
instance.status("Connected", "green");
monitor.info("MQTT client connected");
//monitor.info("MQTT client connected");
sendClientError = true;
clientReady = true;
@ -198,31 +191,31 @@ exports.install = function(instance) {
TRY(function() {
message = JSON.parse(message);
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
}
}, () => instance.debug('MQTT: Error parsing data', message));
}
instance.send(SEND_TO.rpcCall, {"topic":topic, "content":message });
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message });
});
client.on('close', function() {
clientReady = false;
wsmqtt_status = 'disconnected';
instance.status("Disconnected", "red");
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !"});
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
});
client.on('error', function(err) {
instance.status("Err: "+ err.code, "red");
instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
if(sendClientError) {
monitor.info('MQTT client error', err);
sendClientError = false;
instance.status("Err: " + err.code, "red");
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
if (sendClientError) {
monitor.info('MQTT client error', err);
sendClientError = false;
}
clientReady = false;
wsmqtt_status = 'disconnected';
@ -238,45 +231,39 @@ exports.install = function(instance) {
instance.on('1', function(data) {
if(clientReady)
{
//do we have some data in backup file? if any, process data from database
if(saveTelemetryOnError)
{
if (clientReady) {
//do we have some data in backup file? if any, process data from database
if (saveTelemetryOnError) {
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
if (!processingData) processDataFromDatabase();
}
let stringifiedJson = JSON.stringify(data.data);
client.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 });
//backup telemetry
if(createTelemetryBackup)
{
if (createTelemetryBackup) {
data.data.id = UID();
nosqlBackup.insert(data.data);
insertBackupNoSqlCounter++;
if(insertBackupNoSqlCounter > 150)
{
let options = {compress: true};
if (insertBackupNoSqlCounter > 150) {
let options = { compress: true };
let path = __dirname + "/../databases/backup/tbdata.nosql";
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
stream.write("");
stream.end();
insertBackupNoSqlCounter = 0;
}
}
}
}
else
{
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
}
else {
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
if (saveTelemetryOnError) {
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
@ -289,72 +276,63 @@ exports.install = function(instance) {
instance.close = function(done) {
if(clientReady){
if (clientReady) {
client.end();
clearInterval(sendWsStatusVar);
}
};
function getDbBackupFileCounter(type)
{
function getDbBackupFileCounter(type) {
var files = fs.readdirSync(__dirname + "/../databases");
let counter = 0;
for(var i = 0; i < files.length; i++)
{
for (var i = 0; i < files.length; i++) {
if(files[i] == "tbdata.nosql") continue;
if (files[i] == "tbdata.nosql") continue;
if(files[i].endsWith(".nosql"))
{
if (files[i].endsWith(".nosql")) {
let pos = files[i].indexOf(".");
if(pos > -1)
{
if (pos > -1) {
let fileCounter = counter;
let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit);
if(isNaN(fileCounter)) fileCounter = 0;
if (isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if(type == "max")
{
if(fileCounter > counter)
{
if (type == "max") {
if (fileCounter > counter) {
counter = fileCounter;
}
}
else if(type == "min")
{
if(counter == 0) counter = fileCounter;
else if (type == "min") {
if (counter == 0) counter = fileCounter;
if(fileCounter < counter)
{
if (fileCounter < counter) {
counter = fileCounter;
}
}
}
}
}
if(type == "max") counter++;
if (type == "max") counter++;
return counter;
}
const makeBackupFromDbFile = async () => {
if(!saveTelemetryOnError) return;
if (!saveTelemetryOnError) return;
//to avoid large file: tbdata.nosql
//init value is 0!
if(insertNoSqlCounter > 0)
{
if (insertNoSqlCounter > 0) {
--insertNoSqlCounter;
return;
}
@ -366,8 +344,7 @@ exports.install = function(instance) {
var stats = fs.statSync(source);
var fileSizeInBytes = stats.size;
if(fileSizeInBytes > noSqlFileSizeLimit)
{
if (fileSizeInBytes > noSqlFileSizeLimit) {
let counter = 1;
counter = getDbBackupFileCounter("max");
@ -381,21 +358,20 @@ exports.install = function(instance) {
//clear tbdata.nosql
fs.writeFileSync(source, "");
fs.truncateSync(source, 0);
}
}
const processDataFromDatabase = async () => {
if(restore_from_backup <= 0) return;
if (restore_from_backup <= 0) return;
//calculate diff
const now = new Date();
let currentTime = now.getTime();
let diff = currentTime - lastRestoreTime;
if( (diff / 1000) < restore_backup_wait)
{
if ((diff / 1000) < restore_backup_wait) {
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
return;
}
@ -409,60 +385,55 @@ exports.install = function(instance) {
let dataBase = 'tbdata';
var nosql;
if(counter == 0) dataBase = 'tbdata';
if (counter == 0) dataBase = 'tbdata';
else dataBase = counter + "." + 'tbdata';
nosql = NOSQL(dataBase);
//select all data - use limit restore_from_backup
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
for(let i = 0; i < records.length; i++)
{
if(clientReady) {
for (let i = 0; i < records.length; i++) {
if (clientReady) {
let item = records[i];
let id = item.id;
if(id !== undefined)
{
if (id !== undefined) {
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
try {
let message = JSON.parse(JSON.stringify(item));
delete message.id;
client.publish("v1/gateway/telemetry", JSON.stringify(message), {qos:1});
client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 });
//remove from database
await promisifyBuilder(nosql.remove().where("id", id));
} catch(error) {
} catch (error) {
//process error
console.log("processDataFromDatabase", error);
}
}
}
else
{
else {
processingData = false;
return;
}
}
if(records.length > 0)
{
if (records.length > 0) {
//clean backup file
if(counter > 0) nosql.clean();
if (counter > 0) nosql.clean();
}
//no data in db, remove
if(records.length == 0)
{
if(counter > 0) nosql.drop();
if (records.length == 0) {
if (counter > 0) nosql.drop();
}
const d = new Date();