exports.id = 'wsmqttpublish'; exports.title = 'WS MQTT publish'; exports.group = 'MQTT'; exports.color = '#888600'; exports.version = '1.0.2'; exports.icon = 'sign-out'; exports.input = 2; exports.output = 3; exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" }; exports.html = `
Hostname or IP address (if not empty - setting will override db setting)
Port
@(Client id)
@(Username)
`; exports.readme = ` # WS MQTT Publish Version 1.0.3. Added: - database collections, - rpc response `; const { promisifyBuilder } = require('./helper/db_helper'); const { errLogger, monitor } = require('./helper/logger'); const fs = require('fs'); const mqtt = require('mqtt'); const SEND_TO = { debug: 0, rpcCall: 1, services: 2 } //CONFIG let createTelemetryBackup = true; let saveTelemetryOnError = true;//backup_on_failure overrides this value //------------------------ let rollers; if (createTelemetryBackup) rollers = require('streamroller'); const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304 let insertNoSqlCounter = 0; let insertBackupNoSqlCounter = 0; let processingData = false; let backup_on_failure = false;//== saveTelemetryOnError - create backup client send failure let restore_from_backup = 0; //how many rows process at once? let restore_backup_wait = 0;//wait seconds let lastRestoreTime = 0; // if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable let sendClientError = true; process.on('uncaughtException', function(err) { errLogger.error('uncaughtException:', err.message) errLogger.error(err.stack); //TODO //send to service //process.exit(1); }) const nosql = NOSQL('tbdata'); const nosqlBackup = NOSQL('/backup/tbdata'); exports.install = function(instance) { var client; var opts; var clientReady = false; // wsmqtt status for notification purposes on projects.worksys.io database let wsmqttName = null; let sendWsStatusVar = null; let wsmqtt_status = 'disconnected'; function getWsmqttName(host) { if (host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo'; else if (host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01'; else if (host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01'; } function sendWsStatus() { instance.send(SEND_TO.services, { [wsmqttName]: wsmqtt_status }); } function main() { if (!FLOW.dbLoaded) return; loadSettings(); clearInterval(sendWsStatus); sendWsStatusVar = setInterval(sendWsStatus, 180000); } //set opts according to db settings function loadSettings() { if (instance.options.host !== "") { //override settings from database var o = instance.options; opts = { host: o.host, port: o.port, clientId: o.clientid, username: o.username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(o.host); console.log("wsmqttpublich -> loadSettings from instance.options", instance.options); } else { const SETTINGS = FLOW.GLOBALS.settings; backup_on_failure = SETTINGS.backup_on_failure; saveTelemetryOnError = backup_on_failure; restore_from_backup = SETTINGS.restore_from_backup; restore_backup_wait = SETTINGS.restore_backup_wait; let mqtt_host = SETTINGS.mqtt_host; let mqtt_clientid = SETTINGS.mqtt_clientid; let mqtt_username = SETTINGS.mqtt_username; let mqtt_port = SETTINGS.mqtt_port; opts = { host: mqtt_host, port: mqtt_port, keepalive: 10, clientId: mqtt_clientid, username: mqtt_username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(mqtt_host); } connectToTbServer(); } function connectToTbServer() { var url = "mqtt://" + opts.host + ":" + opts.port; console.log("MQTT URL: ", url); client = mqtt.connect(url, opts); client.on('connect', function() { instance.status("Connected", "green"); //monitor.info("MQTT client connected"); sendClientError = true; clientReady = true; wsmqtt_status = 'connected'; }); client.on('reconnect', function() { instance.status("Reconnecting", "yellow"); clientReady = false; }); client.on('message', function(topic, message) { // message is type of buffer message = message.toString(); if (message[0] === '{') { TRY(function() { message = JSON.parse(message); if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, { qos: 1 }); instance.send(SEND_TO.rpcCall, { "device": message.device, "id": message.data.id, "RPC response": { "success": true } }); } }, () => instance.debug('MQTT: Error parsing data', message)); } instance.send(SEND_TO.rpcCall, { "topic": topic, "content": message }); }); client.on('close', function() { clientReady = false; wsmqtt_status = 'disconnected'; instance.status("Disconnected", "red"); instance.send(SEND_TO.debug, { "message": "Client CLOSE signal received !" }); }); client.on('error', function(err) { instance.status("Err: " + err.code, "red"); instance.send(SEND_TO.debug, { "message": "Client ERROR signal received !", "error": err, "opt": opts }); if (sendClientError) { monitor.info('MQTT client error', err); sendClientError = false; } clientReady = false; wsmqtt_status = 'disconnected'; }); } instance.on("0", _ => { main(); }) instance.on('1', function(data) { if (clientReady) { //do we have some data in backup file? if any, process data from database if (saveTelemetryOnError) { //read telemetry data and send back to server if (!processingData) processDataFromDatabase(); } let stringifiedJson = JSON.stringify(data.data); client.publish("v1/gateway/telemetry", stringifiedJson, { qos: 1 }); //backup telemetry if (createTelemetryBackup) { data.data.id = UID(); nosqlBackup.insert(data.data); insertBackupNoSqlCounter++; if (insertBackupNoSqlCounter > 150) { let options = { compress: true }; let path = __dirname + "/../databases/backup/tbdata.nosql"; var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options); stream.write(""); stream.end(); insertBackupNoSqlCounter = 0; } } } else { //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data)); instance.send(SEND_TO.debug, { "message": "Client unavailable. Data not sent !", "data": data.data }); if (saveTelemetryOnError) { //create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql makeBackupFromDbFile(); //write to tb data.data.id = UID(); nosql.insert(data.data); } } }); instance.close = function(done) { if (clientReady) { client.end(); clearInterval(sendWsStatusVar); } }; function getDbBackupFileCounter(type) { var files = fs.readdirSync(__dirname + "/../databases"); let counter = 0; for (var i = 0; i < files.length; i++) { if (files[i] == "tbdata.nosql") continue; if (files[i].endsWith(".nosql")) { let pos = files[i].indexOf("."); if (pos > -1) { let fileCounter = counter; let firstDigit = files[i].slice(0, pos); fileCounter = parseInt(firstDigit); if (isNaN(fileCounter)) fileCounter = 0; //console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type); if (type == "max") { if (fileCounter > counter) { counter = fileCounter; } } else if (type == "min") { if (counter == 0) counter = fileCounter; if (fileCounter < counter) { counter = fileCounter; } } } } } if (type == "max") counter++; return counter; } const makeBackupFromDbFile = async () => { if (!saveTelemetryOnError) return; //to avoid large file: tbdata.nosql //init value is 0! if (insertNoSqlCounter > 0) { --insertNoSqlCounter; return; } insertNoSqlCounter = 100; let source = __dirname + "/../databases/tbdata.nosql"; var stats = fs.statSync(source); var fileSizeInBytes = stats.size; if (fileSizeInBytes > noSqlFileSizeLimit) { let counter = 1; counter = getDbBackupFileCounter("max"); let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql"; //make backup file fs.copyFileSync(source, destination); //fs.renameSync(p, p + "." + counter); //clear tbdata.nosql fs.writeFileSync(source, ""); fs.truncateSync(source, 0); } } const processDataFromDatabase = async () => { if (restore_from_backup <= 0) return; //calculate diff const now = new Date(); let currentTime = now.getTime(); let diff = currentTime - lastRestoreTime; if ((diff / 1000) < restore_backup_wait) { //console.log("*********restore_backup_wait", diff, restore_backup_wait); return; } processingData = true; //get filename to process let counter = getDbBackupFileCounter("min"); //we have some backup files let dataBase = 'tbdata'; var nosql; if (counter == 0) dataBase = 'tbdata'; else dataBase = counter + "." + 'tbdata'; nosql = NOSQL(dataBase); //select all data - use limit restore_from_backup let records = await promisifyBuilder(nosql.find().take(restore_from_backup)); for (let i = 0; i < records.length; i++) { if (clientReady) { let item = records[i]; let id = item.id; if (id !== undefined) { //console.log("------------processDataFromDatabase - remove", id, dataBase, i); try { let message = JSON.parse(JSON.stringify(item)); delete message.id; client.publish("v1/gateway/telemetry", JSON.stringify(message), { qos: 1 }); //remove from database await promisifyBuilder(nosql.remove().where("id", id)); } catch (error) { //process error console.log("processDataFromDatabase", error); } } } else { processingData = false; return; } } if (records.length > 0) { //clean backup file if (counter > 0) nosql.clean(); } //no data in db, remove if (records.length == 0) { if (counter > 0) nosql.drop(); } const d = new Date(); lastRestoreTime = d.getTime(); processingData = false; } instance.on('options', main); //instance.reconfigure(); };