citysys-flowserver/flow/wsmqttpublish.js

551 lines
14 KiB
JavaScript

exports.id = 'wsmqttpublish';
exports.title = 'WS MQTT publish';
exports.group = 'MQTT';
exports.color = '#888600';
exports.version = '1.0.2';
exports.icon = 'sign-out';
exports.input = 1;
exports.output = ["red", "white", "blue"];
exports.author = 'Daniel Segeš';
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
exports.npm = ['mqtt'];
exports.html = `<div class="padding">
<div class="row">
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
</div>
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
</div>
</div>
<div class="row">
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
</div>
<div class="col-md-6">
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
</div>
</div>
</div>`;
exports.readme = `
# WS MQTT Publish
Version 1.0.3.
Added:
- database collections,
- rpc response
`;
const instanceSendTo = {
debug: 0,
rpcCall: 1,
services: 2
}
const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
//CONFIG
let useLog4js = true;
let createTelemetryBackup = true;
let saveTelemetryOnError = true;//backup_on_failure overrides this value
//------------------------
var fs = require('fs');
let rollers;
if(createTelemetryBackup) rollers = require('streamroller');
const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304
let insertNoSqlCounter = 0;
let insertBackupNoSqlCounter = 0;
let processingData = false;
let backup_on_failure = false;//== saveTelemetryOnError - create backup broker send failure
let restore_from_backup = 0; //how many rows process at once?
let restore_backup_wait = 0;//wait seconds
let lastRestoreTime = 0;
let errLogger;
let logger;
let monitor;
if(useLog4js)
{
var path = require('path');
var log4js = require("log4js");
log4js.configure({
appenders: {
errLogs: { type: 'file', filename: path.join(__dirname + "/../", 'err.txt') },
monitorLogs: { type: 'file', compress:true, daysToKeep: 2, maxLogSize: 1048576, backups: 1, keepFileExt: true, filename: path.join(__dirname + "/../", 'monitor.txt') },
console: { type: 'console' }
},
categories: {
errLogs: { appenders: ['console', 'errLogs'], level: 'error' },
monitorLogs: { appenders: ['console', 'monitorLogs'], level: 'trace' },
//another: { appenders: ['console'], level: 'trace' },
default: { appenders: ['console'], level: 'trace' }
}
});
errLogger = log4js.getLogger("errLogs");
logger = log4js.getLogger();
monitor = log4js.getLogger("monitorLogs");
//USAGE
//logger.debug("text");
//monitor.info('info');
//errLogger.error("some error");
}
process.on('uncaughtException', function (err) {
if(errLogger)
{
errLogger.error('uncaughtException:', err.message)
errLogger.error(err.stack);
}
//TODO
//send to service
//process.exit(1);
})
const nosql = NOSQL('tbdata');
const nosqlBackup = NOSQL('/backup/tbdata');
exports.install = function(instance) {
var broker;
var opts;
var brokerready = false;
instance.on('options', loadSettings);
mqtt = require('mqtt');
// wsmqtt status for notification purposes on projects.worksys.io database
let wsmqttName = null;
let sendWsStatusVar = null;
let wsmqtt_status = 'disconnected';
function getWsmqttName(host)
{
if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
}
function sendWsStatus()
{
instance.send(instanceSendTo.services, {[wsmqttName]: wsmqtt_status});
}
sendWsStatusVar = setInterval(sendWsStatus, 180000);
//set opts according to db settings
async function loadSettings()
{
if(instance.options.host !== "")
{
//override settings from database
var o = instance.options;
opts = {
host: o.host,
port: o.port,
clientId: o.clientid,
username: o.username,
rejectUnauthorized: false,
resubscribe: false
};
wsmqttName = getWsmqttName(o.host);
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
}
else
{
const dbSettings = TABLE("settings");
let responseSettings = await promisifyBuilder(dbSettings.find());
backup_on_failure = responseSettings[0]["backup_on_failure"];
saveTelemetryOnError = backup_on_failure;
restore_from_backup = responseSettings[0]["restore_from_backup"];
restore_backup_wait = responseSettings[0]["restore_backup_wait"];
let mqtt_host = responseSettings[0]["mqtt_host"];
let mqtt_clientid = responseSettings[0]["mqtt_clientid"];
let mqtt_username = responseSettings[0]["mqtt_username"];
let mqtt_port = responseSettings[0]["mqtt_port"];
console.log("wsmqttpublich -> loadSettings from db", responseSettings[0]);
opts = {
host: mqtt_host,
port: mqtt_port,
keepalive: 10,
clientId: mqtt_clientid,
username: mqtt_username,
rejectUnauthorized: false,
resubscribe: false
};
wsmqttName = getWsmqttName(mqtt_host);
}
connectToTbServer();
}
function connectToTbServer()
{
var url = "mqtt://" + opts.host + ":" + opts.port;
console.log("MQTT URL: ", url);
broker = mqtt.connect(url, opts);
broker.on('connect', function() {
instance.status("Connected", "green");
brokerready = true;
FLOW.OMS_brokerready = brokerready;
wsmqtt_status = 'connected';
});
broker.on('reconnect', function() {
instance.status("Reconnecting", "yellow");
brokerready = false;
FLOW.OMS_brokerready = brokerready;
});
broker.on('message', function(topic, message) {
// message is type of buffer
message = message.toString();
if (message[0] === '{') {
TRY(function() {
message = JSON.parse(message);
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
broker.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
instance.send(instanceSendTo.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
}
}, () => instance.debug('MQTT: Error parsing data', message));
}
instance.send(instanceSendTo.rpcCall, {"topic":topic, "content":message });
});
broker.on('close', function(err) {
brokerready = false;
FLOW.OMS_brokerready = brokerready;
wsmqtt_status = 'disconnected';
if (err && err.toString().indexOf('Error')) {
instance.status("Err: "+err.code, "red");
instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
} else {
instance.status("Disconnected", "red");
instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
}
});
broker.on('error', function(err) {
instance.status("Err: "+ err.code, "red");
instance.send(instanceSendTo.debug, {"message":"Broker ERROR signal received !", "error":err, "opt":opts });
brokerready = false;
FLOW.OMS_brokerready = brokerready;
wsmqtt_status = 'disconnected';
});
//broker = new Broker(opts);
//MQTT_BROKERS.push(broker);
//instance.status('Ready');
}
//set opts accortding to options
/*
instance.reconfigure = function() {
var o = instance.options;
opts = {
host: o.host,
port: o.port,
keepalive: 10,
clientId: o.clientid,
username: o.username,
rejectUnauthorized: false,
resubscribe: false
};
//connectToTbServer();
};
*/
instance.on('data', function(data) {
if (brokerready)
{
//do we have some data in backup file?
//if any, process data from database
if(saveTelemetryOnError)
{
//read telemetry data and send back to server
if(!processingData) processDataFromDatabase();
}
}
if (brokerready)
{
let stringifiedJson = JSON.stringify(data.data);
broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
//backup telemetry
if(createTelemetryBackup)
{
data.data.id = UID();
nosqlBackup.insert(data.data);
insertBackupNoSqlCounter++;
if(insertBackupNoSqlCounter > 150)
{
let options = {compress: true};
let path = __dirname + "/../databases/backup/tbdata.nosql";
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
stream.write("");
stream.end();
insertBackupNoSqlCounter = 0;
}
}
}
else
{
if(logger) logger.debug("Broker unavailable. Data not sent !", data.data);
instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data });
if(saveTelemetryOnError)
{
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
makeBackupFromDbFile();
//write to tb
data.data.id = UID();
nosql.insert(data.data);
}
}
});
instance.close = function(done) {
if (brokerready){
broker.end();
clearInterval(sendWsStatusVar);
}
};
function getDbBackupFileCounter(type)
{
var files = fs.readdirSync(__dirname + "/../databases");
let counter = 0;
for(var i = 0; i < files.length; i++)
{
if(files[i] == "tbdata.nosql") continue;
if(files[i].endsWith(".nosql"))
{
let pos = files[i].indexOf(".");
if(pos > -1)
{
let fileCounter = counter;
let firstDigit = files[i].slice(0, pos);
fileCounter = parseInt(firstDigit);
if (isNaN(fileCounter)) fileCounter = 0;
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
if(type == "max")
{
if(fileCounter > counter)
{
counter = fileCounter;
}
}
else if(type == "min")
{
if(counter == 0) counter = fileCounter;
if(fileCounter < counter)
{
counter = fileCounter;
}
}
}
}
}
if(type == "max") counter++;
return counter;
}
const makeBackupFromDbFile = async () => {
if(!saveTelemetryOnError) return;
//to avoid large file: tbdata.nosql
//init value is 0!
if(insertNoSqlCounter > 0)
{
--insertNoSqlCounter;
return;
}
insertNoSqlCounter = 100;
let source = __dirname + "/../databases/tbdata.nosql";
var stats = fs.statSync(source);
var fileSizeInBytes = stats.size;
if(fileSizeInBytes > noSqlFileSizeLimit)
{
let counter = 1;
counter = getDbBackupFileCounter("max");
let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql";
//make backup file
fs.copyFileSync(source, destination);
//fs.renameSync(p, p + "." + counter);
//clear tbdata.nosql
fs.writeFileSync(source, "");
fs.truncateSync(source, 0);
}
}
const processDataFromDatabase = async () => {
if(restore_from_backup <= 0)
{
return;
}
//calculate diff
const now = new Date();
let currentTime = now.getTime();
let diff = currentTime - lastRestoreTime;
if( (diff / 1000) < restore_backup_wait)
{
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
return;
}
processingData = true;
//get filename to process
let counter = getDbBackupFileCounter("min");
//we have some backup files
let dataBase = 'tbdata';
var nosql;
if(counter == 0) dataBase = 'tbdata';
else dataBase = counter + "." + 'tbdata';
nosql = NOSQL(dataBase);
//select all data - use limit restore_from_backup
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
for(let i = 0; i < records.length; i++)
{
if (brokerready) {
let item = records[i];
let id = item.id;
if(id !== undefined)
{
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
try{
let o = JSON.parse(JSON.stringify(item));
delete o.id;
let message = JSON.stringify(o);
broker.publish("v1/gateway/telemetry", message, {qos:1});
//remove from database
await promisifyBuilder(nosql.remove().where("id", id));
} catch (error) {
//process error
console.log("processDataFromDatabase", error);
}
}
}
else
{
processingData = false;
return;
}
}
if(records.length > 0)
{
//clean backup file
if(counter > 0) nosql.clean();
}
//no data in db, remove
if(records.length == 0)
{
if(counter > 0) nosql.drop();
}
const d = new Date();
lastRestoreTime = d.getTime();
processingData = false;
}
loadSettings();
//instance.on('options', instance.reconfigure);
//instance.reconfigure();
};