diff --git a/src/httpserver/http_fns.c b/src/httpserver/http_fns.c index a5c38a685..3ce6c1340 100644 --- a/src/httpserver/http_fns.c +++ b/src/httpserver/http_fns.c @@ -1293,17 +1293,25 @@ int http_fn_ha_discovery(http_request_t* request) { if (relayCount > 0) { for (i = 0; i < CHANNEL_MAX; i++) { if (h_isChannelRelay(i)) { + if (dev_info != NULL) { + MQTT_QueuePublish(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN); + hass_free_device_info(dev_info); + } dev_info = hass_init_relay_device_info(i); - MQTT_QueuePublish(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN); - hass_free_device_info(dev_info); } } + + //Invoke publishChannles after the last topic + if (dev_info != NULL) { + MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, PublishChannels); + hass_free_device_info(dev_info); + } } if (pwmCount == 5 || isLedDriverChipRunning()) { // Enable + RGB control + CW control dev_info = hass_init_light_device_info(ENTITY_LIGHT_RGBCW); - MQTT_QueuePublish(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN); + MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, PublishChannels); hass_free_device_info(dev_info); } else if (pwmCount > 0) { @@ -1323,7 +1331,7 @@ int http_fn_ha_discovery(http_request_t* request) { } if (dev_info != NULL) { - MQTT_QueuePublish(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN); + MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, PublishChannels); hass_free_device_info(dev_info); } } diff --git a/src/mqtt/new_mqtt.c b/src/mqtt/new_mqtt.c index 3c2346977..0d8a0975c 100644 --- a/src/mqtt/new_mqtt.c +++ b/src/mqtt/new_mqtt.c @@ -884,16 +884,12 @@ OBK_Publish_Result MQTT_ChannelPublish(int channel, int flags) } // This console command will trigger a publish of all used variables (channels and extra stuff) OBK_Publish_Result MQTT_PublishAll(const void* context, const char* cmd, const char* args, int cmdFlags) { - - MQTT_PublishWholeDeviceState_Internal(false); - + MQTT_PublishWholeDeviceState_Internal(true); return 1;// TODO make return values consistent for all console commands } // This console command will trigger a publish of runtime variables OBK_Publish_Result MQTT_PublishChannels(const void* context, const char* cmd, const char* args, int cmdFlags) { - - MQTT_PublishWholeDeviceState_Internal(true); - + MQTT_PublishOnlyDeviceChannelsIfPossible(); return 1;// TODO make return values consistent for all console commands } OBK_Publish_Result MQTT_PublishCommand(const void* context, const char* cmd, const char* args, int cmdFlags) { @@ -937,9 +933,9 @@ void MQTT_init() mqtt_initialised = 1; - CMD_RegisterCommand("publish", "", MQTT_PublishCommand, "Sqqq", NULL); - CMD_RegisterCommand("publishAll", "", MQTT_PublishAll, "Sqqq", NULL); - CMD_RegisterCommand("publishChannels", "", MQTT_PublishChannels, "Sqqq", NULL); + CMD_RegisterCommand(MQTT_COMMAND_PUBLISH, "", MQTT_PublishCommand, "Sqqq", NULL); + CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_ALL, "", MQTT_PublishAll, "Sqqq", NULL); + CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_CHANNELS, "", MQTT_PublishChannels, "Sqqq", NULL); } OBK_Publish_Result MQTT_DoItemPublishString(const char* sChannel, const char* valueStr) @@ -1186,12 +1182,13 @@ MqttPublishItem_t* find_queue_reusable_item(MqttPublishItem_t* head) { return head; } -/// @brief Queue an entry for publish. +/// @brief Queue an entry for publish and execute a command after the publish. /// @param topic /// @param channel /// @param value /// @param flags -void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) { +/// @param command Command to execute after the publish +void MQTT_QueuePublishWithCommand(char* topic, char* channel, char* value, int flags, PostPublishCommands command) { if (g_MqttPublishItemsQueued >= MQTT_MAX_QUEUE_SIZE) { addLogAdv(LOG_ERROR, LOG_FEATURE_MQTT, "Unable to queue! %i items already present\r\n", g_MqttPublishItemsQueued); return; @@ -1227,12 +1224,23 @@ void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) { os_strcpy(newItem->topic, topic); os_strcpy(newItem->channel, channel); os_strcpy(newItem->value, value); + newItem->command = command; newItem->flags = flags; g_MqttPublishItemsQueued++; addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Queued topic=%s/%s %i, items queued", newItem->topic, newItem->channel, g_MqttPublishItemsQueued); } +/// @brief Queue an entry for publish. +/// @param topic +/// @param channel +/// @param value +/// @param flags +void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) { + MQTT_QueuePublishWithCommand(topic, channel, value, flags, None); +} + + /// @brief Publish MQTT_QUEUED_ITEMS_PUBLISHED_AT_ONCE queued items. /// @return OBK_Publish_Result PublishQueuedItems() { @@ -1253,6 +1261,17 @@ OBK_Publish_Result PublishQueuedItems() { //Stop if last publish failed if (result != OBK_PUBLISH_OK) break; + + switch (head->command) { + case None: + break; + case PublishAll: + CMD_ExecuteCommand(MQTT_COMMAND_PUBLISH_ALL, COMMAND_FLAG_SOURCE_MQTT); + break; + case PublishChannels: + CMD_ExecuteCommand(MQTT_COMMAND_PUBLISH_CHANNELS, COMMAND_FLAG_SOURCE_MQTT); + break; + } } else { //addLogAdv(LOG_INFO,LOG_FEATURE_MQTT,"PublishQueuedItems item skipped reusable"); diff --git a/src/mqtt/new_mqtt.h b/src/mqtt/new_mqtt.h index 2af4e389a..33dbf0395 100644 --- a/src/mqtt/new_mqtt.h +++ b/src/mqtt/new_mqtt.h @@ -41,6 +41,13 @@ typedef struct obk_mqtt_request_tag { #define MQTT_PUBLISH_ITEM_CHANNEL_LENGTH 64 #define MQTT_PUBLISH_ITEM_VALUE_LENGTH 1023 +typedef enum PostPublishCommands_e { + None, + PublishAll, + PublishChannels +} PostPublishCommands; + + /// @brief Publish queue item typedef struct MqttPublishItem { @@ -49,8 +56,14 @@ typedef struct MqttPublishItem char value[MQTT_PUBLISH_ITEM_VALUE_LENGTH]; int flags; struct MqttPublishItem* next; + PostPublishCommands command; } MqttPublishItem_t; +#define MQTT_COMMAND_PUBLISH "publish" +#define MQTT_COMMAND_PUBLISH_ALL "publishAll" +#define MQTT_COMMAND_PUBLISH_CHANNELS "publishChannels" + + // Count of queued items published at once. #define MQTT_QUEUED_ITEMS_PUBLISHED_AT_ONCE 3 #define MQTT_MAX_QUEUE_SIZE 7 @@ -82,6 +95,7 @@ OBK_Publish_Result MQTT_PublishMain_StringString(const char* sChannel, const cha OBK_Publish_Result MQTT_ChannelChangeCallback(int channel, int iVal); void MQTT_PublishOnlyDeviceChannelsIfPossible(); void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags); +void MQTT_QueuePublishWithCommand(char* topic, char* channel, char* value, int flags, PostPublishCommands command); OBK_Publish_Result MQTT_Publish(char* sTopic, char* sChannel, char* value, int flags); bool MQTT_IsReady();