Handle contactors separately in daily_report
This commit is contained in:
parent
7093d765ec
commit
c5c5b21f47
10 changed files with 427 additions and 242 deletions
|
|
@ -63,36 +63,39 @@ exports.install = function(instance) {
|
|||
var opts;
|
||||
var clientReady = false;
|
||||
|
||||
let o = null; //options
|
||||
let o = {}; //options
|
||||
|
||||
function main()
|
||||
{
|
||||
function main() {
|
||||
loadSettings();
|
||||
}
|
||||
|
||||
//set opts according to db settings
|
||||
function loadSettings()
|
||||
{
|
||||
function loadSettings() {
|
||||
|
||||
o = instance.options;
|
||||
if(!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic;
|
||||
if (!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic;
|
||||
|
||||
opts = {
|
||||
host: o.host,
|
||||
port: o.port,
|
||||
clientId: o.clientid,
|
||||
username: o.username,
|
||||
rejectUnauthorized: false,
|
||||
resubscribe: false
|
||||
host: o.host,
|
||||
port: o.port,
|
||||
clientId: o.clientid,
|
||||
username: o.username,
|
||||
rejectUnauthorized: false,
|
||||
resubscribe: false
|
||||
};
|
||||
|
||||
console.log("wsmqttpublich -> loadSettings from instance.options",o);
|
||||
console.log("wsmqttpublich -> loadSettings from instance.options", o);
|
||||
|
||||
if (!o.topic) {
|
||||
instance.status("Not configured", "white");
|
||||
console.log("Cloud mqtt connect: no topic selected");
|
||||
return;
|
||||
}
|
||||
|
||||
connectToTbServer();
|
||||
}
|
||||
|
||||
function connectToTbServer()
|
||||
{
|
||||
function connectToTbServer() {
|
||||
var url = "mqtt://" + opts.host + ":" + opts.port;
|
||||
console.log("MQTT URL: ", url);
|
||||
|
||||
|
|
@ -121,30 +124,30 @@ exports.install = function(instance) {
|
|||
TRY(function() {
|
||||
|
||||
message = JSON.parse(message);
|
||||
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
||||
client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
|
||||
instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
|
||||
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
||||
client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
|
||||
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
|
||||
}
|
||||
|
||||
}, () => instance.debug('MQTT: Error parsing data', message));
|
||||
}
|
||||
|
||||
instance.send(SEND_TO.rpcCall, {"topic":o.topic, "content":message });
|
||||
instance.send(SEND_TO.rpcCall, { "topic": o.topic, "content": message });
|
||||
});
|
||||
|
||||
client.on('close', function() {
|
||||
clientReady = false;
|
||||
|
||||
instance.status("Disconnected", "red");
|
||||
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !"});
|
||||
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
|
||||
});
|
||||
|
||||
client.on('error', function(err) {
|
||||
instance.status("Err: "+ err.code, "red");
|
||||
instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
|
||||
if(sendClientError) {
|
||||
console.log('MQTT client error', err);
|
||||
sendClientError = false;
|
||||
instance.status("Err: " + err.code, "red");
|
||||
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
|
||||
if (sendClientError) {
|
||||
console.log('MQTT client error', err);
|
||||
sendClientError = false;
|
||||
}
|
||||
clientReady = false;
|
||||
});
|
||||
|
|
@ -154,25 +157,21 @@ exports.install = function(instance) {
|
|||
|
||||
instance.on('0', function(data) {
|
||||
|
||||
if(clientReady)
|
||||
{
|
||||
//do we have some data in backup file? if any, process data from database
|
||||
if(saveTelemetryOnError)
|
||||
{
|
||||
if (clientReady) {
|
||||
//do we have some data in backup file? if any, process data from database
|
||||
if (saveTelemetryOnError) {
|
||||
//read telemetry data and send back to server
|
||||
if(!processingData) processDataFromDatabase();
|
||||
if (!processingData) processDataFromDatabase();
|
||||
}
|
||||
|
||||
let stringifiedJson = JSON.stringify(data.data)
|
||||
client.publish(`${o.topic}_forward`, stringifiedJson, {qos: 1});
|
||||
}
|
||||
else
|
||||
{
|
||||
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
||||
instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
|
||||
|
||||
if(saveTelemetryOnError)
|
||||
{
|
||||
let stringifiedJson = JSON.stringify(data.data)
|
||||
client.publish(`${o.topic}_forward`, stringifiedJson, { qos: 1 });
|
||||
}
|
||||
else {
|
||||
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
||||
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
|
||||
|
||||
if (saveTelemetryOnError && o.topic) {
|
||||
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
||||
makeBackupFromDbFile();
|
||||
|
||||
|
|
@ -188,71 +187,62 @@ exports.install = function(instance) {
|
|||
})
|
||||
|
||||
instance.close = function(done) {
|
||||
if(clientReady){
|
||||
if (clientReady) {
|
||||
client.end();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
function getDbBackupFileCounter(type)
|
||||
{
|
||||
|
||||
function getDbBackupFileCounter(type) {
|
||||
var files = fs.readdirSync(__dirname + "/../databases");
|
||||
|
||||
let counter = 0;
|
||||
for(var i = 0; i < files.length; i++)
|
||||
{
|
||||
for (var i = 0; i < files.length; i++) {
|
||||
|
||||
if(files[i] == "tbdatacloud.nosql") continue;
|
||||
if (files[i] == "tbdatacloud.nosql") continue;
|
||||
|
||||
if(files[i].endsWith(".nosql"))
|
||||
{
|
||||
if (files[i].endsWith(".nosql")) {
|
||||
|
||||
let pos = files[i].indexOf(".");
|
||||
if(pos > -1)
|
||||
{
|
||||
if (pos > -1) {
|
||||
|
||||
let fileCounter = counter;
|
||||
let firstDigit = files[i].slice(0, pos);
|
||||
|
||||
fileCounter = parseInt(firstDigit);
|
||||
if(isNaN(fileCounter)) fileCounter = 0;
|
||||
if (isNaN(fileCounter)) fileCounter = 0;
|
||||
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
||||
|
||||
if(type == "max")
|
||||
{
|
||||
if(fileCounter > counter)
|
||||
{
|
||||
|
||||
if (type == "max") {
|
||||
if (fileCounter > counter) {
|
||||
counter = fileCounter;
|
||||
}
|
||||
}
|
||||
else if(type == "min")
|
||||
{
|
||||
if(counter == 0) counter = fileCounter;
|
||||
else if (type == "min") {
|
||||
if (counter == 0) counter = fileCounter;
|
||||
|
||||
if(fileCounter < counter)
|
||||
{
|
||||
if (fileCounter < counter) {
|
||||
counter = fileCounter;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
if(type == "max") counter++;
|
||||
|
||||
if (type == "max") counter++;
|
||||
|
||||
return counter;
|
||||
}
|
||||
|
||||
const makeBackupFromDbFile = async () => {
|
||||
|
||||
if(!saveTelemetryOnError) return;
|
||||
if (!saveTelemetryOnError) return;
|
||||
|
||||
//to avoid large file: tbdata.nosql
|
||||
|
||||
//init value is 0!
|
||||
if(insertNoSqlCounter > 0)
|
||||
{
|
||||
if (insertNoSqlCounter > 0) {
|
||||
--insertNoSqlCounter;
|
||||
return;
|
||||
}
|
||||
|
|
@ -264,8 +254,7 @@ exports.install = function(instance) {
|
|||
var stats = fs.statSync(source);
|
||||
var fileSizeInBytes = stats.size;
|
||||
|
||||
if(fileSizeInBytes > noSqlFileSizeLimit)
|
||||
{
|
||||
if (fileSizeInBytes > noSqlFileSizeLimit) {
|
||||
|
||||
let counter = 1;
|
||||
counter = getDbBackupFileCounter("max");
|
||||
|
|
@ -279,21 +268,20 @@ exports.install = function(instance) {
|
|||
//clear tbdata.nosql
|
||||
fs.writeFileSync(source, "");
|
||||
fs.truncateSync(source, 0);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
const processDataFromDatabase = async () => {
|
||||
|
||||
if(restore_from_backup <= 0) return;
|
||||
|
||||
if (restore_from_backup <= 0) return;
|
||||
|
||||
//calculate diff
|
||||
const now = new Date();
|
||||
let currentTime = now.getTime();
|
||||
let diff = currentTime - lastRestoreTime;
|
||||
|
||||
if( (diff / 1000) < restore_backup_wait)
|
||||
{
|
||||
if ((diff / 1000) < restore_backup_wait) {
|
||||
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
||||
return;
|
||||
}
|
||||
|
|
@ -307,59 +295,54 @@ exports.install = function(instance) {
|
|||
let dataBase = 'tbdata';
|
||||
|
||||
var nosql;
|
||||
if(counter == 0) dataBase = 'tbdatacloud';
|
||||
if (counter == 0) dataBase = 'tbdatacloud';
|
||||
else dataBase = counter + "." + 'tbdatacloud';
|
||||
|
||||
nosql = NOSQL(dataBase);
|
||||
|
||||
//select all data - use limit restore_from_backup
|
||||
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
||||
|
||||
for(let i = 0; i < records.length; i++)
|
||||
{
|
||||
if(clientReady) {
|
||||
|
||||
for (let i = 0; i < records.length; i++) {
|
||||
if (clientReady) {
|
||||
|
||||
let item = records[i];
|
||||
let id = item.id;
|
||||
|
||||
if(id !== undefined)
|
||||
{
|
||||
if (id !== undefined) {
|
||||
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
||||
|
||||
try {
|
||||
|
||||
let message = JSON.parse(JSON.stringify(item));
|
||||
delete message.id;
|
||||
client.publish(`${o.topic}_forward`, JSON.stringify(message), {qos:1});
|
||||
|
||||
client.publish(`${o.topic}_forward`, JSON.stringify(message), { qos: 1 });
|
||||
|
||||
//remove from database
|
||||
await promisifyBuilder(nosql.remove().where("id", id));
|
||||
|
||||
} catch(error) {
|
||||
} catch (error) {
|
||||
//process error
|
||||
console.log("processDataFromDatabase", error);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
else {
|
||||
processingData = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if(records.length > 0)
|
||||
{
|
||||
if (records.length > 0) {
|
||||
//clean backup file
|
||||
if(counter > 0) nosql.clean();
|
||||
if (counter > 0) nosql.clean();
|
||||
}
|
||||
|
||||
//no data in db, remove
|
||||
if(records.length == 0)
|
||||
{
|
||||
if(counter > 0) nosql.drop();
|
||||
if (records.length == 0) {
|
||||
if (counter > 0) nosql.drop();
|
||||
}
|
||||
|
||||
const d = new Date();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue