Initial commit - testing version
This commit is contained in:
parent
cf16481324
commit
06b289d7a3
11 changed files with 6636 additions and 3819 deletions
|
|
@ -9,10 +9,10 @@ exports.output = 2;
|
||||||
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
|
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
|
||||||
|
|
||||||
exports.html = `<div class="padding">
|
exports.html = `<div class="padding">
|
||||||
<div class="row">
|
<div class="row">
|
||||||
<div class="col-md-6">
|
<div class="col-md-6">
|
||||||
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
||||||
</div>
|
</div>
|
||||||
<div class="col-md-6">
|
<div class="col-md-6">
|
||||||
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
@ -27,16 +27,14 @@ exports.html = `<div class="padding">
|
||||||
</div>
|
</div>
|
||||||
</div>`;
|
</div>`;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
const { promisifyBuilder } = require('./helper/db_helper');
|
const { promisifyBuilder } = require('./helper/db_helper');
|
||||||
const fs = require('fs');
|
const fs = require('fs');
|
||||||
const mqtt = require('mqtt');
|
const mqtt = require('mqtt');
|
||||||
const nosql = NOSQL('tbdatacloud');
|
const nosql = NOSQL('tbdatacloud');
|
||||||
|
|
||||||
const SEND_TO = {
|
const SEND_TO = {
|
||||||
debug: 0,
|
debug: 0,
|
||||||
rpcCall: 1,
|
rpcCall: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
//CONFIG
|
//CONFIG
|
||||||
|
|
@ -56,319 +54,295 @@ let lastRestoreTime = 0;
|
||||||
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
|
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
|
||||||
let sendClientError = true;
|
let sendClientError = true;
|
||||||
|
|
||||||
|
|
||||||
exports.install = function(instance) {
|
exports.install = function(instance) {
|
||||||
|
|
||||||
var client;
|
var client;
|
||||||
var opts;
|
var opts;
|
||||||
var clientReady = false;
|
var clientReady = false;
|
||||||
|
|
||||||
let o = null; //options
|
let o = null; //options
|
||||||
|
|
||||||
function main()
|
function main() {
|
||||||
{
|
loadSettings();
|
||||||
loadSettings();
|
}
|
||||||
}
|
|
||||||
|
//set opts according to db settings
|
||||||
//set opts according to db settings
|
function loadSettings() {
|
||||||
function loadSettings()
|
|
||||||
{
|
o = instance.options;
|
||||||
|
if (!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic;
|
||||||
o = instance.options;
|
|
||||||
if(!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic;
|
opts = {
|
||||||
|
host: o.host,
|
||||||
opts = {
|
port: o.port,
|
||||||
host: o.host,
|
clientId: o.clientid,
|
||||||
port: o.port,
|
username: o.username,
|
||||||
clientId: o.clientid,
|
rejectUnauthorized: false,
|
||||||
username: o.username,
|
resubscribe: false
|
||||||
rejectUnauthorized: false,
|
};
|
||||||
resubscribe: false
|
|
||||||
};
|
console.log("wsmqttpublich -> loadSettings from instance.options", o);
|
||||||
|
|
||||||
console.log("wsmqttpublich -> loadSettings from instance.options",o);
|
connectToTbServer();
|
||||||
|
}
|
||||||
connectToTbServer();
|
|
||||||
}
|
function connectToTbServer() {
|
||||||
|
var url = "mqtt://" + opts.host + ":" + opts.port;
|
||||||
function connectToTbServer()
|
console.log("MQTT URL: ", url);
|
||||||
{
|
|
||||||
var url = "mqtt://" + opts.host + ":" + opts.port;
|
client = mqtt.connect(url, opts);
|
||||||
console.log("MQTT URL: ", url);
|
|
||||||
|
client.on('connect', function() {
|
||||||
client = mqtt.connect(url, opts);
|
client.subscribe(`${o.topic}_backward`, (err) => {
|
||||||
|
if (!err) {
|
||||||
client.on('connect', function() {
|
console.log("MQTT subscribed");
|
||||||
client.subscribe(`${o.topic}_backward`, (err) => {
|
}
|
||||||
if (!err) {
|
});
|
||||||
console.log("MQTT subscribed");
|
instance.status("Connected", "green");
|
||||||
}
|
clientReady = true;
|
||||||
});
|
sendClientError = true;
|
||||||
instance.status("Connected", "green");
|
});
|
||||||
clientReady = true;
|
|
||||||
sendClientError = true;
|
client.on('reconnect', function() {
|
||||||
});
|
instance.status("Reconnecting", "yellow");
|
||||||
|
clientReady = false;
|
||||||
client.on('reconnect', function() {
|
});
|
||||||
instance.status("Reconnecting", "yellow");
|
|
||||||
clientReady = false;
|
client.on('message', function(topic, message) {
|
||||||
});
|
// message is type of buffer
|
||||||
|
message = message.toString();
|
||||||
client.on('message', function(topic, message) {
|
if (message[0] === '{') {
|
||||||
// message is type of buffer
|
|
||||||
message = message.toString();
|
|
||||||
if (message[0] === '{') {
|
try {
|
||||||
TRY(function() {
|
message = JSON.parse(message);
|
||||||
|
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
||||||
message = JSON.parse(message);
|
client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
|
||||||
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
|
||||||
client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
|
}
|
||||||
instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
|
} catch (e) { instance.debug('MQTT: Error parsing data', message) }
|
||||||
}
|
|
||||||
|
instance.send(SEND_TO.rpcCall, { "topic": o.topic, "content": message });
|
||||||
}, () => instance.debug('MQTT: Error parsing data', message));
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
instance.send(SEND_TO.rpcCall, {"topic":o.topic, "content":message });
|
client.on('close', function() {
|
||||||
});
|
clientReady = false;
|
||||||
|
|
||||||
client.on('close', function() {
|
instance.status("Disconnected", "red");
|
||||||
clientReady = false;
|
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
|
||||||
|
});
|
||||||
instance.status("Disconnected", "red");
|
|
||||||
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !"});
|
|
||||||
});
|
|
||||||
|
|
||||||
client.on('error', function(err) {
|
|
||||||
instance.status("Err: "+ err.code, "red");
|
|
||||||
instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
|
|
||||||
if(sendClientError) {
|
|
||||||
console.log('MQTT client error', err);
|
|
||||||
sendClientError = false;
|
|
||||||
}
|
|
||||||
clientReady = false;
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
instance.on('0', function(data) {
|
|
||||||
|
|
||||||
if(clientReady)
|
|
||||||
{
|
|
||||||
//do we have some data in backup file? if any, process data from database
|
|
||||||
if(saveTelemetryOnError)
|
|
||||||
{
|
|
||||||
//read telemetry data and send back to server
|
|
||||||
if(!processingData) processDataFromDatabase();
|
|
||||||
}
|
|
||||||
|
|
||||||
let stringifiedJson = JSON.stringify(data.data)
|
|
||||||
client.publish(`${o.topic}_forward`, stringifiedJson, {qos: 1});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
|
||||||
instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
|
|
||||||
|
|
||||||
if(saveTelemetryOnError)
|
|
||||||
{
|
|
||||||
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
|
||||||
makeBackupFromDbFile();
|
|
||||||
|
|
||||||
//write to tb
|
|
||||||
data.data.id = UID();
|
|
||||||
nosql.insert(data.data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
instance.on("1", _ => {
|
|
||||||
main();
|
|
||||||
})
|
|
||||||
|
|
||||||
instance.close = function(done) {
|
client.on('error', function(err) {
|
||||||
if(clientReady){
|
instance.status("Err: " + err.code, "red");
|
||||||
client.end();
|
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
|
||||||
}
|
if (sendClientError) {
|
||||||
};
|
console.log('MQTT client error', err);
|
||||||
|
sendClientError = false;
|
||||||
|
}
|
||||||
function getDbBackupFileCounter(type)
|
clientReady = false;
|
||||||
{
|
});
|
||||||
var files = fs.readdirSync(__dirname + "/../databases");
|
|
||||||
|
|
||||||
let counter = 0;
|
|
||||||
for(var i = 0; i < files.length; i++)
|
|
||||||
{
|
|
||||||
|
|
||||||
if(files[i] == "tbdatacloud.nosql") continue;
|
|
||||||
|
|
||||||
if(files[i].endsWith(".nosql"))
|
}
|
||||||
{
|
|
||||||
|
|
||||||
let pos = files[i].indexOf(".");
|
|
||||||
if(pos > -1)
|
|
||||||
{
|
|
||||||
|
|
||||||
let fileCounter = counter;
|
instance.on('0', function(data) {
|
||||||
let firstDigit = files[i].slice(0, pos);
|
|
||||||
|
|
||||||
fileCounter = parseInt(firstDigit);
|
if (clientReady) {
|
||||||
if(isNaN(fileCounter)) fileCounter = 0;
|
//do we have some data in backup file? if any, process data from database
|
||||||
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
if (saveTelemetryOnError) {
|
||||||
|
//read telemetry data and send back to server
|
||||||
|
if (!processingData) processDataFromDatabase();
|
||||||
|
}
|
||||||
|
|
||||||
if(type == "max")
|
let stringifiedJson = JSON.stringify(data.data)
|
||||||
{
|
client.publish(`${o.topic}_forward`, stringifiedJson, { qos: 1 });
|
||||||
if(fileCounter > counter)
|
}
|
||||||
{
|
else {
|
||||||
counter = fileCounter;
|
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
||||||
}
|
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
|
||||||
}
|
|
||||||
else if(type == "min")
|
|
||||||
{
|
|
||||||
if(counter == 0) counter = fileCounter;
|
|
||||||
|
|
||||||
if(fileCounter < counter)
|
if (saveTelemetryOnError) {
|
||||||
{
|
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
||||||
counter = fileCounter;
|
makeBackupFromDbFile();
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
//write to tb
|
||||||
|
data.data.id = UID();
|
||||||
|
nosql.insert(data.data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if(type == "max") counter++;
|
instance.on("1", _ => {
|
||||||
|
main();
|
||||||
|
})
|
||||||
|
|
||||||
return counter;
|
instance.close = function(done) {
|
||||||
}
|
if (clientReady) {
|
||||||
|
client.end();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const makeBackupFromDbFile = async () => {
|
|
||||||
|
|
||||||
if(!saveTelemetryOnError) return;
|
function getDbBackupFileCounter(type) {
|
||||||
|
var files = fs.readdirSync(__dirname + "/../databases");
|
||||||
|
|
||||||
//to avoid large file: tbdata.nosql
|
let counter = 0;
|
||||||
|
for (var i = 0; i < files.length; i++) {
|
||||||
|
|
||||||
//init value is 0!
|
if (files[i] == "tbdatacloud.nosql") continue;
|
||||||
if(insertNoSqlCounter > 0)
|
|
||||||
{
|
|
||||||
--insertNoSqlCounter;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
insertNoSqlCounter = 100;
|
if (files[i].endsWith(".nosql")) {
|
||||||
|
|
||||||
let source = __dirname + "/../databases/tbdatacloud.nosql";
|
let pos = files[i].indexOf(".");
|
||||||
|
if (pos > -1) {
|
||||||
|
|
||||||
var stats = fs.statSync(source);
|
let fileCounter = counter;
|
||||||
var fileSizeInBytes = stats.size;
|
let firstDigit = files[i].slice(0, pos);
|
||||||
|
|
||||||
if(fileSizeInBytes > noSqlFileSizeLimit)
|
fileCounter = parseInt(firstDigit);
|
||||||
{
|
if (isNaN(fileCounter)) fileCounter = 0;
|
||||||
|
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
||||||
|
|
||||||
let counter = 1;
|
if (type == "max") {
|
||||||
counter = getDbBackupFileCounter("max");
|
if (fileCounter > counter) {
|
||||||
|
counter = fileCounter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (type == "min") {
|
||||||
|
if (counter == 0) counter = fileCounter;
|
||||||
|
|
||||||
let destination = __dirname + "/../databases/" + counter + "." + "tbdatacloud.nosql";
|
if (fileCounter < counter) {
|
||||||
|
counter = fileCounter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//make backup file
|
}
|
||||||
fs.copyFileSync(source, destination);
|
|
||||||
//fs.renameSync(p, p + "." + counter);
|
|
||||||
|
|
||||||
//clear tbdata.nosql
|
if (type == "max") counter++;
|
||||||
fs.writeFileSync(source, "");
|
|
||||||
fs.truncateSync(source, 0);
|
|
||||||
|
|
||||||
}
|
return counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
const processDataFromDatabase = async () => {
|
const makeBackupFromDbFile = async () => {
|
||||||
|
|
||||||
if(restore_from_backup <= 0) return;
|
if (!saveTelemetryOnError) return;
|
||||||
|
|
||||||
//calculate diff
|
//to avoid large file: tbdata.nosql
|
||||||
const now = new Date();
|
|
||||||
let currentTime = now.getTime();
|
|
||||||
let diff = currentTime - lastRestoreTime;
|
|
||||||
|
|
||||||
if( (diff / 1000) < restore_backup_wait)
|
//init value is 0!
|
||||||
{
|
if (insertNoSqlCounter > 0) {
|
||||||
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
--insertNoSqlCounter;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
processingData = true;
|
insertNoSqlCounter = 100;
|
||||||
|
|
||||||
//get filename to process
|
let source = __dirname + "/../databases/tbdatacloud.nosql";
|
||||||
let counter = getDbBackupFileCounter("min");
|
|
||||||
|
|
||||||
//we have some backup files
|
var stats = fs.statSync(source);
|
||||||
let dataBase = 'tbdata';
|
var fileSizeInBytes = stats.size;
|
||||||
|
|
||||||
var nosql;
|
if (fileSizeInBytes > noSqlFileSizeLimit) {
|
||||||
if(counter == 0) dataBase = 'tbdatacloud';
|
|
||||||
else dataBase = counter + "." + 'tbdatacloud';
|
|
||||||
|
|
||||||
nosql = NOSQL(dataBase);
|
let counter = 1;
|
||||||
|
counter = getDbBackupFileCounter("max");
|
||||||
|
|
||||||
//select all data - use limit restore_from_backup
|
let destination = __dirname + "/../databases/" + counter + "." + "tbdatacloud.nosql";
|
||||||
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
|
||||||
|
|
||||||
for(let i = 0; i < records.length; i++)
|
//make backup file
|
||||||
{
|
fs.copyFileSync(source, destination);
|
||||||
if(clientReady) {
|
//fs.renameSync(p, p + "." + counter);
|
||||||
|
|
||||||
let item = records[i];
|
//clear tbdata.nosql
|
||||||
let id = item.id;
|
fs.writeFileSync(source, "");
|
||||||
|
fs.truncateSync(source, 0);
|
||||||
|
|
||||||
if(id !== undefined)
|
}
|
||||||
{
|
}
|
||||||
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
|
||||||
|
|
||||||
try {
|
const processDataFromDatabase = async () => {
|
||||||
|
|
||||||
let message = JSON.parse(JSON.stringify(item));
|
if (restore_from_backup <= 0) return;
|
||||||
delete message.id;
|
|
||||||
client.publish(`${o.topic}_forward`, JSON.stringify(message), {qos:1});
|
|
||||||
|
|
||||||
//remove from database
|
//calculate diff
|
||||||
await promisifyBuilder(nosql.remove().where("id", id));
|
const now = new Date();
|
||||||
|
let currentTime = now.getTime();
|
||||||
|
let diff = currentTime - lastRestoreTime;
|
||||||
|
|
||||||
} catch(error) {
|
if ((diff / 1000) < restore_backup_wait) {
|
||||||
//process error
|
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
||||||
console.log("processDataFromDatabase", error);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
processingData = true;
|
||||||
|
|
||||||
}
|
//get filename to process
|
||||||
else
|
let counter = getDbBackupFileCounter("min");
|
||||||
{
|
|
||||||
processingData = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(records.length > 0)
|
//we have some backup files
|
||||||
{
|
let dataBase = 'tbdata';
|
||||||
//clean backup file
|
|
||||||
if(counter > 0) nosql.clean();
|
|
||||||
}
|
|
||||||
|
|
||||||
//no data in db, remove
|
var nosql;
|
||||||
if(records.length == 0)
|
if (counter == 0) dataBase = 'tbdatacloud';
|
||||||
{
|
else dataBase = counter + "." + 'tbdatacloud';
|
||||||
if(counter > 0) nosql.drop();
|
|
||||||
}
|
|
||||||
|
|
||||||
const d = new Date();
|
nosql = NOSQL(dataBase);
|
||||||
lastRestoreTime = d.getTime();
|
|
||||||
|
|
||||||
processingData = false;
|
//select all data - use limit restore_from_backup
|
||||||
|
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
||||||
|
|
||||||
}
|
for (let i = 0; i < records.length; i++) {
|
||||||
|
if (clientReady) {
|
||||||
|
|
||||||
instance.on('options', main);
|
let item = records[i];
|
||||||
|
let id = item.id;
|
||||||
|
|
||||||
|
if (id !== undefined) {
|
||||||
|
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
let message = JSON.parse(JSON.stringify(item));
|
||||||
|
delete message.id;
|
||||||
|
client.publish(`${o.topic}_forward`, JSON.stringify(message), { qos: 1 });
|
||||||
|
|
||||||
|
//remove from database
|
||||||
|
await promisifyBuilder(nosql.remove().where("id", id));
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
//process error
|
||||||
|
console.log("processDataFromDatabase", error);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
processingData = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (records.length > 0) {
|
||||||
|
//clean backup file
|
||||||
|
if (counter > 0) nosql.clean();
|
||||||
|
}
|
||||||
|
|
||||||
|
//no data in db, remove
|
||||||
|
if (records.length == 0) {
|
||||||
|
if (counter > 0) nosql.drop();
|
||||||
|
}
|
||||||
|
|
||||||
|
const d = new Date();
|
||||||
|
lastRestoreTime = d.getTime();
|
||||||
|
|
||||||
|
processingData = false;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
instance.on('options', main);
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
|
||||||
5135
flow/cmd_manager.js
5135
flow/cmd_manager.js
File diff suppressed because it is too large
Load diff
|
|
@ -6,14 +6,33 @@ exports.version = '1.0.2';
|
||||||
exports.icon = 'sign-out';
|
exports.icon = 'sign-out';
|
||||||
exports.output = 2;
|
exports.output = 2;
|
||||||
|
|
||||||
|
exports.html = `<div class="padding">
|
||||||
|
<div class="row">
|
||||||
|
<div class="col-md-6">
|
||||||
|
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
||||||
|
</div>
|
||||||
|
<div class="col-md-6">
|
||||||
|
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div class="row">
|
||||||
|
<div class="col-md-6">
|
||||||
|
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
|
||||||
|
</div>
|
||||||
|
<div class="col-md-6">
|
||||||
|
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>`;
|
||||||
|
|
||||||
|
|
||||||
exports.readme = `
|
exports.readme = `
|
||||||
# DB initialization
|
# DB initialization
|
||||||
`;
|
`;
|
||||||
|
|
||||||
const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
|
const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
|
||||||
const { initNotification } = require('./helper/notification_reporter');
|
const { initNotification } = require('./helper/notification_reporter');
|
||||||
const errorHandler = require('./helper/ErrorToServiceHandler');
|
const errorHandler = require('./helper/ErrorToServiceHandler');
|
||||||
const total_energy = require('../databases/total_energy');
|
|
||||||
|
|
||||||
const SEND_TO = {
|
const SEND_TO = {
|
||||||
db_init: 0,
|
db_init: 0,
|
||||||
|
|
@ -22,6 +41,7 @@ const SEND_TO = {
|
||||||
|
|
||||||
|
|
||||||
exports.install = async function(instance) {
|
exports.install = async function(instance) {
|
||||||
|
|
||||||
const dbNodes = TABLE("nodes");
|
const dbNodes = TABLE("nodes");
|
||||||
const dbRelays = TABLE("relays");
|
const dbRelays = TABLE("relays");
|
||||||
const dbSettings = TABLE("settings");
|
const dbSettings = TABLE("settings");
|
||||||
|
|
@ -50,7 +70,7 @@ exports.install = async function(instance) {
|
||||||
Object.keys(dbs.nodesData).forEach(node => dbs.nodesData[node].readout = {})
|
Object.keys(dbs.nodesData).forEach(node => dbs.nodesData[node].readout = {})
|
||||||
|
|
||||||
dbs.settings = {
|
dbs.settings = {
|
||||||
edge_fw_version: "2025-08-08", //rok-mesiac-den
|
edge_fw_version: "2025-04-24", //rok-mesiac-den
|
||||||
language: responseSettings[0]["lang"],
|
language: responseSettings[0]["lang"],
|
||||||
rvo_name: responseSettings[0]["rvo_name"],
|
rvo_name: responseSettings[0]["rvo_name"],
|
||||||
project_id: responseSettings[0]["project_id"],
|
project_id: responseSettings[0]["project_id"],
|
||||||
|
|
@ -78,11 +98,6 @@ exports.install = async function(instance) {
|
||||||
maintenance_mode: false,
|
maintenance_mode: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let rvo_number = responseSettings[0]["rvo_name"].match(/\D+(\d{1,2})_/)[1];
|
|
||||||
dbs.settings.energy_to_switch_lamps = total_energy[rvo_number];
|
|
||||||
if (dbs.settings.energy_to_switch_lamps === undefined) console.log('=============== db_init.js: energy_to_switch_lamps is undefined');
|
|
||||||
|
|
||||||
FLOW.dbLoaded = true;
|
FLOW.dbLoaded = true;
|
||||||
errorHandler.setProjectId(dbs.settings.project_id);
|
errorHandler.setProjectId(dbs.settings.project_id);
|
||||||
initNotification();
|
initNotification();
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
2775
flow/designer.json_orig
Normal file
2775
flow/designer.json_orig
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -364,7 +364,7 @@ exports.install = function(instance) {
|
||||||
data.map(item => {
|
data.map(item => {
|
||||||
|
|
||||||
let value = item['value'];
|
let value = item['value'];
|
||||||
let pin = item["dev"] + item["circuit"]; // for example "relay1_03" or "input1_01"
|
let pin = item["dev"] + item["circuit"]; // for example "ro1_03" or "di1_01"
|
||||||
|
|
||||||
if (pin == undefined) return;
|
if (pin == undefined) return;
|
||||||
switchLogic(pin, value);
|
switchLogic(pin, value);
|
||||||
|
|
@ -516,9 +516,9 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (ws) {
|
else if (ws) {
|
||||||
//pin = "relay1_03" or "input1_01" ... we must make just "1_01" with slice method
|
//pin = "ro1_03" or "di1_01" ... we must make just "1_01" with slice method
|
||||||
monitor.info(`Dido: turnLine ${onOrOff} - (line, pin, force)`, line, pin, force, info);
|
monitor.info(`Dido: turnLine ${onOrOff} - (line, pin, force)`, line, pin, force, info);
|
||||||
let cmd = { "cmd": "set", "dev": "relay", "circuit": pin.slice(5), "value": value };
|
let cmd = { "cmd": "set", "dev": "relay", "circuit": pin.slice(2), "value": value };
|
||||||
ws.send(JSON.stringify(cmd));
|
ws.send(JSON.stringify(cmd));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -754,9 +754,9 @@ exports.install = function(instance) {
|
||||||
pins = [4, 6];
|
pins = [4, 6];
|
||||||
}
|
}
|
||||||
} else if (controllerType === "unipi") {
|
} else if (controllerType === "unipi") {
|
||||||
pins = ["input1_01", "input1_04", "input1_05"];
|
pins = ["di1_01", "di1_04", "di1_05"];
|
||||||
if (hasMainSwitch === 1) {
|
if (hasMainSwitch === 1) {
|
||||||
pins = ["input1_01", "input1_04"];
|
pins = ["di1_01", "di1_04"];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -781,7 +781,7 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
for (const pinIndex of pinIndexes) {
|
for (const pinIndex of pinIndexes) {
|
||||||
if (previousValues[pinIndex] === 0) {
|
if (previousValues[pinIndex] === 0) {
|
||||||
if ((pinIndex === 6 || pinIndex === 'input1_01' || pinIndex === 'input1_05') && SETTINGS.maintenance_mode) continue;
|
if ((pinIndex === 6 || pinIndex === 'di1_01' || pinIndex === 'di1_05') && SETTINGS.maintenance_mode) continue;
|
||||||
status = "NOK";
|
status = "NOK";
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -798,7 +798,7 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
|
|
||||||
// we pass array to function in case of rsPort ==> switchLogic([55,3,0,1]) ==> [[55,3,0,1]]
|
// we pass array to function in case of rsPort ==> switchLogic([55,3,0,1]) ==> [[55,3,0,1]]
|
||||||
// we pass two values in case of websocket ==> switchLogic("relay1_03",1) ==> ["relay1_03",1]
|
// we pass two values in case of websocket ==> switchLogic("ro1_03",1) ==> ["ro1_03",1]
|
||||||
const switchLogic = (...args) => {
|
const switchLogic = (...args) => {
|
||||||
|
|
||||||
let values = {};
|
let values = {};
|
||||||
|
|
@ -849,18 +849,18 @@ exports.install = function(instance) {
|
||||||
else if (type == "rotary_switch_state") {
|
else if (type == "rotary_switch_state") {
|
||||||
// combination of these two pins required to get result
|
// combination of these two pins required to get result
|
||||||
let pin2, pin3;
|
let pin2, pin3;
|
||||||
if (pinIndex == 2 || pinIndex == "input1_02") {
|
if (pinIndex == 2 || pinIndex == "di1_02") {
|
||||||
pin2 = newPinValue;
|
pin2 = newPinValue;
|
||||||
pin3 = previousValues[3] || previousValues["input1_03"];
|
pin3 = previousValues[3] || previousValues["di1_03"];
|
||||||
|
|
||||||
if (pin3 == undefined) {
|
if (pin3 == undefined) {
|
||||||
previousValues[pinIndex] = newPinValue;
|
previousValues[pinIndex] = newPinValue;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (pinIndex == 3 || pinIndex == "input1_03") {
|
else if (pinIndex == 3 || pinIndex == "di1_03") {
|
||||||
pin3 = newPinValue;
|
pin3 = newPinValue;
|
||||||
pin2 = previousValues[2] || previousValues["input1_02"];
|
pin2 = previousValues[2] || previousValues["di1_02"];
|
||||||
|
|
||||||
if (pin2 == undefined) {
|
if (pin2 == undefined) {
|
||||||
previousValues[pinIndex] = newPinValue;
|
previousValues[pinIndex] = newPinValue;
|
||||||
|
|
@ -913,7 +913,7 @@ exports.install = function(instance) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//Dverovy kontakt - pin 6
|
//Dverovy kontakt - pin 6
|
||||||
//! Ak je rvo s dvoma dverovymi kontaktami, ked pride z evoku signal z input1_05, co bol predytm "state_of_main switch" handlujeme ho teraz ako 'door_condition'
|
//! Ak je rvo s dvoma dverovymi kontaktami, ked pride z evoku signal z di1_05, co bol predytm "state_of_main switch" handlujeme ho teraz ako 'door_condition'
|
||||||
else if (type == "door_condition" || type === "state_of_main_switch") {
|
else if (type == "door_condition" || type === "state_of_main_switch") {
|
||||||
newPinValue === 0 ? value = "open" : value = "closed";
|
newPinValue === 0 ? value = "open" : value = "closed";
|
||||||
|
|
||||||
|
|
@ -1369,60 +1369,60 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
//! pins.table --> from UNIPI
|
//! pins.table --> from UNIPI
|
||||||
// pin:string|type:string|line:number
|
// pin:string|type:string|line:number
|
||||||
// *|input1_01|state_of_main_switch|0|...........
|
// *|di1_01|state_of_main_switch|0|...........
|
||||||
// *|input1_02|rotary_switch_state|0|...........
|
// *|di1_02|rotary_switch_state|0|...........
|
||||||
// *|input1_03|rotary_switch_state|0|...........
|
// *|di1_03|rotary_switch_state|0|...........
|
||||||
// *|intut1_04|power_supply|0|...........
|
// *|intut1_04|power_supply|0|...........
|
||||||
// *|input1_05|door_condition|0|...........
|
// *|di1_05|door_condition|0|...........
|
||||||
// *|input1_06|state_of_breaker|1|...........
|
// *|di1_06|state_of_breaker|1|...........
|
||||||
// *|input1_07|state_of_breaker|2|...........
|
// *|di1_07|state_of_breaker|2|...........
|
||||||
// *|input1_08|state_of_breaker|3|...........
|
// *|di1_08|state_of_breaker|3|...........
|
||||||
// *|relay1_02|state_of_contactor|1|...........
|
// *|ro1_02|state_of_contactor|1|...........
|
||||||
// *|relay1_03|state_of_contactor|2|...........
|
// *|ro1_03|state_of_contactor|2|...........
|
||||||
// *|relay1_04|state_of_contactor|3|...........
|
// *|ro1_04|state_of_contactor|3|...........
|
||||||
// *|287D8776E0013CE9|temperature|0|...........
|
// *|287D8776E0013CE9|temperature|0|...........
|
||||||
|
|
||||||
|
|
||||||
//! pins_data --> from UNIPI
|
//! pins_data --> from UNIPI
|
||||||
// {
|
// {
|
||||||
// input1_01: {
|
// di1_01: {
|
||||||
// pin: 'input1_01',
|
// pin: 'di1_01',
|
||||||
// type: 'door_condition',
|
// type: 'door_condition',
|
||||||
// line: 0,
|
// line: 0,
|
||||||
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
||||||
// },
|
// },
|
||||||
// input1_02: {
|
// di1_02: {
|
||||||
// pin: 'input1_02',
|
// pin: 'di1_02',
|
||||||
// type: 'rotary_switch_state',
|
// type: 'rotary_switch_state',
|
||||||
// line: 0,
|
// line: 0,
|
||||||
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
||||||
// },
|
// },
|
||||||
// input1_03: {
|
// di1_03: {
|
||||||
// pin: 'input1_03',
|
// pin: 'di1_03',
|
||||||
// type: 'rotary_switch_state',
|
// type: 'rotary_switch_state',
|
||||||
// line: 0,
|
// line: 0,
|
||||||
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
||||||
// },
|
// },
|
||||||
// input1_04: {
|
// di1_04: {
|
||||||
// pin: 'input1_04',
|
// pin: 'di1_04',
|
||||||
// type: 'power_supply',
|
// type: 'power_supply',
|
||||||
// line: 0,
|
// line: 0,
|
||||||
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
||||||
// },
|
// },
|
||||||
// input1_05: {
|
// di1_05: {
|
||||||
// pin: 'input1_05',
|
// pin: 'di1_05',
|
||||||
// type: 'state_of_main_switch',
|
// type: 'state_of_main_switch',
|
||||||
// line: 0,
|
// line: 0,
|
||||||
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
// tbname: 'PLBJzmK1r3Gynd6OW0gGYz0e5wV4vx9bDEqNgYR8'
|
||||||
// },
|
// },
|
||||||
// input1_06: {
|
// di1_06: {
|
||||||
// pin: 'input1_06',
|
// pin: 'di1_06',
|
||||||
// type: 'state_of_breaker',
|
// type: 'state_of_breaker',
|
||||||
// line: 1,
|
// line: 1,
|
||||||
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'
|
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'
|
||||||
// },
|
// },
|
||||||
// relay1_02: {
|
// ro1_02: {
|
||||||
// pin: 'relay1_02',
|
// pin: 'ro1_02',
|
||||||
// type: 'state_of_contactor',
|
// type: 'state_of_contactor',
|
||||||
// line: 1,
|
// line: 1,
|
||||||
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'
|
// tbname: '52dD6ZlV1QaOpRBmbAqK8bkKnGzWMLj4eJq38Pgo'
|
||||||
|
|
|
||||||
|
|
@ -1,186 +1,186 @@
|
||||||
class DataToTbHandler {
|
class DataToTbHandler {
|
||||||
|
|
||||||
constructor(index) {
|
constructor(index) {
|
||||||
this.index = index;
|
this.index = index;
|
||||||
|
|
||||||
// time, after new value for the given key will be resend to tb (e.g. {status: "OK"})
|
// time, after new value for the given key will be resend to tb (e.g. {status: "OK"})
|
||||||
this.timeToHoldTbValue = 30 * 60; //30 minutes
|
this.timeToHoldTbValue = 30 * 60; //30 minutes
|
||||||
this.previousValues = {};
|
this.previousValues = {};
|
||||||
this.debug = false;
|
this.debug = false;
|
||||||
this.messageCounter = 0;
|
this.messageCounter = 0;
|
||||||
this.itIsNodeReadout = false;
|
this.itIsNodeReadout = false;
|
||||||
this.sender = "";
|
this.sender = "";
|
||||||
|
|
||||||
// if attribute change difference is less than limit value, we do not send to tb.
|
// if attribute change difference is less than limit value, we do not send to tb.
|
||||||
this.attributeChangeLimit = {
|
this.attributeChangeLimit = {
|
||||||
temperature: 0.5,
|
temperature: 0.5,
|
||||||
Phase_1_voltage: 2,
|
Phase_1_voltage: 2,
|
||||||
Phase_2_voltage: 2,
|
Phase_2_voltage: 2,
|
||||||
Phase_3_voltage: 2,
|
Phase_3_voltage: 2,
|
||||||
Phase_1_current: 0.1,
|
Phase_1_current: 0.1,
|
||||||
Phase_2_current: 0.1,
|
Phase_2_current: 0.1,
|
||||||
Phase_3_current: 0.1,
|
Phase_3_current: 0.1,
|
||||||
Phase_1_power: 2,
|
Phase_1_power: 2,
|
||||||
Phase_2_power: 2,
|
Phase_2_power: 2,
|
||||||
Phase_3_power: 2,
|
Phase_3_power: 2,
|
||||||
total_power: 2,
|
total_power: 2,
|
||||||
total_energy: 1,
|
total_energy: 1,
|
||||||
Phase_1_pow_factor: 0.1,
|
Phase_1_pow_factor: 0.1,
|
||||||
Phase_2_pow_factor: 0.1,
|
Phase_2_pow_factor: 0.1,
|
||||||
Phase_3_pow_factor: 0.1,
|
Phase_3_pow_factor: 0.1,
|
||||||
power_factor: 0.1,
|
power_factor: 0.1,
|
||||||
lifetime: 2,
|
lifetime: 2,
|
||||||
voltage: 2,
|
voltage: 2,
|
||||||
power: 2,
|
power: 2,
|
||||||
frequency: 3,
|
frequency: 3,
|
||||||
energy: 0.1,
|
energy: 0.1,
|
||||||
current: 2,
|
current: 2,
|
||||||
inclination_x: 10,
|
inclination_x: 10,
|
||||||
inclination_y: 10,
|
inclination_y: 10,
|
||||||
inclination_z: 10
|
inclination_z: 10
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dump() {
|
dump() {
|
||||||
console.log("----------------------------");
|
console.log("----------------------------");
|
||||||
console.log("previousValues", this.previousValues);
|
console.log("previousValues", this.previousValues);
|
||||||
console.log("----------------------------");
|
console.log("----------------------------");
|
||||||
}
|
}
|
||||||
|
|
||||||
setSender(sender) {
|
setSender(sender) {
|
||||||
this.sender = sender;
|
this.sender = sender;
|
||||||
}
|
}
|
||||||
|
|
||||||
isEmptyObject(obj) {
|
isEmptyObject(obj) {
|
||||||
for (var _ in obj) {
|
for (var _ in obj) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sendToTb(data, instance) {
|
sendToTb(data, instance) {
|
||||||
|
|
||||||
//not to modify data object, we do deep copy:
|
//not to modify data object, we do deep copy:
|
||||||
let dataCopy = JSON.parse(JSON.stringify(data));
|
let dataCopy = JSON.parse(JSON.stringify(data));
|
||||||
|
|
||||||
let keys = Object.keys(dataCopy);
|
let keys = Object.keys(dataCopy);
|
||||||
|
|
||||||
if (keys.length == 0) {
|
if (keys.length == 0) {
|
||||||
if (this.debug) console.log("sendToTb received empty object", dataCopy);
|
if (this.debug) console.log("sendToTb received empty object", dataCopy);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let tbname = keys[0];
|
let tbname = keys[0];
|
||||||
let ts;
|
let ts;
|
||||||
|
|
||||||
let arrayOfValues = dataCopy[tbname];
|
let arrayOfValues = dataCopy[tbname];
|
||||||
let arrayOfValuesToSend = [];
|
let arrayOfValuesToSend = [];
|
||||||
|
|
||||||
for (let i = 0; i < arrayOfValues.length; i++) {
|
for (let i = 0; i < arrayOfValues.length; i++) {
|
||||||
|
|
||||||
ts = arrayOfValues[i].ts;
|
ts = arrayOfValues[i].ts;
|
||||||
let values = this.prepareValuesForTb(tbname, ts, arrayOfValues[i].values);
|
let values = this.prepareValuesForTb(tbname, ts, arrayOfValues[i].values);
|
||||||
|
|
||||||
if (!this.isEmptyObject(values)) {
|
if (!this.isEmptyObject(values)) {
|
||||||
arrayOfValuesToSend.push({ ts: ts, values: values });
|
arrayOfValuesToSend.push({ ts: ts, values: values });
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (arrayOfValuesToSend.length == 0) {
|
if (arrayOfValuesToSend.length == 0) {
|
||||||
//if(this.debug) console.log("data not sent - empty array");
|
//if(this.debug) console.log("data not sent - empty array");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.messageCounter++;
|
this.messageCounter++;
|
||||||
|
|
||||||
let dataToTbModified = {
|
let dataToTbModified = {
|
||||||
[tbname]: arrayOfValuesToSend
|
[tbname]: arrayOfValuesToSend
|
||||||
}
|
}
|
||||||
|
|
||||||
//console.log(this.sender + " DATA SEND TO TB ", tbname, this.messageCounter, new Date(ts), dataToTbModified[tbname][0].values, this.instance);
|
//console.log(this.sender + " DATA SEND TO TB ", tbname, this.messageCounter, new Date(ts), dataToTbModified[tbname][0].values, this.instance);
|
||||||
//if(this.debug) console.log(this.sender + " DATA SEND TO TB ", this.index, tbname, arrayOfValuesToSend);
|
//if(this.debug) console.log(this.sender + " DATA SEND TO TB ", this.index, tbname, arrayOfValuesToSend);
|
||||||
instance.send(this.index, dataToTbModified);
|
instance.send(this.index, dataToTbModified);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
getDiffTimestamp(key) {
|
getDiffTimestamp(key) {
|
||||||
//TODO set different value for given key!!!
|
//TODO set different value for given key!!!
|
||||||
//if(key == "status") this.timeToHoldTbValue = 2*60*60;//2h
|
//if(key == "status") this.timeToHoldTbValue = 2*60*60;//2h
|
||||||
return this.timeToHoldTbValue * 1000;
|
return this.timeToHoldTbValue * 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
prepareValuesForTb(tbname, timestamp, values) {
|
prepareValuesForTb(tbname, timestamp, values) {
|
||||||
|
|
||||||
let keys = Object.keys(values);
|
let keys = Object.keys(values);
|
||||||
|
|
||||||
if (keys.includes("lifetime")) this.itIsNodeReadout = true;
|
if (keys.includes("lifetime")) this.itIsNodeReadout = true;
|
||||||
|
|
||||||
if (!this.previousValues.hasOwnProperty(tbname)) {
|
if (!this.previousValues.hasOwnProperty(tbname)) {
|
||||||
this.previousValues[tbname] = {};
|
this.previousValues[tbname] = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
//if(this.debug) console.log("prepareValuesForTb", tbname, timestamp, values);
|
//if(this.debug) console.log("prepareValuesForTb", tbname, timestamp, values);
|
||||||
|
|
||||||
for (let i = 0; i < keys.length; i++) {
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
|
||||||
let key = keys[i];
|
let key = keys[i];
|
||||||
let value = values[key];
|
let value = values[key];
|
||||||
|
|
||||||
if (!this.previousValues[tbname].hasOwnProperty(key)) {
|
if (!this.previousValues[tbname].hasOwnProperty(key)) {
|
||||||
this.previousValues[tbname][key] = { ts: timestamp, value: value };
|
this.previousValues[tbname][key] = { ts: timestamp, value: value };
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// attributeData ==> {voltage: {ts:333333, value:5}}
|
// attributeData ==> {voltage: {ts:333333, value:5}}
|
||||||
let attributeData = this.previousValues[tbname][key];
|
let attributeData = this.previousValues[tbname][key];
|
||||||
let attributeToChange = false;
|
let attributeToChange = false;
|
||||||
if (key in this.attributeChangeLimit) attributeToChange = true;
|
if (key in this.attributeChangeLimit) attributeToChange = true;
|
||||||
let limit = this.attributeChangeLimit[key];
|
let limit = this.attributeChangeLimit[key];
|
||||||
let timestampDiffToRemoveKey;
|
let timestampDiffToRemoveKey;
|
||||||
|
|
||||||
//this will ensure "node statecode" will be sent just once an hour
|
//this will ensure "node statecode" will be sent just once an hour
|
||||||
if (this.itIsNodeReadout && key === "statecode") {
|
if (this.itIsNodeReadout && key === "statecode") {
|
||||||
attributeData.value = value;
|
attributeData.value = value;
|
||||||
this.itIsNodeReadout = false;
|
this.itIsNodeReadout = false;
|
||||||
timestampDiffToRemoveKey = 1 * 60 * 60 * 1000; // 1 hour
|
timestampDiffToRemoveKey = 1 * 60 * 60 * 1000; // 1 hour
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key === "twilight_sensor" && value > 100) {
|
if (key === "twilight_sensor" && value > 100) {
|
||||||
attributeData.value = value;
|
attributeData.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
//if edge, master or node version do not change, send just once a day:
|
//if edge, master or node version do not change, send just once a day:
|
||||||
if (["edge_fw_version", "master_node_version", "fw_version"].includes(key)) {
|
if (["edge_fw_version", "master_node_version", "fw_version"].includes(key)) {
|
||||||
timestampDiffToRemoveKey = 24 * 60 * 60 * 1000;
|
timestampDiffToRemoveKey = 24 * 60 * 60 * 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (attributeData.value === value || attributeToChange && Math.abs(attributeData.value - value) < limit) {
|
if (attributeData.value === value || attributeToChange && Math.abs(attributeData.value - value) < limit) {
|
||||||
|
|
||||||
let diff = timestamp - attributeData.ts;
|
let diff = timestamp - attributeData.ts;
|
||||||
if (!timestampDiffToRemoveKey) timestampDiffToRemoveKey = this.getDiffTimestamp(key);
|
if (!timestampDiffToRemoveKey) timestampDiffToRemoveKey = this.getDiffTimestamp(key);
|
||||||
|
|
||||||
if (diff > timestampDiffToRemoveKey) {
|
if (diff > timestampDiffToRemoveKey) {
|
||||||
attributeData.ts = Date.now();
|
attributeData.ts = Date.now();
|
||||||
//if(this.debug) console.log(this.sender + ": update ts for key", key, "diff is", diff, "messageCounter", this.messageCounter);
|
//if(this.debug) console.log(this.sender + ": update ts for key", key, "diff is", diff, "messageCounter", this.messageCounter);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
delete values[key];
|
delete values[key];
|
||||||
//if(this.debug) console.log(this.sender + ": delete key", key, "diff is", diff, "messageCounter", this.messageCounter, timestampDiffToRemoveKey);
|
//if(this.debug) console.log(this.sender + ": delete key", key, "diff is", diff, "messageCounter", this.messageCounter, timestampDiffToRemoveKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
attributeData.value = value;
|
attributeData.value = value;
|
||||||
attributeData.ts = timestamp;
|
attributeData.ts = timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return values;
|
return values;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = DataToTbHandler;
|
module.exports = DataToTbHandler;
|
||||||
|
|
|
||||||
|
|
@ -1,44 +1,102 @@
|
||||||
function bytesToInt(bytes, numberOfBytes) {
|
function bytesToInt_orig(bytes, numberOfBytes) {
|
||||||
let buffer = [];
|
|
||||||
if (Array.isArray(bytes)) {
|
|
||||||
buffer = bytes.slice(0);
|
|
||||||
if (numberOfBytes != undefined) {
|
|
||||||
buffer = bytes.slice(bytes.length - numberOfBytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else buffer.push(bytes);
|
|
||||||
|
|
||||||
let result = 0;
|
let buffer = [];
|
||||||
for (let i = 0; i < buffer.length; i++) {
|
if (Array.isArray(bytes)) {
|
||||||
result = (result << 8) | buffer[i];
|
buffer = bytes.slice(0);
|
||||||
}
|
if (numberOfBytes != undefined) {
|
||||||
|
buffer = bytes.slice(bytes.length - numberOfBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else buffer.push(bytes);
|
||||||
|
//var decimal = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3];
|
||||||
|
|
||||||
return result >>> 0; //ensure it's an unsigned 32-bit number
|
let l = (buffer.length - 1) * 8;
|
||||||
|
let decimal = 0;
|
||||||
|
for (let i = 0; i < buffer.length; i++) {
|
||||||
|
var s = buffer[i] << l;
|
||||||
|
if (l < 8) s = buffer[i]
|
||||||
|
decimal = decimal + s;
|
||||||
|
l = l - 8;
|
||||||
|
}
|
||||||
|
// console.log("decimal utils.js: ", decimal);
|
||||||
|
|
||||||
|
let decimal1 = 0n;
|
||||||
|
for (let i = 0; i < buffer.length; i++) {
|
||||||
|
decimal1 += BigInt(buffer[i]) * (2n ** BigInt((buffer.length - 1 - i) * 8));
|
||||||
|
}
|
||||||
|
// console.log("decimal biging utils.js: ", decimal1);
|
||||||
|
return decimal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//bytestouintBE
|
||||||
|
function bytesToInt(bytes, numberOfBytes) {
|
||||||
|
|
||||||
|
let buffer = [];
|
||||||
|
if (Array.isArray(bytes)) {
|
||||||
|
buffer = bytes.slice(0);
|
||||||
|
if (numberOfBytes != undefined) {
|
||||||
|
buffer = bytes.slice(bytes.length - numberOfBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else buffer.push(bytes);
|
||||||
|
|
||||||
|
console.log(bytes, buffer);
|
||||||
|
|
||||||
|
let result = 0;
|
||||||
|
for (let i = 0; i < buffer.length; i++) {
|
||||||
|
result = (result << 8) | bytes[i];
|
||||||
|
}
|
||||||
|
// console.log("decimal biging utils.js: ", decimal1);
|
||||||
|
|
||||||
|
console.log("originall: ", bytesToInt_orig(buffer));
|
||||||
|
console.log("uint little endian: ", bytesToUintLE(buffer));
|
||||||
|
console.log('neww: ', result >>> 0);
|
||||||
|
return result >>> 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function bytesToUintLE(bytes, numberOfBytes) {
|
||||||
|
|
||||||
|
let buffer = [];
|
||||||
|
if (Array.isArray(bytes)) {
|
||||||
|
buffer = bytes.slice(0);
|
||||||
|
if (numberOfBytes != undefined) {
|
||||||
|
buffer = bytes.slice(bytes.length - numberOfBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else buffer.push(bytes);
|
||||||
|
//var decimal = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3];
|
||||||
|
|
||||||
|
let result = 0;
|
||||||
|
for (let i = buffer.length - 1; i <= 0; i--) {
|
||||||
|
result = (result << 8) | bytes[i];
|
||||||
|
}
|
||||||
|
return result >>> 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
function resizeArray(arr, newSize, defaultValue) {
|
function resizeArray(arr, newSize, defaultValue) {
|
||||||
while (newSize > arr.length)
|
while (newSize > arr.length)
|
||||||
arr.push(defaultValue);
|
arr.push(defaultValue);
|
||||||
arr.length = newSize;
|
arr.length = newSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
longToByteArray = function(/*long*/long) {
|
longToByteArray = function(/*long*/long) {
|
||||||
// we want to represent the input as a 8-bytes array
|
// we want to represent the input as a 8-bytes array
|
||||||
var byteArray = [0, 0, 0, 0, 0, 0, 0, 0];
|
var byteArray = [0, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
|
||||||
for (var index = 0; index < byteArray.length; index++) {
|
for (var index = 0; index < byteArray.length; index++) {
|
||||||
var byte = long & 0xff;
|
var byte = long & 0xff;
|
||||||
byteArray[index] = byte;
|
byteArray[index] = byte;
|
||||||
long = (long - byte) / 256;
|
long = (long - byte) / 256;
|
||||||
}
|
}
|
||||||
|
|
||||||
return byteArray;
|
return byteArray;
|
||||||
};
|
};
|
||||||
|
|
||||||
function addDays(date, days) {
|
function addDays(date, days) {
|
||||||
var result = new Date(date);
|
var result = new Date(date);
|
||||||
result.setDate(result.getDate() + days);
|
result.setDate(result.getDate() + days);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -50,63 +108,63 @@ sleep(2000).then(() => {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
function sleep(time) {
|
function sleep(time) {
|
||||||
return new Promise((resolve) => setTimeout(resolve, time));
|
return new Promise((resolve) => setTimeout(resolve, time));
|
||||||
}
|
}
|
||||||
|
|
||||||
function isEmptyObject(obj) {
|
function isEmptyObject(obj) {
|
||||||
for (var name in obj) {
|
for (var name in obj) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
function convertUTCDateToLocalDate(date) {
|
function convertUTCDateToLocalDate(date) {
|
||||||
var newDate = new Date(date);
|
var newDate = new Date(date);
|
||||||
newDate.setMinutes(date.getMinutes() + date.getTimezoneOffset());
|
newDate.setMinutes(date.getMinutes() + date.getTimezoneOffset());
|
||||||
return newDate;
|
return newDate;
|
||||||
}
|
}
|
||||||
|
|
||||||
function addZeroBefore(n) {
|
function addZeroBefore(n) {
|
||||||
return (n < 10 ? '0' : '') + n;
|
return (n < 10 ? '0' : '') + n;
|
||||||
}
|
}
|
||||||
|
|
||||||
var convertBase = function() {
|
var convertBase = function() {
|
||||||
|
|
||||||
function convertBase(baseFrom, baseTo) {
|
function convertBase(baseFrom, baseTo) {
|
||||||
return function(num) {
|
return function(num) {
|
||||||
return parseInt(num, baseFrom).toString(baseTo);
|
return parseInt(num, baseFrom).toString(baseTo);
|
||||||
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// binary to decimal
|
// binary to decimal
|
||||||
convertBase.bin2dec = convertBase(2, 10);
|
convertBase.bin2dec = convertBase(2, 10);
|
||||||
|
|
||||||
// binary to hexadecimal
|
// binary to hexadecimal
|
||||||
convertBase.bin2hex = convertBase(2, 16);
|
convertBase.bin2hex = convertBase(2, 16);
|
||||||
|
|
||||||
// decimal to binary
|
// decimal to binary
|
||||||
convertBase.dec2bin = convertBase(10, 2);
|
convertBase.dec2bin = convertBase(10, 2);
|
||||||
|
|
||||||
// decimal to hexadecimal
|
// decimal to hexadecimal
|
||||||
convertBase.dec2hex = convertBase(10, 16);
|
convertBase.dec2hex = convertBase(10, 16);
|
||||||
|
|
||||||
// hexadecimal to binary
|
// hexadecimal to binary
|
||||||
convertBase.hex2bin = convertBase(16, 2);
|
convertBase.hex2bin = convertBase(16, 2);
|
||||||
|
|
||||||
// hexadecimal to decimal
|
// hexadecimal to decimal
|
||||||
convertBase.hex2dec = convertBase(16, 10);
|
convertBase.hex2dec = convertBase(16, 10);
|
||||||
|
|
||||||
return convertBase;
|
return convertBase;
|
||||||
}();
|
}();
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
bytesToInt,
|
bytesToInt,
|
||||||
longToByteArray,
|
longToByteArray,
|
||||||
addDays,
|
addDays,
|
||||||
addZeroBefore,
|
addZeroBefore,
|
||||||
resizeArray,
|
resizeArray,
|
||||||
isEmptyObject,
|
isEmptyObject,
|
||||||
sleep,
|
sleep,
|
||||||
convertUTCDateToLocalDate
|
convertUTCDateToLocalDate
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ exports.readme = `
|
||||||
`;
|
`;
|
||||||
|
|
||||||
const modbus = require('jsmodbus');
|
const modbus = require('jsmodbus');
|
||||||
const SerialPort = require('serialport');
|
const {SerialPort} = require('serialport');
|
||||||
const { timeoutInterval, deviceConfig } = require("../databases/modbus_config");
|
const { timeoutInterval, deviceConfig } = require("../databases/modbus_config");
|
||||||
const { sendNotification } = require('./helper/notification_reporter');
|
const { sendNotification } = require('./helper/notification_reporter');
|
||||||
|
|
||||||
|
|
@ -36,7 +36,6 @@ let mainSocket;
|
||||||
let phases;
|
let phases;
|
||||||
//phases where voltage is 0 (set)
|
//phases where voltage is 0 (set)
|
||||||
let noVoltage;
|
let noVoltage;
|
||||||
let energyToSwitchLamps;
|
|
||||||
|
|
||||||
exports.install = function(instance) {
|
exports.install = function(instance) {
|
||||||
|
|
||||||
|
|
@ -77,13 +76,8 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
let obj = this;
|
let obj = this;
|
||||||
|
|
||||||
if (this.socket) {
|
this.socket = new SerialPort({path: "/dev/ttymxc0",
|
||||||
this.socket.removeAllListeners();
|
baudRate: 9600
|
||||||
this.socket = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.socket = new SerialPort("/dev/ttymxc0", {
|
|
||||||
baudRate: 9600,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// we create a client for every deviceAddress ( = address) in list and push them into dictionary
|
// we create a client for every deviceAddress ( = address) in list and push them into dictionary
|
||||||
|
|
@ -92,11 +86,15 @@ exports.install = function(instance) {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.socket.on('error', function(e) {
|
this.socket.on('error', function(e) {
|
||||||
console.log('Modbus_reader: Socket connection error', e); //'ECONNREFUSED' or 'ECONNRESET' ??
|
console.log('socket connection error', e);
|
||||||
|
if (e.code == 'ECONNREFUSED' || e.code == 'ECONNRESET') {
|
||||||
|
console.log(exports.title + ' Waiting 10 seconds before trying to connect again');
|
||||||
|
setTimeout(obj.startSocket, 10000);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
this.socket.on('close', function() {
|
this.socket.on('close', function() {
|
||||||
console.log('Modbus_reader: Socket connection closed - Waiting 10 seconds before connecting again');
|
console.log('Socket connection closed ' + exports.title + ' Waiting 10 seconds before trying to connect again');
|
||||||
setTimeout(obj.startSocket, 10000);
|
setTimeout(obj.startSocket, 10000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -117,8 +115,7 @@ exports.install = function(instance) {
|
||||||
this.deviceAddress = dev.deviceAddress; // 1 or 2 or any number
|
this.deviceAddress = dev.deviceAddress; // 1 or 2 or any number
|
||||||
this.device = dev.device; //em340, twilight_sensor
|
this.device = dev.device; //em340, twilight_sensor
|
||||||
|
|
||||||
//if we just start to loop devices from the beginning, or there is just 1 device in config, we wait whole timeoutInterval
|
if (this.indexInDeviceConfig == 0) setTimeout(this.readRegisters, this.timeoutInterval);
|
||||||
if (this.indexInDeviceConfig == 0 || deviceConfig.length === 1) setTimeout(this.readRegisters, this.timeoutInterval);
|
|
||||||
else setTimeout(this.readRegisters, DELAY_BETWEEN_DEVICES);
|
else setTimeout(this.readRegisters, DELAY_BETWEEN_DEVICES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -307,12 +304,15 @@ exports.install = function(instance) {
|
||||||
|
|
||||||
const actualTotalPower = values.total_power;
|
const actualTotalPower = values.total_power;
|
||||||
|
|
||||||
if (actualTotalPower > energyToSwitchLamps && this.onNotificationSent == false) {
|
const numberOfNodes = Object.keys(FLOW.GLOBALS.nodesData).length;
|
||||||
|
if (numberOfNodes == 0) numberOfNodes = 20; // to make sure, we send notification if totalPower is more than 300
|
||||||
|
|
||||||
|
if (actualTotalPower > numberOfNodes * 15 && this.onNotificationSent == false) {
|
||||||
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_on", {}, "", SEND_TO.tb, instance);
|
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_on", {}, "", SEND_TO.tb, instance);
|
||||||
this.onNotificationSent = true;
|
this.onNotificationSent = true;
|
||||||
this.offNotificationSent = false;
|
this.offNotificationSent = false;
|
||||||
}
|
}
|
||||||
else if (actualTotalPower <= energyToSwitchLamps && this.offNotificationSent == false) {
|
else if (actualTotalPower <= numberOfNodes * 15 && this.offNotificationSent == false) {
|
||||||
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_off", {}, "", SEND_TO.tb, instance);
|
sendNotification("modbus_reader: lampSwitchNotification", tbName, "lamps_have_turned_off", {}, "", SEND_TO.tb, instance);
|
||||||
this.onNotificationSent = false;
|
this.onNotificationSent = false;
|
||||||
this.offNotificationSent = true;
|
this.offNotificationSent = true;
|
||||||
|
|
@ -330,9 +330,9 @@ exports.install = function(instance) {
|
||||||
phases = FLOW.GLOBALS.settings.phases;
|
phases = FLOW.GLOBALS.settings.phases;
|
||||||
tbName = FLOW.GLOBALS.settings.rvoTbName;
|
tbName = FLOW.GLOBALS.settings.rvoTbName;
|
||||||
noVoltage = FLOW.GLOBALS.settings.no_voltage;
|
noVoltage = FLOW.GLOBALS.settings.no_voltage;
|
||||||
energyToSwitchLamps = FLOW.GLOBALS.settings.energy_to_switch_lamps / 2.5; //half value is enought to show if lamps are turned on or off
|
mainSocket = new SocketWithClients();
|
||||||
if (deviceConfig.length) mainSocket = new SocketWithClients();
|
|
||||||
else console.log("Modbus_reader: no modbus device in configuration");
|
console.log("novoltage: ", noVoltage, typeof noVoltage);
|
||||||
|
|
||||||
// this notification is to show, that flow (unipi) has been restarted
|
// this notification is to show, that flow (unipi) has been restarted
|
||||||
sendNotification("modbus_reader", tbName, "flow_restart", {}, "", SEND_TO.slack, instance);
|
sendNotification("modbus_reader", tbName, "flow_restart", {}, "", SEND_TO.slack, instance);
|
||||||
|
|
|
||||||
0
flow/variables.txt
Normal file
0
flow/variables.txt
Normal file
|
|
@ -44,9 +44,9 @@ const fs = require('fs');
|
||||||
const mqtt = require('mqtt');
|
const mqtt = require('mqtt');
|
||||||
|
|
||||||
const SEND_TO = {
|
const SEND_TO = {
|
||||||
debug: 0,
|
debug: 0,
|
||||||
rpcCall: 1,
|
rpcCall: 1,
|
||||||
services: 2
|
services: 2
|
||||||
}
|
}
|
||||||
|
|
||||||
//CONFIG
|
//CONFIG
|
||||||
|
|
@ -72,13 +72,13 @@ let sendClientError = true;
|
||||||
|
|
||||||
process.on('uncaughtException', function(err) {
|
process.on('uncaughtException', function(err) {
|
||||||
|
|
||||||
errLogger.error('uncaughtException:', err.message)
|
errLogger.error('uncaughtException:', err.message)
|
||||||
errLogger.error(err.stack);
|
errLogger.error(err.stack);
|
||||||
|
|
||||||
//TODO
|
//TODO
|
||||||
//send to service
|
//send to service
|
||||||
|
|
||||||
//process.exit(1);
|
//process.exit(1);
|
||||||
})
|
})
|
||||||
|
|
||||||
const nosql = NOSQL('tbdata');
|
const nosql = NOSQL('tbdata');
|
||||||
|
|
@ -87,362 +87,364 @@ const nosqlBackup = NOSQL('/backup/tbdata');
|
||||||
|
|
||||||
exports.install = function(instance) {
|
exports.install = function(instance) {
|
||||||
|
|
||||||
var client;
|
var client;
|
||||||
var opts;
|
var opts;
|
||||||
var clientReady = false;
|
var clientReady = false;
|
||||||
|
|
||||||
// wsmqtt status for notification purposes on projects.worksys.io database
|
// wsmqtt status for notification purposes on projects.worksys.io database
|
||||||
let wsmqttName = null;
|
let wsmqttName = null;
|
||||||
let sendWsStatusVar = null;
|
let sendWsStatusVar = null;
|
||||||
let wsmqtt_status = 'disconnected';
|
let wsmqtt_status = 'disconnected';
|
||||||
|
|
||||||
function getWsmqttName(host) {
|
function getWsmqttName(host) {
|
||||||
if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
|
if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
|
||||||
else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
|
else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
|
||||||
else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
|
else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
|
||||||
}
|
}
|
||||||
|
|
||||||
function sendWsStatus() {
|
function sendWsStatus() {
|
||||||
instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status });
|
instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
function main() {
|
function main() {
|
||||||
if (!FLOW.dbLoaded) return;
|
if (!FLOW.dbLoaded) return;
|
||||||
|
|
||||||
loadSettings();
|
loadSettings();
|
||||||
clearInterval(sendWsStatus);
|
clearInterval(sendWsStatus);
|
||||||
sendWsStatusVar = setInterval(sendWsStatus, 180000);
|
sendWsStatusVar = setInterval(sendWsStatus, 180000);
|
||||||
}
|
}
|
||||||
|
|
||||||
//set opts according to db settings
|
//set opts according to db settings
|
||||||
function loadSettings() {
|
function loadSettings() {
|
||||||
|
|
||||||
if (instance.options.host !== "") {
|
if (instance.options.host !== "") {
|
||||||
//override settings from database
|
//override settings from database
|
||||||
var o = instance.options;
|
var o = instance.options;
|
||||||
opts = {
|
opts = {
|
||||||
host: o.host,
|
host: o.host,
|
||||||
port: o.port,
|
port: o.port,
|
||||||
clientId: o.clientid,
|
clientId: o.clientid,
|
||||||
username: o.username,
|
username: o.username,
|
||||||
rejectUnauthorized: false,
|
rejectUnauthorized: false,
|
||||||
resubscribe: false
|
resubscribe: false
|
||||||
};
|
};
|
||||||
|
|
||||||
wsmqttName = getWsmqttName(o.host);
|
wsmqttName = getWsmqttName(o.host);
|
||||||
|
|
||||||
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
|
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
||||||
const SETTINGS = FLOW.GLOBALS.settings;
|
const SETTINGS = FLOW.GLOBALS.settings;
|
||||||
backup_on_failure = SETTINGS.backup_on_failure;
|
backup_on_failure = SETTINGS.backup_on_failure;
|
||||||
saveTelemetryOnError = backup_on_failure;
|
saveTelemetryOnError = backup_on_failure;
|
||||||
|
|
||||||
restore_from_backup = SETTINGS.restore_from_backup;
|
restore_from_backup = SETTINGS.restore_from_backup;
|
||||||
restore_backup_wait = SETTINGS.restore_backup_wait;
|
restore_backup_wait = SETTINGS.restore_backup_wait;
|
||||||
|
|
||||||
let mqtt_host = SETTINGS.mqtt_host;
|
let mqtt_host = SETTINGS.mqtt_host;
|
||||||
let mqtt_clientid = SETTINGS.mqtt_clientid;
|
let mqtt_clientid = SETTINGS.mqtt_clientid;
|
||||||
let mqtt_username = SETTINGS.mqtt_username;
|
let mqtt_username = SETTINGS.mqtt_username;
|
||||||
let mqtt_port = SETTINGS.mqtt_port;
|
let mqtt_port = SETTINGS.mqtt_port;
|
||||||
|
|
||||||
opts = {
|
opts = {
|
||||||
host: mqtt_host,
|
host: mqtt_host,
|
||||||
port: mqtt_port,
|
port: mqtt_port,
|
||||||
keepalive: 10,
|
keepalive: 10,
|
||||||
clientId: mqtt_clientid,
|
clientId: mqtt_clientid,
|
||||||
username: mqtt_username,
|
username: mqtt_username,
|
||||||
rejectUnauthorized: false,
|
rejectUnauthorized: false,
|
||||||
resubscribe: false
|
resubscribe: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
wsmqttName = getWsmqttName(mqtt_host);
|
wsmqttName = getWsmqttName(mqtt_host);
|
||||||
}
|
}
|
||||||
|
|
||||||
connectToTbServer();
|
connectToTbServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
function connectToTbServer() {
|
function connectToTbServer() {
|
||||||
var url = "mqtt://" + opts.host + ":" + opts.port;
|
var url = "mqtt://" + opts.host + ":" + opts.port;
|
||||||
console.log("MQTT URL: ", url);
|
console.log("MQTT URL: ", url);
|
||||||
|
|
||||||
client = mqtt.connect(url, opts);
|
client = mqtt.connect(url, opts);
|
||||||
|
|
||||||
client.on('connect', function() {
|
client.on('connect', function() {
|
||||||
instance.status("Connected", "green");
|
instance.status("Connected", "green");
|
||||||
//monitor.info("MQTT client connected");
|
//monitor.info("MQTT client connected");
|
||||||
|
|
||||||
sendClientError = true;
|
sendClientError = true;
|
||||||
clientReady = true;
|
clientReady = true;
|
||||||
wsmqtt_status = 'connected';
|
wsmqtt_status = 'connected';
|
||||||
});
|
});
|
||||||
|
|
||||||
client.on('reconnect', function() {
|
client.on('reconnect', function() {
|
||||||
instance.status("Reconnecting", "yellow");
|
instance.status("Reconnecting", "yellow");
|
||||||
clientReady = false;
|
clientReady = false;
|
||||||
});
|
});
|
||||||
|
|
||||||
client.on('message', function(topic, message) {
|
client.on('message', function(topic, message) {
|
||||||
// message is type of buffer
|
// message is type of buffer
|
||||||
message = message.toString();
|
message = message.toString();
|
||||||
if (message[0] === '{') {
|
if (message[0] === '{') {
|
||||||
TRY(function() {
|
|
||||||
|
try {
|
||||||
message = JSON.parse(message);
|
message = JSON.parse(message);
|
||||||
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
||||||
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
|
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
|
||||||
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
|
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
|
||||||
}
|
}
|
||||||
|
|
||||||
}, () => instance.debug('MQTT: Error parsing data', message));
|
} catch (e) {
|
||||||
}
|
console.log('MQTT: Error parsing data', e);
|
||||||
|
}
|
||||||
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message });
|
}
|
||||||
});
|
|
||||||
|
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message });
|
||||||
client.on('close', function() {
|
});
|
||||||
clientReady = false;
|
|
||||||
wsmqtt_status = 'disconnected';
|
client.on('close', function() {
|
||||||
|
clientReady = false;
|
||||||
instance.status("Disconnected", "red");
|
wsmqtt_status = 'disconnected';
|
||||||
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
|
|
||||||
});
|
instance.status("Disconnected", "red");
|
||||||
|
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
|
||||||
client.on('error', function(err) {
|
});
|
||||||
instance.status("Err: " + err.code, "red");
|
|
||||||
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
|
client.on('error', function(err) {
|
||||||
if (sendClientError) {
|
instance.status("Err: " + err.code, "red");
|
||||||
monitor.info('MQTT client error', err);
|
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
|
||||||
sendClientError = false;
|
if (sendClientError) {
|
||||||
}
|
monitor.info('MQTT client error', err);
|
||||||
clientReady = false;
|
sendClientError = false;
|
||||||
wsmqtt_status = 'disconnected';
|
}
|
||||||
});
|
clientReady = false;
|
||||||
|
wsmqtt_status = 'disconnected';
|
||||||
}
|
});
|
||||||
|
|
||||||
|
}
|
||||||
instance.on("0", _ => {
|
|
||||||
main();
|
|
||||||
})
|
instance.on("0", _ => {
|
||||||
|
main();
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
instance.on('1', function(data) {
|
instance.on('1', function(data) {
|
||||||
|
|
||||||
if (clientReady) {
|
if (clientReady) {
|
||||||
//do we have some data in backup file? if any, process data from database
|
//do we have some data in backup file? if any, process data from database
|
||||||
if (saveTelemetryOnError) {
|
if (saveTelemetryOnError) {
|
||||||
//read telemetry data and send back to server
|
//read telemetry data and send back to server
|
||||||
if (!processingData) processDataFromDatabase();
|
if (!processingData) processDataFromDatabase();
|
||||||
}
|
}
|
||||||
|
|
||||||
let stringifiedJson = JSON.stringify(data.data);
|
let stringifiedJson = JSON.stringify(data.data);
|
||||||
client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 });
|
client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 });
|
||||||
|
|
||||||
//backup telemetry
|
//backup telemetry
|
||||||
if (createTelemetryBackup) {
|
if (createTelemetryBackup) {
|
||||||
data.data.id = UID();
|
data.data.id = UID();
|
||||||
nosqlBackup.insert(data.data);
|
nosqlBackup.insert(data.data);
|
||||||
|
|
||||||
insertBackupNoSqlCounter++;
|
insertBackupNoSqlCounter++;
|
||||||
if (insertBackupNoSqlCounter > 150) {
|
if (insertBackupNoSqlCounter > 150) {
|
||||||
let options = { compress: true };
|
let options = { compress: true };
|
||||||
let path = __dirname + "/../databases/backup/tbdata.nosql";
|
let path = __dirname + "/../databases/backup/tbdata.nosql";
|
||||||
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
|
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
|
||||||
stream.write("");
|
stream.write("");
|
||||||
stream.end();
|
stream.end();
|
||||||
|
|
||||||
insertBackupNoSqlCounter = 0;
|
insertBackupNoSqlCounter = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
||||||
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
|
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
|
||||||
|
|
||||||
if (saveTelemetryOnError) {
|
if (saveTelemetryOnError) {
|
||||||
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
||||||
makeBackupFromDbFile();
|
makeBackupFromDbFile();
|
||||||
|
|
||||||
//write to tb
|
//write to tb
|
||||||
data.data.id = UID();
|
data.data.id = UID();
|
||||||
nosql.insert(data.data);
|
nosql.insert(data.data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
instance.close = function(done) {
|
instance.close = function(done) {
|
||||||
if (clientReady) {
|
if (clientReady) {
|
||||||
client.end();
|
client.end();
|
||||||
clearInterval(sendWsStatusVar);
|
clearInterval(sendWsStatusVar);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
function getDbBackupFileCounter(type) {
|
function getDbBackupFileCounter(type) {
|
||||||
var files = fs.readdirSync(__dirname + "/../databases");
|
var files = fs.readdirSync(__dirname + "/../databases");
|
||||||
|
|
||||||
let counter = 0;
|
let counter = 0;
|
||||||
for (var i = 0; i < files.length; i++) {
|
for (var i = 0; i < files.length; i++) {
|
||||||
|
|
||||||
if (files[i] == "tbdata.nosql") continue;
|
if (files[i] == "tbdata.nosql") continue;
|
||||||
|
|
||||||
if (files[i].endsWith(".nosql")) {
|
if (files[i].endsWith(".nosql")) {
|
||||||
|
|
||||||
let pos = files[i].indexOf(".");
|
let pos = files[i].indexOf(".");
|
||||||
if (pos > -1) {
|
if (pos > -1) {
|
||||||
|
|
||||||
let fileCounter = counter;
|
let fileCounter = counter;
|
||||||
let firstDigit = files[i].slice(0, pos);
|
let firstDigit = files[i].slice(0, pos);
|
||||||
|
|
||||||
fileCounter = parseInt(firstDigit);
|
fileCounter = parseInt(firstDigit);
|
||||||
if (isNaN(fileCounter)) fileCounter = 0;
|
if (isNaN(fileCounter)) fileCounter = 0;
|
||||||
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
||||||
|
|
||||||
if (type == "max") {
|
if (type == "max") {
|
||||||
if (fileCounter > counter) {
|
if (fileCounter > counter) {
|
||||||
counter = fileCounter;
|
counter = fileCounter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (type == "min") {
|
else if (type == "min") {
|
||||||
if (counter == 0) counter = fileCounter;
|
if (counter == 0) counter = fileCounter;
|
||||||
|
|
||||||
if (fileCounter < counter) {
|
if (fileCounter < counter) {
|
||||||
counter = fileCounter;
|
counter = fileCounter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == "max") counter++;
|
if (type == "max") counter++;
|
||||||
|
|
||||||
return counter;
|
return counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
const makeBackupFromDbFile = async () => {
|
const makeBackupFromDbFile = async () => {
|
||||||
|
|
||||||
if (!saveTelemetryOnError) return;
|
if (!saveTelemetryOnError) return;
|
||||||
|
|
||||||
//to avoid large file: tbdata.nosql
|
//to avoid large file: tbdata.nosql
|
||||||
|
|
||||||
//init value is 0!
|
//init value is 0!
|
||||||
if (insertNoSqlCounter > 0) {
|
if (insertNoSqlCounter > 0) {
|
||||||
--insertNoSqlCounter;
|
--insertNoSqlCounter;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
insertNoSqlCounter = 100;
|
insertNoSqlCounter = 100;
|
||||||
|
|
||||||
let source = __dirname + "/../databases/tbdata.nosql";
|
let source = __dirname + "/../databases/tbdata.nosql";
|
||||||
|
|
||||||
var stats = fs.statSync(source);
|
var stats = fs.statSync(source);
|
||||||
var fileSizeInBytes = stats.size;
|
var fileSizeInBytes = stats.size;
|
||||||
|
|
||||||
if (fileSizeInBytes > noSqlFileSizeLimit) {
|
if (fileSizeInBytes > noSqlFileSizeLimit) {
|
||||||
|
|
||||||
let counter = 1;
|
let counter = 1;
|
||||||
counter = getDbBackupFileCounter("max");
|
counter = getDbBackupFileCounter("max");
|
||||||
|
|
||||||
let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql";
|
let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql";
|
||||||
|
|
||||||
//make backup file
|
//make backup file
|
||||||
fs.copyFileSync(source, destination);
|
fs.copyFileSync(source, destination);
|
||||||
//fs.renameSync(p, p + "." + counter);
|
//fs.renameSync(p, p + "." + counter);
|
||||||
|
|
||||||
//clear tbdata.nosql
|
//clear tbdata.nosql
|
||||||
fs.writeFileSync(source, "");
|
fs.writeFileSync(source, "");
|
||||||
fs.truncateSync(source, 0);
|
fs.truncateSync(source, 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const processDataFromDatabase = async () => {
|
const processDataFromDatabase = async () => {
|
||||||
|
|
||||||
if (restore_from_backup <= 0) return;
|
if (restore_from_backup <= 0) return;
|
||||||
|
|
||||||
//calculate diff
|
//calculate diff
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
let currentTime = now.getTime();
|
let currentTime = now.getTime();
|
||||||
let diff = currentTime - lastRestoreTime;
|
let diff = currentTime - lastRestoreTime;
|
||||||
|
|
||||||
if ((diff / 1000) < restore_backup_wait) {
|
if ((diff / 1000) < restore_backup_wait) {
|
||||||
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
processingData = true;
|
processingData = true;
|
||||||
|
|
||||||
//get filename to process
|
//get filename to process
|
||||||
let counter = getDbBackupFileCounter("min");
|
let counter = getDbBackupFileCounter("min");
|
||||||
|
|
||||||
//we have some backup files
|
//we have some backup files
|
||||||
let dataBase = 'tbdata';
|
let dataBase = 'tbdata';
|
||||||
|
|
||||||
var nosql;
|
var nosql;
|
||||||
if (counter == 0) dataBase = 'tbdata';
|
if (counter == 0) dataBase = 'tbdata';
|
||||||
else dataBase = counter + "." + 'tbdata';
|
else dataBase = counter + "." + 'tbdata';
|
||||||
|
|
||||||
nosql = NOSQL(dataBase);
|
nosql = NOSQL(dataBase);
|
||||||
|
|
||||||
//select all data - use limit restore_from_backup
|
//select all data - use limit restore_from_backup
|
||||||
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
||||||
|
|
||||||
for (let i = 0; i < records.length; i++) {
|
for (let i = 0; i < records.length; i++) {
|
||||||
if (clientReady) {
|
if (clientReady) {
|
||||||
|
|
||||||
let item = records[i];
|
let item = records[i];
|
||||||
let id = item.id;
|
let id = item.id;
|
||||||
|
|
||||||
if (id !== undefined) {
|
if (id !== undefined) {
|
||||||
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
let message = JSON.parse(JSON.stringify(item));
|
let message = JSON.parse(JSON.stringify(item));
|
||||||
delete message.id;
|
delete message.id;
|
||||||
|
|
||||||
client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 });
|
client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 });
|
||||||
|
|
||||||
//remove from database
|
//remove from database
|
||||||
await promisifyBuilder(nosql.remove().where("id", id));
|
await promisifyBuilder(nosql.remove().where("id", id));
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
//process error
|
//process error
|
||||||
console.log("processDataFromDatabase", error);
|
console.log("processDataFromDatabase", error);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
processingData = false;
|
processingData = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (records.length > 0) {
|
if (records.length > 0) {
|
||||||
//clean backup file
|
//clean backup file
|
||||||
if (counter > 0) nosql.clean();
|
if (counter > 0) nosql.clean();
|
||||||
}
|
}
|
||||||
|
|
||||||
//no data in db, remove
|
//no data in db, remove
|
||||||
if (records.length == 0) {
|
if (records.length == 0) {
|
||||||
if (counter > 0) nosql.drop();
|
if (counter > 0) nosql.drop();
|
||||||
}
|
}
|
||||||
|
|
||||||
const d = new Date();
|
const d = new Date();
|
||||||
lastRestoreTime = d.getTime();
|
lastRestoreTime = d.getTime();
|
||||||
|
|
||||||
processingData = false;
|
processingData = false;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
instance.on('options', main);
|
instance.on('options', main);
|
||||||
//instance.reconfigure();
|
//instance.reconfigure();
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue