exports.id = 'wsmqttpublish'; exports.title = 'WS MQTT publish'; exports.group = 'MQTT'; exports.color = '#888600'; exports.version = '1.0.2'; exports.icon = 'sign-out'; exports.input = 2; exports.output = ["red", "white", "blue"]; exports.author = 'Daniel Segeš'; exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" }; exports.html = `
Hostname or IP address (if not empty - setting will override db setting)
Port
@(Client id)
@(Username)
`; exports.readme = ` # WS MQTT Publish Version 1.0.3. Added: - database collections, - rpc response `; const { promisifyBuilder } = require('./helper/db_helper'); const { errLogger, monitor } = require('./helper/logger'); const fs = require('fs'); const mqtt = require('mqtt'); const SEND_TO = { debug: 0, rpcCall: 1, services: 2 } //CONFIG let createTelemetryBackup = true; let saveTelemetryOnError = true;//backup_on_failure overrides this value //------------------------ let rollers; if(createTelemetryBackup) rollers = require('streamroller'); const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304 let insertNoSqlCounter = 0; let insertBackupNoSqlCounter = 0; let processingData = false; let backup_on_failure = false;//== saveTelemetryOnError - create backup client send failure let restore_from_backup = 0; //how many rows process at once? let restore_backup_wait = 0;//wait seconds let lastRestoreTime = 0; // if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable let sendClientError = true; process.on('uncaughtException', function (err) { errLogger.error('uncaughtException:', err.message) errLogger.error(err.stack); //TODO //send to service //process.exit(1); }) const nosql = NOSQL('tbdata'); const nosqlBackup = NOSQL('/backup/tbdata'); exports.install = function(instance) { var client; var opts; var clientReady = false; // wsmqtt status for notification purposes on projects.worksys.io database let wsmqttName = null; let sendWsStatusVar = null; let wsmqtt_status = 'disconnected'; function getWsmqttName(host) { if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo'; else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01'; else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01'; } function sendWsStatus() { instance.send(SEND_TO.services, {[wsmqttName]: wsmqtt_status}); } function main() { if(!FLOW.dbLoaded) return; loadSettings(); clearInterval(sendWsStatus); sendWsStatusVar = setInterval(sendWsStatus, 180000); } //set opts according to db settings function loadSettings() { if(instance.options.host !== "") { //override settings from database var o = instance.options; opts = { host: o.host, port: o.port, clientId: o.clientid, username: o.username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(o.host); console.log("wsmqttpublich -> loadSettings from instance.options", instance.options); } else { const SETTINGS = FLOW.GLOBALS.settings; backup_on_failure = SETTINGS.backup_on_failure; saveTelemetryOnError = backup_on_failure; restore_from_backup = SETTINGS.restore_from_backup; restore_backup_wait = SETTINGS.restore_backup_wait; let mqtt_host = SETTINGS.mqtt_host; let mqtt_clientid = SETTINGS.mqtt_clientid; let mqtt_username = SETTINGS.mqtt_username; let mqtt_port = SETTINGS.mqtt_port; opts = { host: mqtt_host, port: mqtt_port, keepalive: 10, clientId: mqtt_clientid, username: mqtt_username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(mqtt_host); } connectToTbServer(); } function connectToTbServer() { var url = "mqtt://" + opts.host + ":" + opts.port; console.log("MQTT URL: ", url); client = mqtt.connect(url, opts); client.on('connect', function() { instance.status("Connected", "green"); monitor.info("MQTT client connected"); sendClientError = true; clientReady = true; wsmqtt_status = 'connected'; }); client.on('reconnect', function() { instance.status("Reconnecting", "yellow"); clientReady = false; }); client.on('message', function(topic, message) { // message is type of buffer message = message.toString(); if (message[0] === '{') { TRY(function() { message = JSON.parse(message); console.log("ooooo------x", message); if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1}); instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}}); } }, () => instance.debug('MQTT: Error parsing data', message)); } instance.send(SEND_TO.rpcCall, {"topic":topic, "content":message }); }); client.on('close', function(err) { clientReady = false; wsmqtt_status = 'disconnected'; if (err && err.toString().indexOf('Error')) { instance.status("Err: "+err.code, "red"); instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts }); } else { instance.status("Disconnected", "red"); instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts }); } client.reconnect(); }); client.on('error', function(err) { instance.status("Err: "+ err.code, "red"); instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts }); if(sendClientError) { monitor.info('MQTT client error', err); sendClientError = false; } clientReady = false; wsmqtt_status = 'disconnected'; }); } instance.on("0", _ => { main(); }) instance.on('1', function(data) { if(clientReady) { //do we have some data in backup file? //if any, process data from database if(saveTelemetryOnError) { //read telemetry data and send back to server if(!processingData) processDataFromDatabase(); } } if(clientReady) { let stringifiedJson = JSON.stringify(data.data); client.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1}); //backup telemetry if(createTelemetryBackup) { data.data.id = UID(); nosqlBackup.insert(data.data); insertBackupNoSqlCounter++; if(insertBackupNoSqlCounter > 150) { let options = {compress: true}; let path = __dirname + "/../databases/backup/tbdata.nosql"; var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options); stream.write(""); stream.end(); insertBackupNoSqlCounter = 0; } } } else { //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data)); instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data }); if(saveTelemetryOnError) { //create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql makeBackupFromDbFile(); //write to tb data.data.id = UID(); nosql.insert(data.data); } } }); instance.close = function(done) { if(clientReady){ client.end(); clearInterval(sendWsStatusVar); } }; function getDbBackupFileCounter(type) { var files = fs.readdirSync(__dirname + "/../databases"); let counter = 0; for(var i = 0; i < files.length; i++) { if(files[i] == "tbdata.nosql") continue; if(files[i].endsWith(".nosql")) { let pos = files[i].indexOf("."); if(pos > -1) { let fileCounter = counter; let firstDigit = files[i].slice(0, pos); fileCounter = parseInt(firstDigit); if(isNaN(fileCounter)) fileCounter = 0; //console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type); if(type == "max") { if(fileCounter > counter) { counter = fileCounter; } } else if(type == "min") { if(counter == 0) counter = fileCounter; if(fileCounter < counter) { counter = fileCounter; } } } } } if(type == "max") counter++; return counter; } const makeBackupFromDbFile = async () => { if(!saveTelemetryOnError) return; //to avoid large file: tbdata.nosql //init value is 0! if(insertNoSqlCounter > 0) { --insertNoSqlCounter; return; } insertNoSqlCounter = 100; let source = __dirname + "/../databases/tbdata.nosql"; var stats = fs.statSync(source); var fileSizeInBytes = stats.size; if(fileSizeInBytes > noSqlFileSizeLimit) { let counter = 1; counter = getDbBackupFileCounter("max"); let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql"; //make backup file fs.copyFileSync(source, destination); //fs.renameSync(p, p + "." + counter); //clear tbdata.nosql fs.writeFileSync(source, ""); fs.truncateSync(source, 0); } } const processDataFromDatabase = async () => { if(restore_from_backup <= 0) return; //calculate diff const now = new Date(); let currentTime = now.getTime(); let diff = currentTime - lastRestoreTime; if( (diff / 1000) < restore_backup_wait) { //console.log("*********restore_backup_wait", diff, restore_backup_wait); return; } processingData = true; //get filename to process let counter = getDbBackupFileCounter("min"); //we have some backup files let dataBase = 'tbdata'; var nosql; if(counter == 0) dataBase = 'tbdata'; else dataBase = counter + "." + 'tbdata'; nosql = NOSQL(dataBase); //select all data - use limit restore_from_backup let records = await promisifyBuilder(nosql.find().take(restore_from_backup)); for(let i = 0; i < records.length; i++) { if(clientReady) { let item = records[i]; let id = item.id; if(id !== undefined) { //console.log("------------processDataFromDatabase - remove", id, dataBase, i); try { let o = JSON.parse(JSON.stringify(item)); delete o.id; let message = JSON.stringify(o); client.publish("v1/gateway/telemetry", message, {qos:1}); //remove from database await promisifyBuilder(nosql.remove().where("id", id)); } catch(error) { //process error console.log("processDataFromDatabase", error); } } } else { processingData = false; return; } } if(records.length > 0) { //clean backup file if(counter > 0) nosql.clean(); } //no data in db, remove if(records.length == 0) { if(counter > 0) nosql.drop(); } const d = new Date(); lastRestoreTime = d.getTime(); processingData = false; } instance.on('options', main); //instance.reconfigure(); };