From 1f93ce4f29165fe138a9bb12dc845dcfdf897ee9 Mon Sep 17 00:00:00 2001 From: Maxim Prokhorov Date: Sun, 8 Sep 2024 01:09:51 +0300 Subject: [PATCH] mqtt: settings refactoring & custom topics - customize will topic qos and retain, ref. #2616 - allow to fully replace will and json topics. empty string means the default / replacement is taken as-is, after applying placeholders - validate will and data topic structure before connecting - validate {suf,post}fix as {suf,post}fix, not as topic --- code/espurna/config/general.h | 25 +- code/espurna/config/version.h | 2 +- code/espurna/mqtt.cpp | 460 +++++++++++++++++++++++----------- code/html/src/panel-mqtt.html | 42 +++- 4 files changed, 376 insertions(+), 153 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 61c1a136..cddd93a6 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -1060,7 +1060,7 @@ // Note: When using MQTT_LIBRARY_PUBSUBCLIENT, MQTT_MAX_PACKET_SIZE should not be more than this value. #endif -// These are the properties that will be sent when useJson is true +// These are the properties that will be sent when MQTT_USE_JSON is enabled #ifndef MQTT_ENQUEUE_IP #define MQTT_ENQUEUE_IP 1 #endif @@ -1082,24 +1082,31 @@ #endif #ifndef MQTT_STATUS_ONLINE -#define MQTT_STATUS_ONLINE "1" // Value for the device ON message +#define MQTT_STATUS_ONLINE "1" // Publish this value to the 'status' topic (aka Will topic) when device is ONLINE. + // Device publishes this value when connected to the broker, and with a periodic heartbeat messages. #endif #ifndef MQTT_STATUS_OFFLINE -#define MQTT_STATUS_OFFLINE "0" // Value for the device OFF message (will) +#define MQTT_STATUS_OFFLINE "0" // Publish this value to the 'status' topic (aka Will topic) when device is OFFLINE + // Broker would publish this message when device disconnects from it. #endif -#define MQTT_ACTION_RESET "reboot" // RESET MQTT topic particle +#ifndef MQTT_STATUS_RETAIN +#define MQTT_STATUS_RETAIN MQTT_RETAIN // Sets 'status' (aka Will) message RETAIN flag. +#endif + +#ifndef MQTT_STATUS_QOS +#define MQTT_STATUS_QOS MQTT_QOS // Sets 'status' (aka Will) message QoS. +#endif -// Custom get and set postfixes -// Use something like "/status" or "/set", with leading slash -// Since 1.9.0 the default value is "" for getter and "/set" for setter #ifndef MQTT_GETTER -#define MQTT_GETTER "" +#define MQTT_GETTER "" // Use this string postfix when publishing messages from the device + // When not empty, **MUST** start with a slash. #endif #ifndef MQTT_SETTER -#define MQTT_SETTER "/set" +#define MQTT_SETTER "/set" // Use this string postfix when subscribing to topics + // When not empty, **MUST** start with a slash. #endif // ----------------------------------------------------------------------------- diff --git a/code/espurna/config/version.h b/code/espurna/config/version.h index 9eebc01d..e0f01ea4 100644 --- a/code/espurna/config/version.h +++ b/code/espurna/config/version.h @@ -21,5 +21,5 @@ #endif #ifndef CFG_VERSION -#define CFG_VERSION 15 +#define CFG_VERSION 16 #endif diff --git a/code/espurna/mqtt.cpp b/code/espurna/mqtt.cpp index f7a91dad..d88ec6dc 100644 --- a/code/espurna/mqtt.cpp +++ b/code/espurna/mqtt.cpp @@ -195,14 +195,22 @@ constexpr KeepAlive keepalive() { static_assert(keepalive() >= KeepaliveMin, ""); static_assert(keepalive() <= KeepaliveMax, ""); -PROGMEM_STRING(TopicWill, MQTT_TOPIC_STATUS); +STRING_VIEW_INLINE(TopicWill, MQTT_TOPIC_STATUS); + +constexpr int willQoS() { + return MQTT_STATUS_QOS; +} + +constexpr bool willRetain() { + return 1 == MQTT_STATUS_RETAIN; +} constexpr bool json() { return 1 == MQTT_USE_JSON; } static constexpr auto JsonDelay = espurna::duration::Milliseconds(MQTT_USE_JSON_DELAY); -PROGMEM_STRING(TopicJson, MQTT_TOPIC_JSON); +STRING_VIEW_INLINE(TopicJson, MQTT_TOPIC_JSON); constexpr espurna::duration::Milliseconds skipTime() { return espurna::duration::Milliseconds(MQTT_SKIP_TIME); @@ -252,6 +260,8 @@ PROGMEM_STRING(Retain, "mqttRetain"); PROGMEM_STRING(Keepalive, "mqttKeep"); PROGMEM_STRING(ClientId, "mqttClientID"); PROGMEM_STRING(TopicWill, "mqttWill"); +PROGMEM_STRING(WillQoS, "mqttWillQoS"); +PROGMEM_STRING(WillRetain, "mqttWillRetain"); PROGMEM_STRING(UseJson, "mqttUseJson"); PROGMEM_STRING(TopicJson, "mqttJson"); @@ -328,7 +338,15 @@ String clientId() { } String topicWill() { - return getSetting(keys::TopicWill, espurna::StringView(build::TopicWill)); + return getSetting(keys::TopicWill); +} + +int willQoS() { + return getSetting(keys::WillQoS, build::willQoS()); +} + +bool willRetain() { + return getSetting(keys::WillRetain, build::willRetain()); } bool json() { @@ -336,7 +354,7 @@ bool json() { } String topicJson() { - return getSetting(keys::TopicJson, espurna::StringView(build::TopicJson)); + return getSetting(keys::TopicJson); } espurna::heartbeat::Mode heartbeatMode() { @@ -391,15 +409,17 @@ String NAME () {\ return espurna::settings::internal::serialize(FUNC());\ } -EXACT_VALUE(port, settings::port) -EXACT_VALUE(enabled, settings::enabled) EXACT_VALUE(autoconnect, settings::autoconnect) -EXACT_VALUE(qos, settings::qos) -EXACT_VALUE(retain, settings::retain) -EXACT_VALUE(keepalive, settings::keepalive) -EXACT_VALUE(json, settings::json) -EXACT_VALUE(heartbeatMode, settings::heartbeatMode) +EXACT_VALUE(enabled, settings::enabled) EXACT_VALUE(heartbeatInterval, settings::heartbeatInterval) +EXACT_VALUE(heartbeatMode, settings::heartbeatMode) +EXACT_VALUE(json, settings::json) +EXACT_VALUE(keepalive, settings::keepalive) +EXACT_VALUE(port, settings::port) +EXACT_VALUE(qos, settings::qos) +EXACT_VALUE(willQoS, settings::willQoS) +EXACT_VALUE(willRetain, settings::willRetain) +EXACT_VALUE(retain, settings::retain) EXACT_VALUE(skipTime, settings::skipTime) #undef EXACT_VALUE @@ -407,27 +427,29 @@ EXACT_VALUE(skipTime, settings::skipTime) } // namespace internal static constexpr espurna::settings::query::Setting Settings[] PROGMEM { - {keys::Server, settings::server}, - {keys::Port, internal::port}, - {keys::Enabled, internal::enabled}, {keys::Autoconnect, internal::autoconnect}, - {keys::Topic, settings::topic}, + {keys::ClientId, settings::clientId}, + {keys::Enabled, internal::enabled}, {keys::Getter, settings::getter}, - {keys::Setter, settings::setter}, - {keys::User, settings::user}, + {keys::HeartbeatInterval, internal::heartbeatInterval}, + {keys::HeartbeatMode, internal::heartbeatMode}, + {keys::Keepalive, internal::keepalive}, {keys::Password, settings::password}, + {keys::PayloadOffline, settings::payloadOffline}, + {keys::PayloadOnline, settings::payloadOnline}, + {keys::Port, internal::port}, {keys::QoS, internal::qos}, {keys::Retain, internal::retain}, - {keys::Keepalive, internal::keepalive}, - {keys::ClientId, settings::clientId}, + {keys::Server, settings::server}, + {keys::Setter, settings::setter}, + {keys::SkipTime, internal::skipTime}, + {keys::Topic, settings::topic}, + {keys::TopicJson, settings::topicJson}, {keys::TopicWill, settings::topicWill}, {keys::UseJson, internal::json}, - {keys::TopicJson, settings::topicJson}, - {keys::HeartbeatMode, internal::heartbeatMode}, - {keys::HeartbeatInterval, internal::heartbeatInterval}, - {keys::SkipTime, internal::skipTime}, - {keys::PayloadOnline, settings::payloadOnline}, - {keys::PayloadOffline, settings::payloadOffline}, + {keys::User, settings::user}, + {keys::WillQoS, internal::willQoS}, + {keys::WillRetain, internal::willRetain}, }; bool checkSamePrefix(espurna::StringView key) { @@ -452,6 +474,43 @@ void setup() { namespace { +struct Placeholders { + using Pair = std::pair; + + Placeholders(std::initializer_list pairs) noexcept : + _pairs(pairs) + {} + + void add(espurna::StringView key, String value) noexcept { + _pairs.push_back({key, std::move(value)}); + } + + void add(espurna::StringView key, espurna::StringView value) noexcept { + add(key, value.toString()); + } + + String replace(String value) const { + for (auto& pair : _pairs) { + value.replace(pair.first.toString(), pair.second); + } + + return value; + } + +private: + using Pairs = std::vector; + Pairs _pairs; +}; + +Placeholders make_placeholders() { + return Placeholders({ + {STRING_VIEW("{mac}"), systemChipId().toString()}, + {STRING_VIEW("{chipid}"), systemShortChipId().toString()}, + {STRING_VIEW("{hostname}"), systemHostname()}, + {STRING_VIEW("{magnitude}"), STRING_VIEW("#").toString()}, + }); +} + espurna::duration::Milliseconds _mqtt_skip_time; espurna::ReadyFlag _mqtt_skip_flag; @@ -462,29 +521,36 @@ bool _mqtt_enabled { mqtt::build::enabled() }; bool _mqtt_network { false }; AsyncClientState _mqtt_state { AsyncClientState::Disconnected }; -bool _mqtt_use_json { mqtt::build::json() }; bool _mqtt_forward { false }; +String _mqtt_setter; +String _mqtt_getter; + struct MqttConnectionSettings { + bool ok { false }; + + String server; + uint16_t port { 0 }; + + String clientId; + bool retain { mqtt::build::retain() }; int qos { mqtt::build::qos() }; mqtt::KeepAlive keepalive { mqtt::build::keepalive() }; + String topic; - String topic_json; - String setter; - String getter; String user; String pass; - String will; - String server; - uint16_t port { 0 }; - String clientId; + + String will_topic; + int will_qos; + bool will_retain; }; -static MqttConnectionSettings _mqtt_settings; +MqttConnectionSettings _mqtt_settings; template -static void _mqttApplySetting(Lhs& lhs, Rhs&& rhs) { +void _mqttApplySetting(Lhs& lhs, Rhs&& rhs) { if (lhs != rhs) { lhs = std::forward(rhs); mqttDisconnect(); @@ -493,6 +559,10 @@ static void _mqttApplySetting(Lhs& lhs, Rhs&& rhs) { // Can't have **any** MQTT placeholders but our own `{magnitude}` bool _mqttValidTopicString(espurna::StringView value) { + if (!value.length()) { + return false; + } + size_t hash = 0; size_t plus = 0; for (auto it = value.begin(); it != value.end(); ++it) { @@ -509,8 +579,13 @@ bool _mqttValidTopicString(espurna::StringView value) { return (hash <= 1) && (plus == 0); } -bool _mqttApplyValidTopicString(String& lhs, String&& rhs) { - if (_mqttValidTopicString(rhs)) { +bool _mqttValidSuffix(espurna::StringView value) { + return value.length() == 0 + || value[0] == '/'; +} + +bool _mqttApplyValidSuffixString(String& lhs, String&& rhs) { + if (_mqttValidSuffix(rhs)) { _mqttApplySetting(lhs, std::move(rhs)); return true; } @@ -519,6 +594,68 @@ bool _mqttApplyValidTopicString(String& lhs, String&& rhs) { return false; } +void _mqttApplySuffix(String getter, String setter) { + if (!_mqttApplyValidSuffixString(_mqtt_getter, std::move(getter))) { + _mqtt_settings.ok = false; + return; + } + + if (!_mqttApplyValidSuffixString(_mqtt_setter, std::move(setter))) { + _mqtt_settings.ok = false; + return; + } +} + +void _mqttApplyTopic(String topic) { + if (!_mqttValidTopicString(topic)) { + goto err; + } + + { + // Topic **must** end with some kind of word + const auto last = topic.length() - 1; + if (topic[last] == '/') { + topic.remove(last); + } + + // For simple topics, assume right-hand side contains magnitude + const auto hash = std::count(topic.begin(), topic.end(), '#'); + if (hash == 0) { + topic += STRING_VIEW("/#").toString(); + } else if (hash > 1) { + goto err; + } + } + + _mqttApplySetting(_mqtt_settings.topic, std::move(topic)); + return; + +err: + _mqtt_settings.ok = false; + mqttDisconnect(); +} + +void _mqttApplyWill(String topic) { + if (!_mqttValidTopicString(topic)) { + _mqtt_settings.ok = false; + return; + } + + _mqttApplySetting(_mqtt_settings.will_topic, + std::move(topic)); + _mqttApplySetting(_mqtt_settings.will_qos, + mqtt::settings::willQoS()); + _mqttApplySetting(_mqtt_settings.will_retain, + mqtt::settings::willRetain()); + + _mqttApplySetting( + _mqtt_payload_online, + mqtt::settings::payloadOnline()); + _mqttApplySetting( + _mqtt_payload_offline, + mqtt::settings::payloadOffline()); +} + } // namespace // ----------------------------------------------------------------------------- @@ -561,6 +698,20 @@ size_t _mqtt_json_payload_count { 0ul }; std::forward_list _mqtt_json_payload; espurna::timer::SystemTimer _mqtt_json_payload_flush; +bool _mqtt_json_enabled { mqtt::build::json() }; +String _mqtt_json_topic; + +void _mqttApplyJson(String topic) { + if (!_mqttValidTopicString(topic)) { + _mqtt_json_enabled = false; + _mqtt_settings.ok = false; + return; + } + + _mqttApplySetting(_mqtt_json_topic, std::move(topic)); + _mqttApplySetting(_mqtt_json_enabled, mqtt::settings::json()); +} + } // namespace // ----------------------------------------------------------------------------- @@ -622,9 +773,9 @@ void _mqttSetupAsyncClient(bool secure = false) { _mqtt.setCleanSession(false); _mqtt.setWill( - _mqtt_settings.will.c_str(), - _mqtt_settings.qos, - _mqtt_settings.retain, + _mqtt_settings.will_topic.c_str(), + _mqtt_settings.will_qos, + _mqtt_settings.will_retain, _mqtt_payload_offline.c_str()); if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) { @@ -676,9 +827,9 @@ bool _mqttConnectSyncClient(bool secure = false) { _mqtt.begin(_mqtt_settings.server.c_str(), _mqtt_settings.port, _mqttGetClient(secure)); - _mqtt.setWill(_mqtt_settings.will.c_str(), + _mqtt.setWill(_mqtt_settings.will_topic.c_str(), _mqtt_payload_offline.c_str(), - _mqtt_settings.retain, _mqtt_settings.qos); + _mqtt_settings.will_retain, _mqtt_settings.will_qos); _mqtt.setKeepAlive(_mqtt_settings.keepalive.count()); result = _mqtt.connect( _mqtt_settings.clientId.c_str(), @@ -686,24 +837,24 @@ bool _mqttConnectSyncClient(bool secure = false) { _mqtt_settings.pass.c_str()); #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.setClient(_mqttGetClient(secure)); - _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_port); + _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port); if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) { DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_settings.user.c_str()); result = _mqtt.connect( - _mqtt_settings.clientid.c_str(), + _mqtt_settings.clientId.c_str(), _mqtt_settings.user.c_str(), _mqtt_settings.pass.c_str(), - _mqtt_settings.will.c_str(), - _mqtt_settings.qos, - _mqtt_settings.retain, + _mqtt_settings.will_topic.c_str(), + _mqtt_settings.will_qos, + _mqtt_settings.will_retain, _mqtt_payload_offline.c_str()); } else { result = _mqtt.connect( - _mqtt_settings.clientid.c_str(), - _mqtt_settings.will.c_str(), - _mqtt_settings.qos, - _mqtt_settings.retain, + _mqtt_settings.clientId.c_str(), + _mqtt_settings.will_topic.c_str(), + _mqtt_settings.will_qos, + _mqtt_settings.will_retain, _mqtt_payload_offline.c_str()); } #endif @@ -719,16 +870,6 @@ bool _mqttConnectSyncClient(bool secure = false) { #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT) -String _mqttPlaceholders(String text) { - static const String mac = String(systemChipId()); - text.replace(F("{mac}"), mac); - - text.replace(F("{hostname}"), systemHostname()); - text.replace(F("{magnitude}"), F("#")); - - return text; -} - #if MDNS_SERVER_SUPPORT void _mqttMdnsSchedule(); @@ -741,7 +882,9 @@ void _mqttConfigure() { _mqtt_reconnect_flag.stop(); _mqtt_reconnect_delay = 0; - // Make sure we have both the server to connect to things are enabled + // Before going through the settings, make sure there is SERVER:PORT to connect to + _mqtt_settings.ok = true; + { _mqttApplySetting(_mqtt_settings.server, mqtt::settings::server()); _mqttApplySetting(_mqtt_settings.port, mqtt::settings::port()); @@ -760,65 +903,70 @@ void _mqttConfigure() { _mqttMdnsSchedule(); } #endif + _mqtt_settings.ok = false; return; } } // Get base topic and apply placeholders - { - // Replace things inside curly braces (like {hostname}, {mac} etc.) - auto topic = _mqttPlaceholders(mqtt::settings::topic()); - if (!_mqttValidTopicString(topic)) { - mqttDisconnect(); - return; - } + auto placeholders = make_placeholders(); - // Topic **must** end with some kind of word - if (topic.endsWith("/")) { - topic.remove(topic.length() - 1); - } - - // For simple topics, sssume right-hand side contains magnitude - if (topic.indexOf("#") == -1) { - topic.concat("/#"); - } - - _mqttApplySetting(_mqtt_settings.topic, std::move(topic)); - } + // Replace things inside curly braces (like {hostname}, {mac} etc.) + _mqttApplyTopic(placeholders.replace(mqtt::settings::topic())); // Getter and setter - _mqttApplyValidTopicString(_mqtt_settings.getter, mqtt::settings::getter()); - _mqttApplyValidTopicString(_mqtt_settings.setter, mqtt::settings::setter()); - _mqttApplySetting(_mqtt_forward, - !_mqtt_settings.setter.equals(_mqtt_settings.getter)); + _mqttApplySuffix( + mqtt::settings::getter(), + mqtt::settings::setter()); - // Last will aka status topic - // (note that *must* be after topic updates) - _mqttApplyValidTopicString(_mqtt_settings.will, - mqttTopic(mqtt::settings::topicWill())); + // Avoid re-publishing received data when getter and setter are the same + _mqttApplySetting(_mqtt_forward, !_mqtt_setter.equals(_mqtt_getter)); - // MQTT options - _mqttApplySetting(_mqtt_settings.user, _mqttPlaceholders(mqtt::settings::user())); - _mqttApplySetting(_mqtt_settings.pass, mqtt::settings::password()); + // Last will aka status topic. Should happen *after* topic updates + { + auto will = mqtt::settings::topicWill(); + if (will.length()) { + will = placeholders.replace(std::move(will)); + } else { + will = mqttTopic(mqtt::build::TopicWill.toString()); + } - _mqttApplySetting(_mqtt_settings.clientId, _mqttPlaceholders(mqtt::settings::clientId())); - - _mqttApplySetting(_mqtt_settings.qos, mqtt::settings::qos()); - _mqttApplySetting(_mqtt_settings.retain, mqtt::settings::retain()); - _mqttApplySetting(_mqtt_settings.keepalive, mqtt::settings::keepalive()); + _mqttApplyWill(std::move(will)); + } // MQTT JSON - _mqttApplySetting(_mqtt_use_json, mqtt::settings::json()); - _mqttApplyValidTopicString(_mqtt_settings.topic_json, - mqttTopic(mqtt::settings::topicJson())); + { + auto json = mqtt::settings::topicJson(); + if (json.length()) { + json = placeholders.replace(std::move(json)); + } else { + json = mqttTopic(mqtt::build::TopicJson.toString()); + } + + _mqttApplyJson(std::move(json)); + } + + // MQTT options + _mqttApplySetting(_mqtt_settings.user, + placeholders.replace(mqtt::settings::user())); + _mqttApplySetting(_mqtt_settings.pass, + mqtt::settings::password()); + + _mqttApplySetting(_mqtt_settings.clientId, + placeholders.replace(mqtt::settings::clientId())); + + _mqttApplySetting(_mqtt_settings.qos, + mqtt::settings::qos()); + _mqttApplySetting(_mqtt_settings.retain, + mqtt::settings::retain()); + _mqttApplySetting(_mqtt_settings.keepalive, + mqtt::settings::keepalive()); // Heartbeat messages - _mqttApplySetting(_mqtt_heartbeat_mode, mqtt::settings::heartbeatMode()); - _mqttApplySetting(_mqtt_heartbeat_interval, mqtt::settings::heartbeatInterval()); - - // Custom payload strings - _mqtt_payload_online = mqtt::settings::payloadOnline(); - _mqtt_payload_offline = mqtt::settings::payloadOffline(); + _mqttApplySetting(_mqtt_heartbeat_mode, + mqtt::settings::heartbeatMode()); + _mqttApplySetting(_mqtt_heartbeat_interval, + mqtt::settings::heartbeatInterval()); // Skip messages for the specified time after connecting _mqtt_skip_time = mqtt::settings::skipTime(); @@ -845,8 +993,8 @@ void _mqttMdnsDiscovery() { auto found = mdnsServiceQuery("mqtt", "tcp", [](String&& server, uint16_t port) { DEBUG_MSG_P(PSTR("[MQTT] MDNS found broker at %s:%hu\n"), server.c_str(), port); - setSetting("mqttServer", server); - setSetting("mqttPort", port); + setSetting(mqtt::settings::keys::Server, server); + setSetting(mqtt::settings::keys::Port, port); return true; }); @@ -862,11 +1010,21 @@ void _mqttMdnsDiscovery() { #endif -void _mqttBackwards() { - auto topic = mqtt::settings::topic(); - if (topic.indexOf("{identifier}") > 0) { - topic.replace("{identifier}", "{hostname}"); - setSetting("mqttTopic", topic); +void _mqttSettingsMigrate(int version) { + if (version < 4) { + STRING_VIEW_INLINE(Identifier, "{identifier}"); + STRING_VIEW_INLINE(Hostname, "{hostname}"); + + auto topic = mqtt::settings::topic(); + if (topic.indexOf(Identifier.toString()) > 0) { + topic.replace(Identifier.toString(), Hostname.toString()); + setSetting(mqtt::settings::keys::Topic, topic); + } + } + + if (version < 16) { + delSetting(mqtt::settings::keys::TopicWill); + delSetting(mqtt::settings::keys::TopicJson); } } @@ -969,20 +1127,33 @@ void _mqttWebSocketOnConnected(JsonObject& root) { using mqtt::settings::keys::Server; root[FPSTR(Enabled)] = mqttEnabled(); + root[FPSTR(Server)] = mqtt::settings::server(); root[FPSTR(Port)] = mqtt::settings::port(); + + root[FPSTR(TopicWill)] = mqtt::settings::topicWill(); + root[FPSTR(WillQoS)] = mqtt::settings::willQoS(); + root[FPSTR(WillRetain)] = mqtt::settings::willRetain(); + + root[FPSTR(PayloadOnline)] = mqtt::settings::payloadOnline(); + root[FPSTR(PayloadOffline)] = mqtt::settings::payloadOffline(); + + root[FPSTR(QoS)] = mqtt::settings::qos(); + root[FPSTR(Retain)] = mqtt::settings::retain(); + root[FPSTR(ClientId)] = mqtt::settings::clientId(); + root[FPSTR(Keepalive)] = mqtt::settings::keepalive().count(); + root[FPSTR(User)] = mqtt::settings::user(); root[FPSTR(Password)] = mqtt::settings::password(); - root[FPSTR(Retain)] = mqtt::settings::retain(); - root[FPSTR(Keepalive)] = mqtt::settings::keepalive().count(); - root[FPSTR(ClientId)] = mqtt::settings::clientId(); - root[FPSTR(QoS)] = mqtt::settings::qos(); + + root[FPSTR(Topic)] = mqtt::settings::topic(); + root[FPSTR(UseJson)] = mqtt::settings::json(); + root[FPSTR(TopicJson)] = mqtt::settings::topicJson(); + #if SECURE_CLIENT != SECURE_CLIENT_NONE root[FPSTR(Secure)] = mqtt::settings::secure(); root[FPSTR(Fingerprint)] = mqtt::settings::fingerprint(); #endif - root[FPSTR(Topic)] = mqtt::settings::topic(); - root[FPSTR(UseJson)] = mqtt::settings::json(); } #endif @@ -1206,7 +1377,9 @@ void _mqttOnDisconnect() { espurna::StringView()); } - if (_mqtt_enabled) { + const auto connect = _mqtt_enabled && _mqtt_settings.ok; + + if (connect) { _mqttScheduleConnect(); } else { _mqttStopConnect(); @@ -1214,7 +1387,7 @@ void _mqttOnDisconnect() { DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); - if (_mqtt_enabled && _mqtt_reconnect_delay > 0) { + if (connect && _mqtt_reconnect_delay > 0) { DEBUG_MSG_P(PSTR("[MQTT] Retrying in %u seconds\n"), mqtt::reconnect::delay(_mqtt_reconnect_delay).count()); } @@ -1248,9 +1421,10 @@ void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) { // Force-skip everything received in a short window right after connecting to avoid syncronization issues. -bool _mqttMaybeSkipRetained(char* topic) { +bool _mqttMaybeSkipRetained(espurna::StringView topic) { if (!_mqtt_skip_flag) { - DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic); + DEBUG_MSG_P(PSTR("[MQTT] Received %.*s - SKIPPED\n"), + topic.length(), topic.data()); return true; } @@ -1266,7 +1440,7 @@ bool _mqttMaybeSkipRetained(char* topic) { // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks. -void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties, size_t len, size_t index, size_t total) { +void _mqttOnMessageAsync(char* raw_topic, char* raw_payload, AsyncMqttClientMessageProperties, size_t len, size_t index, size_t total) { static constexpr size_t BufferSize { MQTT_BUFFER_MAX_SIZE }; static_assert(BufferSize > 0, ""); @@ -1274,30 +1448,33 @@ void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessagePrope return; } + auto topic = espurna::StringView{ raw_topic }; if (_mqttMaybeSkipRetained(topic)) { return; } alignas(4) static char buffer[((BufferSize + 3) & ~3) + 4] = {0}; - std::copy(payload, payload + len, &buffer[index]); + std::copy(raw_payload, raw_payload + len, &buffer[index]); // Not done yet if (total != (len + index)) { - DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total); + DEBUG_MSG_P(PSTR("[MQTT] Buffered %.*s => %u / %u bytes\n"), + topic.length(), topic.data(), len, total); return; } buffer[len + index] = '\0'; if (len < mqtt::build::MessageLogMax) { - DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, buffer); + DEBUG_MSG_P(PSTR("[MQTT] Received %.*s => %s\n"), + topic.length(), topic.data(), buffer); } else { - DEBUG_MSG_P(PSTR("[MQTT] Received %s => (%u bytes)\n"), topic, len); + DEBUG_MSG_P(PSTR("[MQTT] Received %.*s => (%u bytes)\n"), + topic.length(), topic.data(), len); } - auto topic_view = espurna::StringView{ topic }; - auto message_view = espurna::StringView{ &buffer[0], &buffer[total] }; + auto message = espurna::StringView{ &buffer[0], &buffer[total] }; for (const auto callback : _mqtt_callbacks) { - callback(MQTT_MESSAGE_EVENT, topic_view, message_view); + callback(MQTT_MESSAGE_EVENT, topic, message); } } @@ -1349,7 +1526,7 @@ espurna::StringView mqttMagnitude(espurna::StringView topic) { using espurna::StringView; StringView out; - const auto pattern = _mqtt_settings.topic + _mqtt_settings.setter; + const auto pattern = _mqtt_settings.topic + _mqtt_setter; auto it = std::find(pattern.begin(), pattern.end(), '#'); if (it == pattern.end()) { return out; @@ -1374,8 +1551,8 @@ static String _mqttTopicWith(String magnitude) { String out; out.reserve(magnitude.length() + _mqtt_settings.topic.length() - + _mqtt_settings.setter.length() - + _mqtt_settings.getter.length()); + + _mqtt_setter.length() + + _mqtt_getter.length()); out += _mqtt_settings.topic; out.replace("#", magnitude); @@ -1385,12 +1562,12 @@ static String _mqttTopicWith(String magnitude) { // When magnitude is a status topic aka getter static String _mqttTopicGetter(String magnitude) { - return _mqttTopicWith(magnitude) + _mqtt_settings.getter; + return _mqttTopicWith(magnitude) + _mqtt_getter; } // When magnitude is an input topic aka setter String _mqttTopicSetter(String magnitude) { - return _mqttTopicWith(magnitude) + _mqtt_settings.setter; + return _mqttTopicWith(magnitude) + _mqtt_setter; } // When magnitude is indexed, append its index to the topic @@ -1458,7 +1635,7 @@ uint16_t mqttSendRaw(const char* topic, const char* message) { } bool mqttSend(const char* topic, const char* message, bool force, bool retain) { - if (!force && _mqtt_use_json) { + if (!force && _mqtt_json_enabled) { mqttEnqueue(topic, message); _mqtt_json_payload_flush.once(mqtt::build::JsonDelay, mqttFlush); return true; @@ -1550,7 +1727,7 @@ void mqttFlush() { _mqtt_json_payload_count = 0; _mqtt_json_payload.clear(); - mqttSendRaw(_mqtt_settings.topic_json.c_str(), output.c_str(), false); + mqttSendRaw(_mqtt_json_topic.c_str(), output.c_str(), false); } void mqttEnqueue(espurna::StringView topic, espurna::StringView payload) { @@ -1687,7 +1864,7 @@ const char* mqttPayloadStatus(bool status) { } void mqttSendStatus() { - mqttSendRaw(_mqtt_settings.will.c_str(), _mqtt_payload_online.c_str(), _mqtt_settings.retain); + mqttSendRaw(_mqtt_settings.will_topic.c_str(), _mqtt_payload_online.c_str(), _mqtt_settings.will_retain); } // ----------------------------------------------------------------------------- @@ -1703,6 +1880,9 @@ void _mqttConnect() { // Do not connect if disabled or no WiFi if (!_mqtt_enabled || !_mqtt_network) return; + // Do not connect if configuration was not clean + if (!_mqtt_settings.ok) return; + // Check reconnect interval if (!_mqtt_reconnect_flag) return; @@ -1760,7 +1940,7 @@ void mqttHeartbeat(espurna::heartbeat::Callback callback) { void mqttSetup() { - _mqttBackwards(); + migrateVersion(_mqttSettingsMigrate); _mqttInfo(); mqtt::settings::query::setup(); diff --git a/code/html/src/panel-mqtt.html b/code/html/src/panel-mqtt.html index b453ef72..05b153ec 100644 --- a/code/html/src/panel-mqtt.html +++ b/code/html/src/panel-mqtt.html @@ -106,6 +106,42 @@ +
+ Status & Will + +
+ + + + Status payload will be sent to this topic. When empty, defaults to <root>/<status> + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+
+
JSON @@ -120,10 +156,10 @@
- - + + - JSON message will be sent to <root>/<name> topic (data by default). + JSON message will be sent to this topic. When empty, defaults to <root>/<data>