Send to rado platform; show_dbdata, db loads after start
This commit is contained in:
parent
ed0fe5b15d
commit
f63ac50497
22 changed files with 6380 additions and 2279 deletions
|
|
@ -4,30 +4,27 @@ exports.group = 'MQTT';
|
|||
exports.color = '#888600';
|
||||
exports.version = '1.0.2';
|
||||
exports.icon = 'sign-out';
|
||||
exports.input = 1;
|
||||
exports.output = ["red", "white", "blue"];
|
||||
exports.author = 'Daniel Segeš';
|
||||
exports.input = 2;
|
||||
exports.output = 4;
|
||||
exports.options = { host: 'tb-stage.worksys.io', port: 1883, clientid: "", username: "" };
|
||||
exports.npm = ['mqtt'];
|
||||
|
||||
|
||||
exports.html = `<div class="padding">
|
||||
<div class="row">
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="host" data-jc-config="placeholder:test.mosquitto.org;required:false" class="m">Hostname or IP address (if not empty - setting will override db setting)</div>
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="port" data-jc-config="placeholder:1883;required:true" class="m">Port</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="clientid">@(Client id)</div>
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<div data-jc="textbox" data-jc-path="username" class="m">@(Username)</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>`;
|
||||
|
||||
|
||||
|
|
@ -41,21 +38,22 @@ Added:
|
|||
- rpc response
|
||||
`;
|
||||
|
||||
const instanceSendTo = {
|
||||
const { promisifyBuilder } = require('./helper/db_helper');
|
||||
const { errLogger, monitor } = require('./helper/logger');
|
||||
const fs = require('fs');
|
||||
const mqtt = require('mqtt');
|
||||
|
||||
const SEND_TO = {
|
||||
debug: 0,
|
||||
rpcCall: 1,
|
||||
services: 2
|
||||
}
|
||||
|
||||
const { promisifyBuilder, makeMapFromDbResult } = require('./helper/db_helper.js');
|
||||
|
||||
//CONFIG
|
||||
let useLog4js = true;
|
||||
let createTelemetryBackup = true;
|
||||
let saveTelemetryOnError = true;//backup_on_failure overrides this value
|
||||
//------------------------
|
||||
|
||||
var fs = require('fs');
|
||||
let rollers;
|
||||
if(createTelemetryBackup) rollers = require('streamroller');
|
||||
|
||||
|
|
@ -64,56 +62,18 @@ let insertNoSqlCounter = 0;
|
|||
let insertBackupNoSqlCounter = 0;
|
||||
let processingData = false;
|
||||
|
||||
let backup_on_failure = false;//== saveTelemetryOnError - create backup broker send failure
|
||||
let backup_on_failure = false;//== saveTelemetryOnError - create backup client send failure
|
||||
let restore_from_backup = 0; //how many rows process at once?
|
||||
let restore_backup_wait = 0;//wait seconds
|
||||
let lastRestoreTime = 0;
|
||||
|
||||
let errLogger;
|
||||
let logger;
|
||||
let monitor;
|
||||
|
||||
//TODO brokerready and sendBrokerError seems to be the same. Moreover, we use FLOW_OMS_brokerready variable!!
|
||||
//
|
||||
// if there is an error in broker connection, flow logs to monitor.txt. Not to log messages every second, we use sendBrokerError variable
|
||||
let sendBrokerError = true;
|
||||
|
||||
if(useLog4js)
|
||||
{
|
||||
var path = require('path');
|
||||
var log4js = require("log4js");
|
||||
|
||||
log4js.configure({
|
||||
appenders: {
|
||||
errLogs: { type: 'file', filename: path.join(__dirname + "/../", 'err.txt') },
|
||||
monitorLogs: { type: 'file', compress:true, daysToKeep: 2, maxLogSize: 1048576, backups: 1, keepFileExt: true, filename: path.join(__dirname + "/../", 'monitor.txt') },
|
||||
console: { type: 'console' }
|
||||
},
|
||||
categories: {
|
||||
errLogs: { appenders: ['console', 'errLogs'], level: 'error' },
|
||||
monitorLogs: { appenders: ['console', 'monitorLogs'], level: 'trace' },
|
||||
//another: { appenders: ['console'], level: 'trace' },
|
||||
default: { appenders: ['console'], level: 'trace' }
|
||||
}
|
||||
});
|
||||
|
||||
errLogger = log4js.getLogger("errLogs");
|
||||
logger = log4js.getLogger();
|
||||
monitor = log4js.getLogger("monitorLogs");
|
||||
|
||||
//USAGE
|
||||
//logger.debug("text");
|
||||
//monitor.info('info');
|
||||
//errLogger.error("some error");
|
||||
}
|
||||
// if there is an error in client connection, flow logs to monitor.txt. Not to log messages every second, we use sendClientError variable
|
||||
let sendClientError = true;
|
||||
|
||||
process.on('uncaughtException', function (err) {
|
||||
|
||||
if(errLogger)
|
||||
{
|
||||
errLogger.error('uncaughtException:', err.message)
|
||||
errLogger.error(err.stack);
|
||||
}
|
||||
errLogger.error('uncaughtException:', err.message)
|
||||
errLogger.error(err.stack);
|
||||
|
||||
//TODO
|
||||
//send to service
|
||||
|
|
@ -127,13 +87,9 @@ const nosqlBackup = NOSQL('/backup/tbdata');
|
|||
|
||||
exports.install = function(instance) {
|
||||
|
||||
var broker;
|
||||
var client;
|
||||
var opts;
|
||||
var brokerready = false;
|
||||
|
||||
instance.on('options', loadSettings);
|
||||
|
||||
mqtt = require('mqtt');
|
||||
var clientReady = false;
|
||||
|
||||
// wsmqtt status for notification purposes on projects.worksys.io database
|
||||
let wsmqttName = null;
|
||||
|
|
@ -142,21 +98,28 @@ exports.install = function(instance) {
|
|||
|
||||
function getWsmqttName(host)
|
||||
{
|
||||
if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
|
||||
else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
|
||||
else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
|
||||
if(host == "tb-demo.worksys.io" || host == '192.168.252.4') return 'wsmqtt_demo';
|
||||
else if(host == "tb-qas01.worksys.io" || host == '192.168.252.5') return 'wsmqtt_qas01';
|
||||
else if(host == "tb-prod01.worksys.io" || host == '192.168.252.1') return 'wsmqtt_prod01';
|
||||
}
|
||||
|
||||
function sendWsStatus()
|
||||
{
|
||||
instance.send(instanceSendTo.services, {[wsmqttName]: wsmqtt_status});
|
||||
instance.send(SEND_TO.services, {[wsmqttName]: wsmqtt_status});
|
||||
}
|
||||
|
||||
sendWsStatusVar = setInterval(sendWsStatus, 180000);
|
||||
|
||||
function main()
|
||||
{
|
||||
if(!FLOW.dbLoaded) return;
|
||||
|
||||
loadSettings();
|
||||
clearInterval(sendWsStatus);
|
||||
sendWsStatusVar = setInterval(sendWsStatus, 180000);
|
||||
}
|
||||
|
||||
//set opts according to db settings
|
||||
async function loadSettings()
|
||||
function loadSettings()
|
||||
{
|
||||
|
||||
if(instance.options.host !== "")
|
||||
|
|
@ -179,21 +142,17 @@ exports.install = function(instance) {
|
|||
else
|
||||
{
|
||||
|
||||
const dbSettings = TABLE("settings");
|
||||
let responseSettings = await promisifyBuilder(dbSettings.find());
|
||||
|
||||
backup_on_failure = responseSettings[0]["backup_on_failure"];
|
||||
const SETTINGS = FLOW.GLOBALS.settings;
|
||||
backup_on_failure = SETTINGS.backup_on_failure;
|
||||
saveTelemetryOnError = backup_on_failure;
|
||||
|
||||
restore_from_backup = responseSettings[0]["restore_from_backup"];
|
||||
restore_backup_wait = responseSettings[0]["restore_backup_wait"];
|
||||
restore_from_backup = SETTINGS.restore_from_backup;
|
||||
restore_backup_wait = SETTINGS.restore_backup_wait;
|
||||
|
||||
let mqtt_host = responseSettings[0]["mqtt_host"];
|
||||
let mqtt_clientid = responseSettings[0]["mqtt_clientid"];
|
||||
let mqtt_username = responseSettings[0]["mqtt_username"];
|
||||
let mqtt_port = responseSettings[0]["mqtt_port"];
|
||||
|
||||
console.log("wsmqttpublich -> loadSettings from db", responseSettings[0]);
|
||||
let mqtt_host = SETTINGS.mqtt_host;
|
||||
let mqtt_clientid = SETTINGS.mqtt_clientid;
|
||||
let mqtt_username = SETTINGS.mqtt_username;
|
||||
let mqtt_port = SETTINGS.mqtt_port;
|
||||
|
||||
opts = {
|
||||
host: mqtt_host,
|
||||
|
|
@ -216,27 +175,23 @@ exports.install = function(instance) {
|
|||
var url = "mqtt://" + opts.host + ":" + opts.port;
|
||||
console.log("MQTT URL: ", url);
|
||||
|
||||
broker = mqtt.connect(url, opts);
|
||||
client = mqtt.connect(url, opts);
|
||||
|
||||
broker.on('connect', function() {
|
||||
client.on('connect', function() {
|
||||
instance.status("Connected", "green");
|
||||
monitor.info("MQTT broker connected");
|
||||
monitor.info("MQTT client connected");
|
||||
|
||||
sendBrokerError = true;
|
||||
|
||||
brokerready = true;
|
||||
FLOW.OMS_brokerready = brokerready;
|
||||
sendClientError = true;
|
||||
clientReady = true;
|
||||
wsmqtt_status = 'connected';
|
||||
});
|
||||
|
||||
broker.on('reconnect', function() {
|
||||
client.on('reconnect', function() {
|
||||
instance.status("Reconnecting", "yellow");
|
||||
brokerready = false;
|
||||
|
||||
FLOW.OMS_brokerready = brokerready;
|
||||
clientReady = false;
|
||||
});
|
||||
|
||||
broker.on('message', function(topic, message) {
|
||||
client.on('message', function(topic, message) {
|
||||
// message is type of buffer
|
||||
message = message.toString();
|
||||
if (message[0] === '{') {
|
||||
|
|
@ -244,50 +199,53 @@ exports.install = function(instance) {
|
|||
|
||||
message = JSON.parse(message);
|
||||
if (message.hasOwnProperty("device") && message.hasOwnProperty("data") && message.data.hasOwnProperty("id")) {
|
||||
broker.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
|
||||
instance.send(instanceSendTo.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
|
||||
client.publish(topic, `{"device": ${message.device}, "id": ${message.data.id}, "data": {"success": true}}`, {qos:1});
|
||||
instance.send(SEND_TO.rpcCall, {"device": message.device, "id": message.data.id, "RPC response": {"success": true}});
|
||||
}
|
||||
|
||||
}, () => instance.debug('MQTT: Error parsing data', message));
|
||||
}
|
||||
|
||||
instance.send(instanceSendTo.rpcCall, {"topic":topic, "content":message });
|
||||
instance.send(SEND_TO.rpcCall, {"topic":topic, "content":message });
|
||||
});
|
||||
|
||||
broker.on('close', function(err) {
|
||||
brokerready = false;
|
||||
FLOW.OMS_brokerready = brokerready;
|
||||
client.on('close', function(err) {
|
||||
clientReady = false;
|
||||
wsmqtt_status = 'disconnected';
|
||||
|
||||
if (err && err.toString().indexOf('Error')) {
|
||||
instance.status("Err: "+err.code, "red");
|
||||
instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
|
||||
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts });
|
||||
} else {
|
||||
instance.status("Disconnected", "red");
|
||||
instance.send(instanceSendTo.debug, {"message":"Broker CLOSE signal received !", "error":err, "opt":opts });
|
||||
instance.send(SEND_TO.debug, {"message":"Client CLOSE signal received !", "error":err, "opt":opts });
|
||||
}
|
||||
|
||||
broker.reconnect();
|
||||
client.reconnect();
|
||||
});
|
||||
|
||||
broker.on('error', function(err) {
|
||||
client.on('error', function(err) {
|
||||
instance.status("Err: "+ err.code, "red");
|
||||
instance.send(instanceSendTo.debug, {"message":"Broker ERROR signal received !", "error":err, "opt":opts });
|
||||
if(sendBrokerError) {
|
||||
monitor.info('MQTT broker error', err);
|
||||
sendBrokerError = false;
|
||||
instance.send(SEND_TO.debug, {"message":"Client ERROR signal received !", "error":err, "opt":opts });
|
||||
if(sendClientError) {
|
||||
monitor.info('MQTT client error', err);
|
||||
sendClientError = false;
|
||||
}
|
||||
brokerready = false;
|
||||
FLOW.OMS_brokerready = brokerready;
|
||||
clientReady = false;
|
||||
wsmqtt_status = 'disconnected';
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
instance.on('data', function(data) {
|
||||
|
||||
if (brokerready)
|
||||
instance.on("0", _ => {
|
||||
main();
|
||||
})
|
||||
|
||||
|
||||
instance.on('1', function(data) {
|
||||
|
||||
if(clientReady)
|
||||
{
|
||||
//do we have some data in backup file?
|
||||
//if any, process data from database
|
||||
|
|
@ -296,13 +254,14 @@ exports.install = function(instance) {
|
|||
//read telemetry data and send back to server
|
||||
if(!processingData) processDataFromDatabase();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (brokerready)
|
||||
if(clientReady)
|
||||
{
|
||||
let stringifiedJson = JSON.stringify(data.data);
|
||||
broker.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
|
||||
client.publish("v1/gateway/telemetry", stringifiedJson, {qos: 1});
|
||||
|
||||
instance.send(3, stringifiedJson);
|
||||
|
||||
//backup telemetry
|
||||
if(createTelemetryBackup)
|
||||
|
|
@ -327,8 +286,8 @@ exports.install = function(instance) {
|
|||
else
|
||||
{
|
||||
|
||||
if(logger) logger.debug("Broker unavailable. Data not sent !", JSON.stringify(data.data));
|
||||
instance.send(instanceSendTo.debug, {"message":"Broker unavailable. Data not sent !", "data": data.data });
|
||||
//logger.debug("Client unavailable. Data not sent !", JSON.stringify(data.data));
|
||||
instance.send(SEND_TO.debug, {"message":"Client unavailable. Data not sent !", "data": data.data });
|
||||
|
||||
if(saveTelemetryOnError)
|
||||
{
|
||||
|
|
@ -344,9 +303,9 @@ exports.install = function(instance) {
|
|||
});
|
||||
|
||||
|
||||
instance.close = function(done) {
|
||||
if (brokerready){
|
||||
broker.end();
|
||||
instance.close = function(done) {
|
||||
if(clientReady){
|
||||
client.end();
|
||||
clearInterval(sendWsStatusVar);
|
||||
}
|
||||
};
|
||||
|
|
@ -373,7 +332,7 @@ exports.install = function(instance) {
|
|||
let firstDigit = files[i].slice(0, pos);
|
||||
|
||||
fileCounter = parseInt(firstDigit);
|
||||
if (isNaN(fileCounter)) fileCounter = 0;
|
||||
if(isNaN(fileCounter)) fileCounter = 0;
|
||||
//console.log("getDbBackupFileCounter digit:", files[i], firstDigit, fileCounter, isNaN(fileCounter), type);
|
||||
|
||||
if(type == "max")
|
||||
|
|
@ -443,10 +402,7 @@ exports.install = function(instance) {
|
|||
|
||||
const processDataFromDatabase = async () => {
|
||||
|
||||
if(restore_from_backup <= 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if(restore_from_backup <= 0) return;
|
||||
|
||||
//calculate diff
|
||||
const now = new Date();
|
||||
|
|
@ -478,7 +434,7 @@ exports.install = function(instance) {
|
|||
|
||||
for(let i = 0; i < records.length; i++)
|
||||
{
|
||||
if (brokerready) {
|
||||
if(clientReady) {
|
||||
|
||||
let item = records[i];
|
||||
let id = item.id;
|
||||
|
|
@ -487,18 +443,19 @@ exports.install = function(instance) {
|
|||
{
|
||||
//console.log("------------processDataFromDatabase - remove", id, dataBase, i);
|
||||
|
||||
try{
|
||||
try {
|
||||
|
||||
let o = JSON.parse(JSON.stringify(item));
|
||||
delete o.id;
|
||||
let message = JSON.stringify(o);
|
||||
|
||||
broker.publish("v1/gateway/telemetry", message, {qos:1});
|
||||
client.publish("v1/gateway/telemetry", message, {qos:1});
|
||||
instance.send(3, message);
|
||||
|
||||
//remove from database
|
||||
await promisifyBuilder(nosql.remove().where("id", id));
|
||||
|
||||
} catch (error) {
|
||||
} catch(error) {
|
||||
//process error
|
||||
console.log("processDataFromDatabase", error);
|
||||
}
|
||||
|
|
@ -533,8 +490,6 @@ exports.install = function(instance) {
|
|||
|
||||
}
|
||||
|
||||
loadSettings();
|
||||
|
||||
//instance.on('options', instance.reconfigure);
|
||||
//instance.reconfigure();
|
||||
instance.on('options', main);
|
||||
//instance.reconfigure();
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue