448 lines
12 KiB
JavaScript
Executable file
448 lines
12 KiB
JavaScript
Executable file
exports.id = 'wsmqttpublish';
|
|
exports.title = 'WS MQTT publish';
|
|
exports.group = 'MQTT';
|
|
exports.color = '#888600';
|
|
exports.version = '1.0.2';
|
|
exports.icon = 'sign-out';
|
|
exports.input = 2;
|
|
exports.output = 3;
|
|
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
|
|
|
|
exports.html = `<div class="padding">
|
|
<div class="row">
|
|
<div class="col-md-6">
|
|
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
|
</div>
|
|
<div class="col-md-6">
|
|
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
|
</div>
|
|
</div>
|
|
<div class="row">
|
|
<div class="col-md-6">
|
|
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
|
|
</div>
|
|
<div class="col-md-6">
|
|
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
|
|
</div>
|
|
</div>
|
|
</div>`;
|
|
|
|
|
|
exports.readme = `
|
|
# WS MQTT Publish
|
|
|
|
Version 1.0.3.
|
|
|
|
Added:
|
|
- database collections,
|
|
- rpc response
|
|
`;
|
|
|
|
const { promisifyBuilder } = require('./helper/db_helper');
|
|
const { errLogger, monitor } = require('./helper/logger');
|
|
const fs = require('fs');
|
|
const mqtt = require('mqtt');
|
|
|
|
const SEND_TO = {
|
|
debug: 0,
|
|
rpcCall: 1,
|
|
services: 2
|
|
}
|
|
|
|
//CONFIG
|
|
let createTelemetryBackup = true;
|
|
let saveTelemetryOnError = true;//backup_on_failure overrides this value
|
|
//------------------------
|
|
|
|
let rollers;
|
|
if (createTelemetryBackup) rollers = require('streamroller');
|
|
|
|
const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304
|
|
let insertNoSqlCounter = 0;
|
|
let insertBackupNoSqlCounter = 0;
|
|
let processingData = false;
|
|
|
|
let backup_on_failure = false;//== saveTelemetryOnError - create backup client send failure
|
|
let restore_from_backup = 0; //how many rows process at once?
|
|
let restore_backup_wait = 0;//wait seconds
|
|
let lastRestoreTime = 0;
|
|
|
|
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
|
|
let sendClientError = true;
|
|
|
|
process.on('uncaughtException', function(err) {
|
|
|
|
errLogger.error('uncaughtException:', err.message)
|
|
errLogger.error(err.stack);
|
|
|
|
//TODO
|
|
//send to service
|
|
|
|
//process.exit(1);
|
|
})
|
|
|
|
const nosql = NOSQL('tbdata');
|
|
const nosqlBackup = NOSQL('/backup/tbdata');
|
|
|
|
|
|
exports.install = function(instance) {
|
|
|
|
var client;
|
|
var opts;
|
|
var clientReady = false;
|
|
|
|
// wsmqtt status for notification purposes on projects.worksys.io database
|
|
let wsmqttName = null;
|
|
let sendWsStatusVar = null;
|
|
let wsmqtt_status = 'disconnected';
|
|
|
|
function getWsmqttName(host) {
|
|
if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
|
|
else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
|
|
else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
|
|
}
|
|
|
|
function sendWsStatus() {
|
|
instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status });
|
|
}
|
|
|
|
|
|
function main() {
|
|
if (!FLOW.dbLoaded) return;
|
|
|
|
loadSettings();
|
|
clearInterval(sendWsStatus);
|
|
sendWsStatusVar = setInterval(sendWsStatus, 180000);
|
|
}
|
|
|
|
//set opts according to db settings
|
|
function loadSettings() {
|
|
|
|
if (instance.options.host !== "") {
|
|
//override settings from database
|
|
var o = instance.options;
|
|
opts = {
|
|
host: o.host,
|
|
port: o.port,
|
|
clientId: o.clientid,
|
|
username: o.username,
|
|
rejectUnauthorized: false,
|
|
resubscribe: false
|
|
};
|
|
|
|
wsmqttName = getWsmqttName(o.host);
|
|
|
|
console.log("wsmqttpublich -> loadSettings from instance.options", instance.options);
|
|
}
|
|
else {
|
|
|
|
const SETTINGS = FLOW.GLOBALS.settings;
|
|
backup_on_failure = SETTINGS.backup_on_failure;
|
|
saveTelemetryOnError = backup_on_failure;
|
|
|
|
restore_from_backup = SETTINGS.restore_from_backup;
|
|
restore_backup_wait = SETTINGS.restore_backup_wait;
|
|
|
|
let mqtt_host = SETTINGS.mqtt_host;
|
|
let mqtt_clientid = SETTINGS.mqtt_clientid;
|
|
let mqtt_username = SETTINGS.mqtt_username;
|
|
let mqtt_port = SETTINGS.mqtt_port;
|
|
|
|
opts = {
|
|
host: mqtt_host,
|
|
port: mqtt_port,
|
|
keepalive: 10,
|
|
clientId: mqtt_clientid,
|
|
username: mqtt_username,
|
|
rejectUnauthorized: false,
|
|
resubscribe: false
|
|
};
|
|
|
|
wsmqttName = getWsmqttName(mqtt_host);
|
|
}
|
|
|
|
connectToTbServer();
|
|
}
|
|
|
|
function connectToTbServer() {
|
|
var url = "mqtt://" + opts.host + ":" + opts.port;
|
|
console.log("MQTT URL: ", url);
|
|
|
|
client = mqtt.connect(url, opts);
|
|
|
|
client.on('connect', function() {
|
|
instance.status("Connected", "green");
|
|
//monitor.info("MQTT client connected");
|
|
|
|
sendClientError = true;
|
|
clientReady = true;
|
|
wsmqtt_status = 'connected';
|
|
});
|
|
|
|
client.on('reconnect', function() {
|
|
instance.status("Reconnecting", "yellow");
|
|
clientReady = false;
|
|
});
|
|
|
|
client.on('message', function(topic, message) {
|
|
// message is type of buffer
|
|
message = message.toString();
|
|
if (message[0] === '{') {
|
|
TRY(function() {
|
|
|
|
message = JSON.parse(message);
|
|
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
|
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 });
|
|
instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } });
|
|
}
|
|
|
|
}, () => instance.debug('MQTT: Error parsing data', message));
|
|
}
|
|
|
|
instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message });
|
|
});
|
|
|
|
client.on('close', function() {
|
|
clientReady = false;
|
|
wsmqtt_status = 'disconnected';
|
|
|
|
instance.status("Disconnected", "red");
|
|
instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" });
|
|
});
|
|
|
|
client.on('error', function(err) {
|
|
instance.status("Err: " + err.code, "red");
|
|
instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts });
|
|
if (sendClientError) {
|
|
monitor.info('MQTT client error', err);
|
|
sendClientError = false;
|
|
}
|
|
clientReady = false;
|
|
wsmqtt_status = 'disconnected';
|
|
});
|
|
|
|
}
|
|
|
|
|
|
instance.on("0", _ => {
|
|
main();
|
|
})
|
|
|
|
|
|
instance.on('1', function(data) {
|
|
|
|
if (clientReady) {
|
|
//do we have some data in backup file? if any, process data from database
|
|
if (saveTelemetryOnError) {
|
|
//read telemetry data and send back to server
|
|
if (!processingData) processDataFromDatabase();
|
|
}
|
|
|
|
let stringifiedJson = JSON.stringify(data.data);
|
|
client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 });
|
|
|
|
//backup telemetry
|
|
if (createTelemetryBackup) {
|
|
data.data.id = UID();
|
|
nosqlBackup.insert(data.data);
|
|
|
|
insertBackupNoSqlCounter++;
|
|
if (insertBackupNoSqlCounter > 150) {
|
|
let options = { compress: true };
|
|
let path = __dirname + "/../databases/backup/tbdata.nosql";
|
|
var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options);
|
|
stream.write("");
|
|
stream.end();
|
|
|
|
insertBackupNoSqlCounter = 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
else {
|
|
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
|
instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data });
|
|
|
|
if (saveTelemetryOnError) {
|
|
//create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql
|
|
makeBackupFromDbFile();
|
|
|
|
//write to tb
|
|
data.data.id = UID();
|
|
nosql.insert(data.data);
|
|
}
|
|
}
|
|
});
|
|
|
|
|
|
instance.close = function(done) {
|
|
if (clientReady) {
|
|
client.end();
|
|
clearInterval(sendWsStatusVar);
|
|
}
|
|
};
|
|
|
|
|
|
function getDbBackupFileCounter(type) {
|
|
var files = fs.readdirSync(__dirname + "/../databases");
|
|
|
|
let counter = 0;
|
|
for (var i = 0; i < files.length; i++) {
|
|
|
|
if (files[i] == "tbdata.nosql") continue;
|
|
|
|
if (files[i].endsWith(".nosql")) {
|
|
|
|
let pos = files[i].indexOf(".");
|
|
if (pos > -1) {
|
|
|
|
let fileCounter = counter;
|
|
let firstDigit = files[i].slice(0, pos);
|
|
|
|
fileCounter = parseInt(firstDigit);
|
|
if (isNaN(fileCounter)) fileCounter = 0;
|
|
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
|
|
|
if (type == "max") {
|
|
if (fileCounter > counter) {
|
|
counter = fileCounter;
|
|
}
|
|
}
|
|
else if (type == "min") {
|
|
if (counter == 0) counter = fileCounter;
|
|
|
|
if (fileCounter < counter) {
|
|
counter = fileCounter;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if (type == "max") counter++;
|
|
|
|
return counter;
|
|
}
|
|
|
|
const makeBackupFromDbFile = async () => {
|
|
|
|
if (!saveTelemetryOnError) return;
|
|
|
|
//to avoid large file: tbdata.nosql
|
|
|
|
//init value is 0!
|
|
if (insertNoSqlCounter > 0) {
|
|
--insertNoSqlCounter;
|
|
return;
|
|
}
|
|
|
|
insertNoSqlCounter = 100;
|
|
|
|
let source = __dirname + "/../databases/tbdata.nosql";
|
|
|
|
var stats = fs.statSync(source);
|
|
var fileSizeInBytes = stats.size;
|
|
|
|
if (fileSizeInBytes > noSqlFileSizeLimit) {
|
|
|
|
let counter = 1;
|
|
counter = getDbBackupFileCounter("max");
|
|
|
|
let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql";
|
|
|
|
//make backup file
|
|
fs.copyFileSync(source, destination);
|
|
//fs.renameSync(p, p + "." + counter);
|
|
|
|
//clear tbdata.nosql
|
|
fs.writeFileSync(source, "");
|
|
fs.truncateSync(source, 0);
|
|
|
|
}
|
|
}
|
|
|
|
const processDataFromDatabase = async () => {
|
|
|
|
if (restore_from_backup <= 0) return;
|
|
|
|
//calculate diff
|
|
const now = new Date();
|
|
let currentTime = now.getTime();
|
|
let diff = currentTime - lastRestoreTime;
|
|
|
|
if ((diff / 1000) < restore_backup_wait) {
|
|
//console.log("*********restore_backup_wait", diff, restore_backup_wait);
|
|
return;
|
|
}
|
|
|
|
processingData = true;
|
|
|
|
//get filename to process
|
|
let counter = getDbBackupFileCounter("min");
|
|
|
|
//we have some backup files
|
|
let dataBase = 'tbdata';
|
|
|
|
var nosql;
|
|
if (counter == 0) dataBase = 'tbdata';
|
|
else dataBase = counter + "." + 'tbdata';
|
|
|
|
nosql = NOSQL(dataBase);
|
|
|
|
//select all data - use limit restore_from_backup
|
|
let records = await promisifyBuilder(nosql.find().take(restore_from_backup));
|
|
|
|
for (let i = 0; i < records.length; i++) {
|
|
if (clientReady) {
|
|
|
|
let item = records[i];
|
|
let id = item.id;
|
|
|
|
if (id !== undefined) {
|
|
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
|
|
|
try {
|
|
|
|
let message = JSON.parse(JSON.stringify(item));
|
|
delete message.id;
|
|
|
|
client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 });
|
|
|
|
//remove from database
|
|
await promisifyBuilder(nosql.remove().where("id", id));
|
|
|
|
} catch (error) {
|
|
//process error
|
|
console.log("processDataFromDatabase", error);
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
else {
|
|
processingData = false;
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (records.length > 0) {
|
|
//clean backup file
|
|
if (counter > 0) nosql.clean();
|
|
}
|
|
|
|
//no data in db, remove
|
|
if (records.length == 0) {
|
|
if (counter > 0) nosql.drop();
|
|
}
|
|
|
|
const d = new Date();
|
|
lastRestoreTime = d.getTime();
|
|
|
|
processingData = false;
|
|
|
|
}
|
|
|
|
instance.on('options', main);
|
|
//instance.reconfigure();
|
|
};
|