exports.id = 'wsmqttpublish'; exports.title = 'WS MQTT publish'; exports.group = 'MQTT'; exports.color = '#888600'; exports.version = '1.0.2'; exports.icon = 'sign-out'; exports.input = 1; exports.output = ["red", "white", "blue"]; exports.author = 'Daniel Segeš'; exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" }; exports.npm = ['mqtt']; exports.html = `
Hostname or IP address (if not empty - setting will override db setting)
Port
@(Client id)
@(Username)
`; exports.readme = ` # WS MQTT Publish Version 1.0.3. Added: - database collections, - rpc response `; const instanceSendTo = { debug: 0, rpcCall: 1, services: 2 } const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js'); //CONFIG let useLog4js = true; let createTelemetryBackup = true; let saveTelemetryOnError = true;//backup_on_failure overrides this value //------------------------ var fs = require('fs'); let rollers; if(createTelemetryBackup) rollers = require('streamroller'); const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304 let insertNoSqlCounter = 0; let insertBackupNoSqlCounter = 0; let processingData = false; let backup_on_failure = false;//== saveTelemetryOnError - create backup broker send failure let restore_from_backup = 0; //how many rows process at once? let restore_backup_wait = 0;//wait seconds let lastRestoreTime = 0; let errLogger; let logger; let monitor; if(useLog4js) { var path = require('path'); var log4js = require("log4js"); log4js.configure({ appenders: { errLogs: { type: 'file', filename: path.join(__dirname + "/../", 'err.txt') }, monitorLogs: { type: 'file', compress:true, daysToKeep: 2, maxLogSize: 1048576, backups: 1, keepFileExt: true, filename: path.join(__dirname + "/../", 'monitor.txt') }, console: { type: 'console' } }, categories: { errLogs: { appenders: ['console', 'errLogs'], level: 'error' }, monitorLogs: { appenders: ['console', 'monitorLogs'], level: 'trace' }, //another: { appenders: ['console'], level: 'trace' }, default: { appenders: ['console'], level: 'trace' } } }); errLogger = log4js.getLogger("errLogs"); logger = log4js.getLogger(); monitor = log4js.getLogger("monitorLogs"); //USAGE //logger.debug("text"); //monitor.info('info'); //errLogger.error("some error"); } process.on('uncaughtException', function (err) { if(errLogger) { errLogger.error('uncaughtException:', err.message) errLogger.error(err.stack); } //TODO //send to service //process.exit(1); }) const nosql = NOSQL('tbdata'); const nosqlBackup = NOSQL('/backup/tbdata'); exports.install = function(instance) { var broker; var opts; var brokerready = false; instance.on('options', loadSettings); mqtt = require('mqtt'); // wsmqtt status for notification purposes on projects.worksys.io database let wsmqttName = null; let sendWsStatusVar = null; let wsmqtt_status = 'disconnected'; function getWsmqttName(host) { if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo'; else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01'; else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01'; } function sendWsStatus() { instance.send(instanceSendTo.services, {[wsmqttName]: wsmqtt_status}); } sendWsStatusVar = setInterval(sendWsStatus, 180000); //set opts according to db settings async function loadSettings() { if(instance.options.host !== "") { //override settings from database var o = instance.options; opts = { host: o.host, port: o.port, clientId: o.clientid, username: o.username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(o.host); console.log("wsmqttpublich -> loadSettings from instance.options", instance.options); } else { const dbSettings = TABLE("settings"); let responseSettings = await promisifyBuilder(dbSettings.find()); backup_on_failure = responseSettings[0]["backup_on_failure"]; saveTelemetryOnError = backup_on_failure; restore_from_backup = responseSettings[0]["restore_from_backup"]; restore_backup_wait = responseSettings[0]["restore_backup_wait"]; let mqtt_host = responseSettings[0]["mqtt_host"]; let mqtt_clientid = responseSettings[0]["mqtt_clientid"]; let mqtt_username = responseSettings[0]["mqtt_username"]; let mqtt_port = responseSettings[0]["mqtt_port"]; console.log("wsmqttpublich -> loadSettings from db", responseSettings[0]); opts = { host: mqtt_host, port: mqtt_port, keepalive: 10, clientId: mqtt_clientid, username: mqtt_username, rejectUnauthorized: false, resubscribe: false }; wsmqttName = getWsmqttName(mqtt_host); } connectToTbServer(); } function connectToTbServer() { var url = "mqtt://" + opts.host + ":" + opts.port; console.log("MQTT URL: ", url); broker = mqtt.connect(url, opts); broker.on('connect', function() { instance.status("Connected", "green"); monitor.info("MQTT broker connected"); brokerready = true; FLOW.OMS_brokerready = brokerready; wsmqtt_status = 'connected'; }); broker.on('reconnect', function() { instance.status("Reconnecting", "yellow"); brokerready = false; FLOW.OMS_brokerready = brokerready; }); broker.on('message', function(topic, message) { // message is type of buffer message = message.toString(); if (message[0] === '{') { TRY(function() { message = JSON.parse(message); if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { broker.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1}); instance.send(instanceSendTo.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}}); } }, () => instance.debug('MQTT: Error parsing data', message)); } instance.send(instanceSendTo.rpcCall, {"topic":topic, "content":message }); }); broker.on('close', function(err) { brokerready = false; FLOW.OMS_brokerready = brokerready; wsmqtt_status = 'disconnected'; if (err && err.toString().indexOf('Error')) { instance.status("Err: "+err.code, "red"); instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts }); } else { instance.status("Disconnected", "red"); instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts }); } broker.reconnect(); }); broker.on('error', function(err) { instance.status("Err: "+ err.code, "red"); instance.send(instanceSendTo.debug, {"message":"Broker ERROR signal received !", "error":err, "opt":opts }); monitor.info('MQTT broker error', err); brokerready = false; FLOW.OMS_brokerready = brokerready; wsmqtt_status = 'disconnected'; }); } instance.on('data', function(data) { if (brokerready) { //do we have some data in backup file? //if any, process data from database if(saveTelemetryOnError) { //read telemetry data and send back to server if(!processingData) processDataFromDatabase(); } } if (brokerready) { let stringifiedJson = JSON.stringify(data.data); broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1}); //backup telemetry if(createTelemetryBackup) { data.data.id = UID(); nosqlBackup.insert(data.data); insertBackupNoSqlCounter++; if(insertBackupNoSqlCounter > 150) { let options = {compress: true}; let path = __dirname + "/../databases/backup/tbdata.nosql"; var stream = new rollers.RollingFileStream(path, noSqlFileSizeLimit, 150, options); stream.write(""); stream.end(); insertBackupNoSqlCounter = 0; } } } else { if(logger) logger.debug("Broker unavailable. Data not sent !", data.data); instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data }); if(saveTelemetryOnError) { //create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql makeBackupFromDbFile(); //write to tb data.data.id = UID(); nosql.insert(data.data); } } }); instance.close = function(done) { if (brokerready){ broker.end(); clearInterval(sendWsStatusVar); } }; function getDbBackupFileCounter(type) { var files = fs.readdirSync(__dirname + "/../databases"); let counter = 0; for(var i = 0; i < files.length; i++) { if(files[i] == "tbdata.nosql") continue; if(files[i].endsWith(".nosql")) { let pos = files[i].indexOf("."); if(pos > -1) { let fileCounter = counter; let firstDigit = files[i].slice(0, pos); fileCounter = parseInt(firstDigit); if (isNaN(fileCounter)) fileCounter = 0; //console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type); if(type == "max") { if(fileCounter > counter) { counter = fileCounter; } } else if(type == "min") { if(counter == 0) counter = fileCounter; if(fileCounter < counter) { counter = fileCounter; } } } } } if(type == "max") counter++; return counter; } const makeBackupFromDbFile = async () => { if(!saveTelemetryOnError) return; //to avoid large file: tbdata.nosql //init value is 0! if(insertNoSqlCounter > 0) { --insertNoSqlCounter; return; } insertNoSqlCounter = 100; let source = __dirname + "/../databases/tbdata.nosql"; var stats = fs.statSync(source); var fileSizeInBytes = stats.size; if(fileSizeInBytes > noSqlFileSizeLimit) { let counter = 1; counter = getDbBackupFileCounter("max"); let destination = __dirname + "/../databases/" + counter + "." + "tbdata.nosql"; //make backup file fs.copyFileSync(source, destination); //fs.renameSync(p, p + "." + counter); //clear tbdata.nosql fs.writeFileSync(source, ""); fs.truncateSync(source, 0); } } const processDataFromDatabase = async () => { if(restore_from_backup <= 0) { return; } //calculate diff const now = new Date(); let currentTime = now.getTime(); let diff = currentTime - lastRestoreTime; if( (diff / 1000) < restore_backup_wait) { //console.log("*********restore_backup_wait", diff, restore_backup_wait); return; } processingData = true; //get filename to process let counter = getDbBackupFileCounter("min"); //we have some backup files let dataBase = 'tbdata'; var nosql; if(counter == 0) dataBase = 'tbdata'; else dataBase = counter + "." + 'tbdata'; nosql = NOSQL(dataBase); //select all data - use limit restore_from_backup let records = await promisifyBuilder(nosql.find().take(restore_from_backup)); for(let i = 0; i < records.length; i++) { if (brokerready) { let item = records[i]; let id = item.id; if(id !== undefined) { //console.log("------------processDataFromDatabase - remove", id, dataBase, i); try{ let o = JSON.parse(JSON.stringify(item)); delete o.id; let message = JSON.stringify(o); broker.publish("v1/gateway/telemetry", message, {qos:1}); //remove from database await promisifyBuilder(nosql.remove().where("id", id)); } catch (error) { //process error console.log("processDataFromDatabase", error); } } } else { processingData = false; return; } } if(records.length > 0) { //clean backup file if(counter > 0) nosql.clean(); } //no data in db, remove if(records.length == 0) { if(counter > 0) nosql.drop(); } const d = new Date(); lastRestoreTime = d.getTime(); processingData = false; } loadSettings(); //instance.on('options', instance.reconfigure); //instance.reconfigure(); };