exports.id = 'cloudmqttconnect'; exports.title = 'Cloud connect mqtt'; exports.group = 'MQTT'; exports.color = '#888600'; exports.version = '1.0.2'; exports.icon = 'sign-out'; exports.input = 2; exports.output = 2; exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" }; exports.html = `
Hostname or IP address (if not empty - setting will override db setting)
Port
@(Client id)
topic
`; const { promisifyBuilder } = require('./helper/db_helper'); const fs = require('fs'); const mqtt = require('mqtt'); const nosql = NOSQL('tbdatacloud'); const SEND_TO = { debug: 0, rpcCall: 1, } //CONFIG let saveTelemetryOnError = true;//backup_on_failure overrides this value //------------------------ const noSqlFileSizeLimit = 4194304;//use 5MB - 4194304 let insertNoSqlCounter = 0; let insertBackupNoSqlCounter = 0; let processingData = false; let backup_on_failure = true;//== saveTelemetryOnError - create backup client send failure let restore_from_backup = 50; //how many rows process at once? let restore_backup_wait = 3;//wait seconds let lastRestoreTime = 0; // if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable let sendClientError = true; exports.install = function(instance) { var client; var opts; var clientReady = false; let o = null; //options function main() { loadSettings(); } //set opts according to db settings function loadSettings() { o = instance.options; if(!o.topic) o.topic = FLOW.GLOBALS.settings.cloud_topic; opts = { host: o.host, port: o.port, clientId: o.clientid, username: o.username, rejectUnauthorized: false, resubscribe: false }; console.log("wsmqttpublich -> loadSettings from instance.options",o); connectToTbServer(); } function connectToTbServer() { var url = "mqtt://" + opts.host + ":" + opts.port; console.log("MQTT URL: ", url); client = mqtt.connect(url, opts); client.on('connect', function() { client.subscribe(`${o.topic}_backward`, (err) => { if (!err) { console.log("MQTT subscribed"); } }); instance.status("Connected", "green"); clientReady = true; sendClientError = true; }); client.on('reconnect', function() { instance.status("Reconnecting", "yellow"); clientReady = false; }); client.on('message', function(topic, message) { // message is type of buffer message = message.toString(); if (message[0] === '{') { TRY(function() { message = JSON.parse(message); if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) { client.publish(`${o.topic}_forward`, `{"device": "${message.device}", "id": ${message.data.id}, "data": {"success": true}}`, {qos:1}); instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}}); } }, () => instance.debug('MQTT: Error parsing data', message)); } instance.send(SEND_TO.rpcCall, {"topic":o.topic, "content":message }); }); client.on('close', function() { clientReady = false; instance.status("Disconnected", "red"); instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !"}); }); client.on('error', function(err) { instance.status("Err: "+ err.code, "red"); instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts }); if(sendClientError) { console.log('MQTT client error', err); sendClientError = false; } clientReady = false; }); } instance.on('0', function(data) { if(clientReady) { //do we have some data in backup file? if any, process data from database if(saveTelemetryOnError) { //read telemetry data and send back to server if(!processingData) processDataFromDatabase(); } let stringifiedJson = JSON.stringify(data.data) client.publish(`${o.topic}_forward`, stringifiedJson, {qos: 1}); } else { //logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data)); instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data }); if(saveTelemetryOnError) { //create new file from tbdata.nosql, if file size exceeds given limit, and clear tbdata.nosql makeBackupFromDbFile(); //write to tb data.data.id = UID(); nosql.insert(data.data); } } }); instance.on("1", _ => { main(); }) instance.close = function(done) { if(clientReady){ client.end(); } }; function getDbBackupFileCounter(type) { var files = fs.readdirSync(__dirname + "/../databases"); let counter = 0; for(var i = 0; i < files.length; i++) { if(files[i] == "tbdatacloud.nosql") continue; if(files[i].endsWith(".nosql")) { let pos = files[i].indexOf("."); if(pos > -1) { let fileCounter = counter; let firstDigit = files[i].slice(0, pos); fileCounter = parseInt(firstDigit); if(isNaN(fileCounter)) fileCounter = 0; //console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type); if(type == "max") { if(fileCounter > counter) { counter = fileCounter; } } else if(type == "min") { if(counter == 0) counter = fileCounter; if(fileCounter < counter) { counter = fileCounter; } } } } } if(type == "max") counter++; return counter; } const makeBackupFromDbFile = async () => { if(!saveTelemetryOnError) return; //to avoid large file: tbdata.nosql //init value is 0! if(insertNoSqlCounter > 0) { --insertNoSqlCounter; return; } insertNoSqlCounter = 100; let source = __dirname + "/../databases/tbdatacloud.nosql"; var stats = fs.statSync(source); var fileSizeInBytes = stats.size; if(fileSizeInBytes > noSqlFileSizeLimit) { let counter = 1; counter = getDbBackupFileCounter("max"); let destination = __dirname + "/../databases/" + counter + "." + "tbdatacloud.nosql"; //make backup file fs.copyFileSync(source, destination); //fs.renameSync(p, p + "." + counter); //clear tbdata.nosql fs.writeFileSync(source, ""); fs.truncateSync(source, 0); } } const processDataFromDatabase = async () => { if(restore_from_backup <= 0) return; //calculate diff const now = new Date(); let currentTime = now.getTime(); let diff = currentTime - lastRestoreTime; if( (diff / 1000) < restore_backup_wait) { //console.log("*********restore_backup_wait", diff, restore_backup_wait); return; } processingData = true; //get filename to process let counter = getDbBackupFileCounter("min"); //we have some backup files let dataBase = 'tbdata'; var nosql; if(counter == 0) dataBase = 'tbdatacloud'; else dataBase = counter + "." + 'tbdatacloud'; nosql = NOSQL(dataBase); //select all data - use limit restore_from_backup let records = await promisifyBuilder(nosql.find().take(restore_from_backup)); for(let i = 0; i < records.length; i++) { if(clientReady) { let item = records[i]; let id = item.id; if(id !== undefined) { //console.log("------------processDataFromDatabase - remove", id, dataBase, i); try { let message = JSON.parse(JSON.stringify(item)); delete message.id; client.publish(`${o.topic}_forward`, JSON.stringify(message), {qos:1}); //remove from database await promisifyBuilder(nosql.remove().where("id", id)); } catch(error) { //process error console.log("processDataFromDatabase", error); } } } else { processingData = false; return; } } if(records.length > 0) { //clean backup file if(counter > 0) nosql.clean(); } //no data in db, remove if(records.length == 0) { if(counter > 0) nosql.drop(); } const d = new Date(); lastRestoreTime = d.getTime(); processingData = false; } instance.on('options', main); };