From ed0fe5b15d48a948d6ef3256495e56b59900c247 Mon Sep 17 00:00:00 2001 From: rasta5man Date: Wed, 2 Oct 2024 18:21:31 +0200 Subject: [PATCH] ReportOfflineNodeStatus-add setTimeout, Handle broadcast cmd to rsPort --- flow/cmd_manager.js | 55 ++++++------ flow/dido_controller.js | 2 +- flow/helper/DataToTbHandler.js | 6 -- flow/helper/serialport_helper.js | 139 ++++++++++++++++--------------- 4 files changed, 101 insertions(+), 101 deletions(-) diff --git a/flow/cmd_manager.js b/flow/cmd_manager.js index 43ffbcf..63c1a1f 100644 --- a/flow/cmd_manager.js +++ b/flow/cmd_manager.js @@ -8,7 +8,6 @@ exports.output = ['red', 'blue', 'yellow', 'blue', 'white']; //blue - send message to relays exports.input = 2; -exports.author = 'Daniel Segeš'; exports.icon = 'cloud-upload'; //exports.npm = ['serialport' , 'child_process']; @@ -140,7 +139,6 @@ let nodesData = {};//key is node, value data from db //helper container for counting resolved group of commands (commands related to set profile) let cmdCounter = {};//key is node, value is counter -let cmdNOKNodeCounter = {};//key is node, value is counter //END OF VARIABLE SETTINGS //-------------------------------- @@ -275,16 +273,14 @@ function processNodeProfile(node) return; } - logger.debug("processNodeProfile: start - set profile for ", node, profile); - let nodeProfile = nodeObj.profile; + logger.debug("processNodeProfile: start - set profile for ", node, nodeProfile); if(nodeProfile) { try { nodeProfile = JSON.parse(nodeProfile); - if(Object.keys(nodeProfile).length === 0) throw ("profile is not defined"); } catch (error) { - logger.debug("Error parsing node profile", error); + logger.debug("Cmd_manager - Error parsing node profile", error); } } @@ -894,29 +890,33 @@ exports.install = function(instance) { values["current"] = 0;//prúd values["status"] = "OFFLINE"; - for (let k in nodesData) { + // it happens, that some data did not get to tb after sending + // we setTimeout to make more time for db to process telemetry (eg 150 messages at once) + Object.keys(nodesData).forEach((node, index) => { + + setTimeout(function() { - //potrebujem nody k danej linii - if(line == nodesData[k].line || line == undefined) - { - let tbname = nodesData[k].tbname; - - //logger.debug("node:", tbname); - - let dataToTb = { - [tbname]: [ - { - "ts": Date.now(), - "values": values + //potrebujem nody k danej linii + if(line == nodesData[node].line || line == undefined) + { + let tbname = nodesData[node].tbname; + + let dataToTb = { + [tbname]: [ + { + "ts": Date.now(), + "values": values + } + ] } - ] - } - //instance.send(SEND_TO.tb, dataToTb); - tbHandler.sendToTb(dataToTb, instance); - } - } + //instance.send(SEND_TO.tb, dataToTb); + tbHandler.sendToTb(dataToTb, instance); + } + },(index+1) * 300); + }) + } @@ -1784,7 +1784,6 @@ exports.install = function(instance) { //toto sa reportuje po prijati dat z dido_controlera if(disconnected) { - let values = {"status": "OFFLINE"}; logger.debug("disconnected", values); @@ -1941,7 +1940,7 @@ exports.install = function(instance) { let resp = com_generic(nodeAddress, params.recipient, params.rw, register, params.name, params.byte1, params.byte2, params.byte3, params.byte4); let readBytes = 11; - let timeout = 5000; + let timeout = 4000; // await keyword is important, otherwise incorrect data is returned! await writeData(rsPort, resp, readBytes, timeout).then(function (data) { @@ -2170,6 +2169,7 @@ exports.install = function(instance) { } + /** * function handles requests from terminal * responseType can be "SUCCESS", "ERROR" or "FAILURE", depending on rsPort data. @@ -2401,7 +2401,6 @@ exports.install = function(instance) { if(flowdata.data.hasOwnProperty("cmd")) { let cmd = flowdata.data.cmd; - if(cmd == "buildTasks") { diff --git a/flow/dido_controller.js b/flow/dido_controller.js index 1e8010e..24f3269 100644 --- a/flow/dido_controller.js +++ b/flow/dido_controller.js @@ -235,7 +235,7 @@ exports.install = function(instance) { let responseStatus = await promisifyBuilder(dbStatus.find()); statusData = responseStatus[0]; // {thermometer: 'OK', em: 'OK', twilight_sensor: 'OK'} deviceStatus["temperature"] = statusData.thermometer; - + FLOW.OMS_rvo_tbname = relaysData[0].tbname; if(controller_type === "lm") diff --git a/flow/helper/DataToTbHandler.js b/flow/helper/DataToTbHandler.js index 6f532a3..f60c296 100644 --- a/flow/helper/DataToTbHandler.js +++ b/flow/helper/DataToTbHandler.js @@ -32,12 +32,6 @@ class DataToTbHandler { sendToTb(dataToTb, instance) { - if(!FLOW.OMS_brokerready) - { - instance.send(this.index, dataToTb); - return; - } - let keys = Object.keys(dataToTb); if(keys.length == 0) diff --git a/flow/helper/serialport_helper.js b/flow/helper/serialport_helper.js index a84ab84..98aa235 100644 --- a/flow/helper/serialport_helper.js +++ b/flow/helper/serialport_helper.js @@ -1,94 +1,101 @@ const { exec } = require('child_process'); function openPort(port){ - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { - var callbackError = function(err) { - port.removeListener('error', callbackError); - port.removeListener('open', callbackError); + var callbackError = function(err) { + port.removeListener('error', callbackError); + port.removeListener('open', callbackError); - reject(err.message); - }; + reject(err.message); + }; - var callbackOpen = function(data) { - port.removeListener('error', callbackError); - port.removeListener('open', callbackOpen); + var callbackOpen = function(data) { + port.removeListener('error', callbackError); + port.removeListener('open', callbackOpen); - resolve("port open: ok"); - }; + resolve("port open: ok"); + }; - port.on('error', callbackError); - port.on('open', callbackOpen); + port.on('error', callbackError); + port.on('open', callbackOpen); - port.open(); + port.open(); - }) - } + }) +} - function runSyncExec(command){ - return new Promise((resolve, reject) => { - - exec(command, (error, stdout, stderr) => { - if(error == null) resolve(stdout); - reject(error); - }); - - }) - } +function runSyncExec(command){ + return new Promise((resolve, reject) => { - async function writeData(port, data, readbytes, timeout){ - return new Promise((resolve, reject) => { + exec(command, (error, stdout, stderr) => { + if(error == null) resolve(stdout); + reject(error); + }); - //readbytes = 0 = broadcast - if(readbytes == undefined) readbytes = 0; - if(timeout == undefined) timeout = 10000;//10s, default timeout MASTERA je 3s + }) +} - //cmd-manager mame http route POST / terminal a tomu sa tiez nastavuje timeout!!! - - var callback = function(data) { - rsPortReceivedData.push(...data); - let l = rsPortReceivedData.length; +async function writeData(port, data, readbytes, timeout){ + return new Promise((resolve, reject) => { - if(l >= readbytes) - { - port.removeListener('data', callback); - - clearTimeout(t); - resolve(rsPortReceivedData); - } - }; - - port.removeListener('data', callback); - - let t = setTimeout(() => { - port.removeListener('data', callback); - - console.log("serialport helper: writeData TIMEOUT READING", rsPortReceivedData); - - reject("TIMEOUT READING"); - }, timeout); - - let rsPortReceivedData = []; - - if(readbytes > 0) port.on('data', callback); + // If first item in data array is 255, we just write broadcast command to rsPort + // We wait 3 seconds and resolve([ "b", "r", "o", "a", "d", "c", "a", "s", "t" ]) + // It is important to resolve with array + if(data[0] == 255) { port.write(Buffer.from(data), function(err) { if (err) { port.removeListener('data', callback); reject(err.message); } - - if(readbytes == 0) - { - resolve(rsPortReceivedData); - } - }); - }) + + setTimeout(resolve, 3000, [ "b", "r", "o", "a", "d", "c", "a", "s", "t" ]); + return; + } + + //cmd-manager mame http route POST / terminal a tomu sa tiez nastavuje timeout!!! + + var callback = function(data) { + rsPortReceivedData.push(...data); + let l = rsPortReceivedData.length; + + if(l >= readbytes) + { + port.removeListener('data', callback); + + clearTimeout(t); + resolve(rsPortReceivedData); + } + }; + + port.removeListener('data', callback); + + let t = setTimeout(() => { + port.removeListener('data', callback); + + console.log("serialport helper: writeData TIMEOUT READING", rsPortReceivedData); + + reject("TIMEOUT READING"); + }, timeout); + + let rsPortReceivedData = []; + + port.on('data', callback); + + port.write(Buffer.from(data), function(err) { + if (err) { + port.removeListener('data', callback); + reject(err.message); + } + + }); + }) } module.exports = { openPort, runSyncExec, writeData -} \ No newline at end of file +}