From 6132458036edee5d71fac940ddcc661f302d659d Mon Sep 17 00:00:00 2001
From: chodak166
Date: Wed, 25 May 2022 14:38:08 +0200
Subject: [PATCH] Added wifi/mqtt reconnection and resubscription
---
.../firmware/include/nx/firmware/MqttClient.h | 6 +-
components/firmware/src/MqttClient.c | 24 +++++--
.../include/nx/software/AppSettings.h | 6 +-
components/software/src/AppSettings.c | 7 ++-
components/software/src/SystemSettingsApi.c | 8 +--
main/Main.c | 62 ++++++++++++++-----
main/static/app.html | 4 +-
main/static/index.html | 2 +-
main/static/index.js | 50 ++++++---------
main/static/min/app.html | 4 +-
main/static/min/index.html | 2 +-
main/static/min/index.js | 11 ++--
12 files changed, 113 insertions(+), 73 deletions(-)
diff --git a/components/firmware/include/nx/firmware/MqttClient.h b/components/firmware/include/nx/firmware/MqttClient.h
index 4556dbb..a6ce337 100644
--- a/components/firmware/include/nx/firmware/MqttClient.h
+++ b/components/firmware/include/nx/firmware/MqttClient.h
@@ -31,10 +31,12 @@ typedef struct MqttSettings
MqttErrorCallback errorCb;
} MqttSettings;
-void nxStartMqttClient(const MqttSettings* settings);
+bool nxStartMqttClient(const MqttSettings* settings);
bool nxMqttSubscribe(const char* topic, uint8_t qos);
-uint32_t nxMqttPublish(const char* topic, uint8_t qos, const char* data, size_t size, uint8_t retain);
+int32_t nxMqttPublish(const char* topic, uint8_t qos, const char* data, size_t size, uint8_t retain);
+
+bool nxMqttIsConnected(void);
#endif /* COMPONENTS_FIRMWARE_INCLUDE_NX_FIRMWARE_MQTTCLIENT_H_ */
diff --git a/components/firmware/src/MqttClient.c b/components/firmware/src/MqttClient.c
index 2a082b5..4cb3c30 100644
--- a/components/firmware/src/MqttClient.c
+++ b/components/firmware/src/MqttClient.c
@@ -16,6 +16,9 @@
#define UNUSED(x) (void)(x)
+#define RECONNECTION_TIMEOUT_MS 5000
+#define KEEPALIVE_SEC 30 // broker ping
+
static const char* TAG = "MQTT";
static const MqttSettings* settings = NULL;
@@ -84,13 +87,13 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
// --------- Public API --------- //
-void nxStartMqttClient(const MqttSettings* mqttSettings)
+bool nxStartMqttClient(const MqttSettings* mqttSettings)
{
ESP_LOGI(TAG, "Initializing MQTT");
if (strlen(mqttSettings->brokerAddr) == 0) {
ESP_LOGW(TAG, "Empty broker address, skipping MQTT initialization");
- return;
+ return false;
}
settings = mqttSettings;
@@ -103,9 +106,15 @@ void nxStartMqttClient(const MqttSettings* mqttSettings)
mqtt_cfg.username = settings->user;
mqtt_cfg.password = settings->password;
+ mqtt_cfg.disable_clean_session = false;
+ mqtt_cfg.disable_auto_reconnect = false;
+ mqtt_cfg.reconnect_timeout_ms = RECONNECTION_TIMEOUT_MS;
+ mqtt_cfg.refresh_connection_after_ms = 0;
+ mqtt_cfg.keepalive = KEEPALIVE_SEC;
+
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
- esp_mqtt_client_start(client);
+ return esp_mqtt_client_start(client) == ESP_OK;
}
bool nxMqttSubscribe(const char* topic, uint8_t qos)
@@ -117,17 +126,18 @@ bool nxMqttSubscribe(const char* topic, uint8_t qos)
int msg_id = esp_mqtt_client_subscribe(clientHandle, settings->apiTopic, qos);
if (msg_id == -1) {
- ESP_LOGE(TAG, "Subscribtion of topic %s failed", topic);
+ ESP_LOGE(TAG, "Subscription of topic %s failed", topic);
if (settings->errorCb) {
settings->errorCb();
}
return false;
}
+ ESP_LOGI(TAG, "Subscription of topic %s done", topic);
return true;
}
-uint32_t nxMqttPublish(const char* topic, uint8_t qos, const char* data, size_t size, uint8_t retain)
+int32_t nxMqttPublish(const char* topic, uint8_t qos, const char* data, size_t size, uint8_t retain)
{
if (!clientHandle) {
ESP_LOGE(TAG, "MQTT client not initialized, aborting data publishing in topic %s", topic);
@@ -136,3 +146,7 @@ uint32_t nxMqttPublish(const char* topic, uint8_t qos, const char* data, size_t
return esp_mqtt_client_publish(clientHandle, topic, data, size, qos, retain);
}
+bool nxMqttIsConnected(void)
+{
+ return clientHandle != NULL;
+}
diff --git a/components/software/include/nx/software/AppSettings.h b/components/software/include/nx/software/AppSettings.h
index 25559a8..673aa76 100644
--- a/components/software/include/nx/software/AppSettings.h
+++ b/components/software/include/nx/software/AppSettings.h
@@ -8,8 +8,8 @@
#define WIFI_STRINGS_MAX_LEN 32
#define URI_MAX_SIZE 64
#define USER_DATA_MAX_SIZE 16
-#define CA_CERT_MAX_SIZE 1500
#define DEVICE_NAME_MAX_LEN 64
+#define CA_CERT_MAX_SIZE 1500
typedef bool (*StorageReadFn)(const char* key, void* data, size_t size);
@@ -24,10 +24,10 @@ typedef struct AppSettings {
char mqttUser[USER_DATA_MAX_SIZE];
char mqttPassword[USER_DATA_MAX_SIZE];
uint16_t mqttHbIntervalSec;
- uint8_t mqttUseTls;
- char caCert[CA_CERT_MAX_SIZE];
+ bool mqttUseTls;
bool overrideDevName;
char customDevName[DEVICE_NAME_MAX_LEN];
+ char caCert[CA_CERT_MAX_SIZE];
} AppSettings;
diff --git a/components/software/src/AppSettings.c b/components/software/src/AppSettings.c
index 8ff3f11..f5f02ba 100644
--- a/components/software/src/AppSettings.c
+++ b/components/software/src/AppSettings.c
@@ -76,7 +76,10 @@ void nxInitAppSettings(StorageWriteFn writeFn,
settingsUpdatedCb = updateCb;
if (firstRun()) {
+ SettingsUpdatedCb cb = settingsUpdatedCb;
+ settingsUpdatedCb = NULL; // skip callback during first init
nxRestoreAppDefaultSettings();
+ settingsUpdatedCb = cb;
}
else {
loadSettings();
@@ -95,16 +98,16 @@ void nxRestoreAppDefaultSettings(void)
printf("Restoring DEFAULT application settings\n");
settings.mqttHbIntervalSec = DEFAULT_MQTT_HB_SEC;
settings.mqttUseTls = DEFAULT_MQTT_TLS;
+ settings.overrideDevName = DEFAULT_OV_DEVNAME;
strcpy(settings.mqttHbUri , DEFAULT_MQTT_HB_URI );
strcpy(settings.mqttApiUri , DEFAULT_MQTT_API_URI );
strcpy(settings.mqttHost , DEFAULT_MQTT_HOST );
strcpy(settings.mqttUser , DEFAULT_MQTT_USER );
strcpy(settings.mqttPassword , DEFAULT_MQTT_PASS );
- strcpy(settings.caCert , DEFAULT_CA_CERT );
strcpy(settings.customDevName , DEFAULT_CUSTOM_DEVNAME);
+ strcpy(settings.caCert , DEFAULT_CA_CERT );
- settings.overrideDevName = DEFAULT_OV_DEVNAME;
nxWriteAppSettings();
}
diff --git a/components/software/src/SystemSettingsApi.c b/components/software/src/SystemSettingsApi.c
index 005e5e7..5ae93cf 100644
--- a/components/software/src/SystemSettingsApi.c
+++ b/components/software/src/SystemSettingsApi.c
@@ -104,8 +104,8 @@ void nxApiGetSystemSettings(const uint8_t* msg, size_t msgLen,
sprintf(responseBuffer,
"{"
"\"%s\":\"%s\", " // ssid
- "\"%s\":%s, " // power save
- "\"%s\":%s, " // use static ip
+ "\"%s\":%i, " // power save
+ "\"%s\":%i, " // use static ip
"\"%s\":\"%i.%i.%i.%i\", " // ip
"\"%s\":\"%i.%i.%i.%i\", " // gw
"\"%s\":\"%i.%i.%i.%i\"," // mask
@@ -116,8 +116,8 @@ void nxApiGetSystemSettings(const uint8_t* msg, size_t msgLen,
"\"%s\":\"%s\"" // dn
"}",
API_KEY_SSID, settings->wifiSsid,
- API_KEY_WPWSAVE, settings->wifiPowerSave ? "true" : "false",
- API_KEY_USE_STATIC, settings->useStaticAddr ? "true" : "false",
+ API_KEY_WPWSAVE, settings->wifiPowerSave,
+ API_KEY_USE_STATIC, settings->useStaticAddr,
API_KEY_IPV4ADDR, settings->ip4addr[0], settings->ip4addr[1], settings->ip4addr[2], settings->ip4addr[3],
API_KEY_IPV4GW, settings->ip4gw[0], settings->ip4gw[1], settings->ip4gw[2], settings->ip4gw[3],
API_KEY_IPV4MASK, settings->ip4mask[0], settings->ip4mask[1], settings->ip4mask[2], settings->ip4mask[3],
diff --git a/main/Main.c b/main/Main.c
index 1df392d..3df364c 100644
--- a/main/Main.c
+++ b/main/Main.c
@@ -45,6 +45,9 @@ static AppSettings* appSettings = NULL;
static WifiSettings wifiSettings;
static MqttSettings mqttSettings;
+static bool wifiStarted = false;
+static bool mqttStarted = false;
+
static const MqTriggerHttpCallbacks httpCallbacks = {
.getRoot = rootHandler,
.getJs = jsHandler,
@@ -66,12 +69,22 @@ void app_main(void)
nxInitStorage();
+ // uncomment to force default NVS initialization for development
+// uint8_t zero = 0;
+// nxStorageWrite("ledinit", &zero, 1);
+// nxStorageWrite("sysinit", &zero, 1);
+
nxInitSystemSettings(nxStorageWrite, nxStorageRead);
nxInitAppSettings(nxStorageWrite, nxStorageRead, onAppSettingsUpdate);
systemSettings = nxGetSystemSettings();
appSettings = nxGetAppSettings();
+ // uncomment to force WiFi settings for development
+// strcpy(systemSettings->wifiSsid, "androidAp");
+// strcpy(systemSettings->wifiPassword, "password");
+// systemSettings->useStaticAddr = false;
+
nxStartRestarter(systemSettings->rsSchedule, systemSettings->tzEnv);
nxSetWifiConnectedCallback(onWifiConnected);
@@ -83,9 +96,6 @@ void app_main(void)
static void startWifi(void)
{
- systemSettings->useStaticAddr = true;
- systemSettings->ip4addr[3] = 91;
-
wifiSettings = (struct WifiSettings){
.wname = systemSettings->wifiSsid,
.wpass = systemSettings->wifiPassword,
@@ -106,6 +116,13 @@ static void startWifi(void)
static void onWifiConnected(void)
{
+ nxUpdateStatus(STATUS_OK);
+
+ if (wifiStarted) {
+ return;
+ }
+
+ wifiStarted = true;
systemSettings->deviceName = nxGetWifiDeviceName();
nxInitSntpClient(SNTP_RETRIES, systemSettings->sntpAddr, systemSettings->tzEnv);
@@ -127,7 +144,9 @@ static void onWifiConnected(void)
mqttSettings.disconnectedCb = onMqttDisconnected;
mqttSettings.errorCb = onMqttError;
- nxStartMqttClient(&mqttSettings);
+ if (!nxStartMqttClient(&mqttSettings)) {
+ onMqttError();
+ }
nxSetMqTriggerHttpCallbacks(&httpCallbacks);
nxStartMqTriggerHttpServer();
@@ -154,8 +173,14 @@ static void onWifiError()
static void onMqttConnected()
{
ESP_LOGI(TAG, "MQTT CONNECTED");
- if (nxMqttSubscribe(appSettings->mqttApiUri, API_QOS)) {
+
+ if (!mqttStarted) {
xTaskCreate(&hbTask, "hb_task", 2048, NULL, 1, NULL);
+ mqttStarted = true;
+ }
+
+ // (re)subscribe
+ if (nxMqttSubscribe(appSettings->mqttApiUri, API_QOS)) {
nxUpdateStatus(STATUS_OK);
}
}
@@ -186,16 +211,23 @@ static void hbTask(void* param)
}
while (1) {
- ESP_LOGI(TAG, "Sending MQTT heartbeat");
-
- char timeStr[DT_FORMAT_LEN];
- nxGetTimeStr(timeStr);
- char hbMessage[DT_FORMAT_LEN + strlen(nxGetWifiDeviceName()) + 2];
- strcpy(hbMessage, nxGetWifiDeviceName());
- strcat(hbMessage, " ");
- strcat(hbMessage, timeStr);
-
- nxMqttPublish(appSettings->mqttHbUri, HB_QOS, hbMessage, strlen(hbMessage), 1);
+ if (nxMqttIsConnected()) {
+ ESP_LOGI(TAG, "Sending MQTT heartbeat");
+
+ char timeStr[DT_FORMAT_LEN];
+ nxGetTimeStr(timeStr);
+ char hbMessage[DT_FORMAT_LEN + strlen(nxGetWifiDeviceName()) + 2];
+ strcpy(hbMessage, nxGetWifiDeviceName());
+ strcat(hbMessage, " ");
+ strcat(hbMessage, timeStr);
+
+ if(nxMqttPublish(appSettings->mqttHbUri, HB_QOS, hbMessage, strlen(hbMessage), 1) < 0) {
+ ESP_LOGE(TAG, "Cannot publish heartbeat message");
+ }
+ }
+ else {
+ ESP_LOGW(TAG, "Skipping MQTT heartbeat due to the disconnected client");
+ }
vTaskDelay(appSettings->mqttHbIntervalSec*1000 / portTICK_PERIOD_MS);
}
diff --git a/main/static/app.html b/main/static/app.html
index c0f088e..93df1c8 100644
--- a/main/static/app.html
+++ b/main/static/app.html
@@ -16,13 +16,13 @@
-
- Heartbeat interval (sec) [TODO]
+ Heartbeat interval (sec)
diff --git a/main/static/index.html b/main/static/index.html
index 830a059..5aa4192 100644
--- a/main/static/index.html
+++ b/main/static/index.html
@@ -33,7 +33,7 @@
diff --git a/main/static/index.js b/main/static/index.js
index 821a946..6b10200 100644
--- a/main/static/index.js
+++ b/main/static/index.js
@@ -71,8 +71,25 @@ function loadContent(url) {
http.send();
}
+function fillInputs(element, jsonString) {
+ let obj = JSON.parse(jsonString);
+ if (obj) {
+ for (var key of Object.keys(obj)) {
+ console.log(key + " -> " + obj[key])
+ let input = element.querySelector("[name='" + key + "'");
+ if (input) { //TODO: checkbox/radio
+ input.value = obj[key];
+ if (input.hasAttribute('data-onset')) {
+ eval(input.getAttribute("data-onset"));
+ }
+ }
+ }
+ }
+}
+
function loadValues(element) {
-
+ valuesCache = {};
+
const forms = element.querySelectorAll('form');
for (i = 0; i < forms.length; ++i) {
const srcUrl = forms[i].getAttribute('data-values-src');
@@ -82,20 +99,7 @@ function loadValues(element) {
if (valuesCache[srcUrl]) {
console.log("Source values already cached: " + valuesCache[srcUrl]);
- //TODO: dry
- let obj = JSON.parse(valuesCache[srcUrl]);
- if (obj) {
- for (var key of Object.keys(obj)) {
- console.log(key + " -> " + obj[key])
- let input = element.querySelector("[name='" + key + "'");
- if (input) { //TODO: checkbox/radio
- input.value = obj[key];
- if (input.hasAttribute('data-onset')) {
- eval(input.getAttribute("data-onset"));
- }
- }
- }
- }
+ fillInputs(element, valuesCache[srcUrl]);
continue;
}
@@ -111,21 +115,7 @@ function loadValues(element) {
if (http.readyState == 4 && http.status == 200) {
console.log("Values received: " + http.responseText);
valuesCache[srcUrl] = http.responseText;
-
- //TODO: dry
- let obj = JSON.parse(http.responseText);
- if (obj) {
- for (var key of Object.keys(obj)) {
- console.log(key + " -> " + obj[key])
- let input = element.querySelector("[name='" + key + "'");
- if (input) { //TODO: checkbox/radio
- input.value = obj[key];
- if (input.hasAttribute('data-onset')) {
- eval(input.getAttribute("data-onset"));
- }
- }
- }
- }
+ fillInputs(element, http.responseText);
}
}
http.send();
diff --git a/main/static/min/app.html b/main/static/min/app.html
index c9a2f61..a90a715 100644
--- a/main/static/min/app.html
+++ b/main/static/min/app.html
@@ -1,7 +1,7 @@
Application settings
MQTT settings