From 9668659c8942fccccc4ec5dfa3161d501fc674eb Mon Sep 17 00:00:00 2001 From: Eric Blankenhorn Date: Thu, 10 Aug 2023 08:51:43 -0500 Subject: [PATCH] Add MqttClient_PropsAdd_ex for multithreaded apps --- examples/multithread/multithread.c | 84 +++++++++++++++++++++++++----- src/mqtt_client.c | 17 +++--- src/mqtt_packet.c | 56 ++++++++++++++++---- wolfmqtt/mqtt_client.h | 19 +++++-- wolfmqtt/mqtt_packet.h | 1 + 5 files changed, 145 insertions(+), 32 deletions(-) diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 92ce00692..01214bdea 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -58,6 +58,7 @@ static int mNumMsgsDone; #define THREAD_CREATE(h, f, c) ((*h = CreateThread(NULL, 0, f, c, 0, NULL)) == NULL) #define THREAD_JOIN(h, c) WaitForMultipleObjects(c, h, TRUE, INFINITE) #define THREAD_EXIT(e) ExitThread(e) + #define THREAD_SLEEP_MS(ms) Sleep(ms) #else /* Posix (Linux/Mac) */ #include @@ -67,6 +68,7 @@ static int mNumMsgsDone; #define THREAD_CREATE(h, f, c) ({ int ret = pthread_create(h, NULL, f, c); if (ret) { errno = ret; } ret; }) #define THREAD_JOIN(h, c) ({ int ret, x; for(x=0;xprops; + while (prop != NULL) + { + if (prop->type == MQTT_PROP_CONTENT_TYPE) { + PRINTF("Content type: %.*s", prop->data_str.len, + prop->data_str.str); + } + if (prop->type == MQTT_PROP_USER_PROP) { + PRINTF("User property: key=\"%.*s\", value=\"%.*s\"", + prop->data_str.len, prop->data_str.str, + prop->data_str2.len, prop->data_str2.str); + } + prop = prop->next; + } + } + #endif wm_SemUnlock(&mtLock); /* Return negative to terminate publish processing */ @@ -396,6 +417,9 @@ static void *subscribe_task(void *param) uint16_t i; MQTTCtx *mqttCtx = (MQTTCtx*)param; word32 startSec = 0; +#ifdef WOLFMQTT_V5 + MqttProp prop; +#endif /* Build list of topics */ XMEMSET(&mqttCtx->subscribe, 0, sizeof(MqttSubscribe)); @@ -406,10 +430,14 @@ static void *subscribe_task(void *param) #ifdef WOLFMQTT_V5 if (mqttCtx->subId_not_avail != 1) { /* Subscription Identifier */ - MqttProp* prop; - prop = MqttClient_PropsAdd(&mqttCtx->subscribe.props); - prop->type = MQTT_PROP_SUBSCRIPTION_ID; - prop->data_int = DEFAULT_SUB_ID; + XMEMSET(&prop, 0, sizeof(MqttProp)); + prop.type = MQTT_PROP_SUBSCRIPTION_ID; + prop.data_int = DEFAULT_SUB_ID; + rc = MqttClient_PropsAdd_ex(&mqttCtx->subscribe.props, &prop); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Subscribe property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + } } #endif @@ -441,12 +469,6 @@ static void *subscribe_task(void *param) } } -#ifdef WOLFMQTT_V5 - if (mqttCtx->subscribe.props != NULL) { - MqttClient_PropsFree(mqttCtx->subscribe.props); - } -#endif - THREAD_EXIT(0); } @@ -516,8 +538,6 @@ static void *waitMessage_task(void *param) if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) { - rc = (int)XSTRLEN((char*)mqttCtx->rx_buf); - /* Publish Topic */ mqttCtx->stat = WMQ_PUB; XMEMSET(&mqttCtx->publish, 0, sizeof(MqttPublish)); @@ -527,7 +547,8 @@ static void *waitMessage_task(void *param) mqttCtx->publish.topic_name = mqttCtx->topic_name; mqttCtx->publish.packet_id = mqtt_get_packetid_threadsafe(); mqttCtx->publish.buffer = mqttCtx->rx_buf; - mqttCtx->publish.total_len = (word16)rc; + mqttCtx->publish.total_len = + (word16)XSTRLEN((char*)mqttCtx->rx_buf); rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); PRINTF("MQTT Publish: Topic %s, %s (%d)", @@ -544,13 +565,19 @@ static void *waitMessage_task(void *param) } /* Keep Alive handled in ping thread */ + PRINTF("Keep-alive timeout"); + /* Signal keep alive thread */ wm_SemUnlock(&pingSignal); + + /* Allow ping_task to be scheduled */ + THREAD_SLEEP_MS(500); } else if (rc != MQTT_CODE_SUCCESS) { /* There was an error */ PRINTF("MQTT Message Wait Error: %s (%d)", MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); break; } startSec = 0; @@ -575,6 +602,10 @@ static void *publish_task(void *param) MQTTCtx *mqttCtx = (MQTTCtx*)param; MqttPublish publish; word32 startSec = 0; +#ifdef WOLFMQTT_V5 + MqttProp prop1; + MqttProp prop2; +#endif /* Publish Topic */ XMEMSET(&publish, 0, sizeof(MqttPublish)); @@ -588,6 +619,30 @@ static void *publish_task(void *param) buf[5] = '0' + (publish.packet_id % 10); publish.buffer = (byte*)buf; publish.total_len = (word16)XSTRLEN(buf); +#ifdef WOLFMQTT_V5 + /* Payload Format Indicator */ + XMEMSET(&prop1, 0, sizeof(MqttProp)); + prop1.type = MQTT_PROP_PAYLOAD_FORMAT_IND; + prop1.data_byte = 1; + rc = MqttClient_PropsAdd_ex(&publish.props, &prop1); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Publish property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); + } + + /* Content Type */ + XMEMSET(&prop2, 0, sizeof(MqttProp)); + prop2.type = MQTT_PROP_CONTENT_TYPE; + prop2.data_str.str = (char*)"wolf_type"; + prop2.data_str.len = (word16)XSTRLEN(prop2.data_str.str); + rc = MqttClient_PropsAdd_ex(&publish.props, &prop2); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Publish property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); + } +#endif do { rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); @@ -641,6 +696,7 @@ static void *ping_task(void *param) if (rc != MQTT_CODE_SUCCESS) { PRINTF("MQTT Ping Keep Alive Error: %s (%d)", MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); break; } } while (!mqtt_stop_get()); diff --git a/src/mqtt_client.c b/src/mqtt_client.c index c37a443ff..f74f328ba 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1502,7 +1502,7 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) mc_connect->stat.write == MQTT_MSG_AUTH) { MqttAuth auth, *p_auth = &auth; - MqttProp* prop, *conn_prop; + MqttProp prop, *conn_prop; /* Find the AUTH property in the connect structure */ for (conn_prop = mc_connect->props; @@ -1526,14 +1526,14 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) p_auth->reason_code = MQTT_REASON_CONT_AUTH; /* Use the same authentication method property from connect */ - prop = MqttProps_Add(&p_auth->props); - prop->type = MQTT_PROP_AUTH_METHOD; - prop->data_str.str = conn_prop->data_str.str; - prop->data_str.len = conn_prop->data_str.len; + XMEMSET(&prop, 0, sizeof(MqttProp)); + rc = MqttProps_Add_ex(&p_auth->props, &prop); + prop.type = MQTT_PROP_AUTH_METHOD; + prop.data_str.str = conn_prop->data_str.str; + prop.data_str.len = conn_prop->data_str.len; /* Send the AUTH packet */ rc = MqttClient_Auth(client, p_auth); - MqttClient_PropsFree(p_auth->props); #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE) return rc; @@ -2471,6 +2471,11 @@ int MqttClient_PropsFree(MqttProp *head) return MqttProps_Free(head); } +int MqttClient_PropsAdd_ex(MqttProp **head, MqttProp *new_prop) +{ + return MqttProps_Add_ex(head, new_prop); +} + #endif /* WOLFMQTT_V5 */ int MqttClient_WaitMessage_ex(MqttClient *client, MqttObject* msg, diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 63f1d8f56..5377673bc 100644 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -500,6 +500,12 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf, break; } + #ifdef WOLFMQTT_MULTITHREAD + rc = wm_SemLock(&clientPropStack_lock); + if (rc != 0) { + break; + } + #endif /* Decode the Identifier */ rc = MqttDecode_Vbi(buf, (word32*)&cur_prop->type, (word32)(buf_len - (buf - pbuf))); @@ -638,6 +644,9 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf, break; } } + #ifdef WOLFMQTT_MULTITHREAD + (void)wm_SemUnlock(&clientPropStack_lock); + #endif }; if (rc < 0) { @@ -1821,12 +1830,6 @@ MqttProp* MqttProps_Add(MqttProp **head) return NULL; } -#ifdef WOLFMQTT_MULTITHREAD - if (wm_SemLock(&clientPropStack_lock) != 0) { - return NULL; - } -#endif - cur = *head; /* Find the end of the parameter list */ @@ -1836,6 +1839,12 @@ MqttProp* MqttProps_Add(MqttProp **head) }; #ifndef WOLFMQTT_DYN_PROP +#ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&clientPropStack_lock) != 0) { + return NULL; + } +#endif + /* Find a free element */ for (i = 0; i < MQTT_MAX_PROPS; i++) { if (clientPropStack[i].type == MQTT_PROP_NONE) { @@ -1870,7 +1879,7 @@ MqttProp* MqttProps_Add(MqttProp **head) (void)MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PROPERTY); } -#ifdef WOLFMQTT_MULTITHREAD +#if defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_DYN_PROP) (void)wm_SemUnlock(&clientPropStack_lock); #endif @@ -1881,11 +1890,12 @@ MqttProp* MqttProps_Add(MqttProp **head) int MqttProps_Free(MqttProp *head) { int ret = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_MULTITHREAD +#if defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_DYN_PROP) if ((ret = wm_SemLock(&clientPropStack_lock)) != 0) { return ret; } #endif + while (head != NULL) { #ifndef WOLFMQTT_DYN_PROP head->type = MQTT_PROP_NONE; /* available */ @@ -1898,12 +1908,40 @@ int MqttProps_Free(MqttProp *head) head = tmp; #endif } -#ifdef WOLFMQTT_MULTITHREAD +#if defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_DYN_PROP) (void)wm_SemUnlock(&clientPropStack_lock); #endif return ret; } +int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop) +{ + MqttProp *prev = NULL, *cur; + + if ((head == NULL) || (new_prop == NULL)) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); + } + + cur = *head; + + /* Find the end of the parameter list */ + while (cur != NULL) { + prev = cur; + cur = cur->next; + }; + + /* set placeholder until caller sets it to a real type */ + if (prev == NULL) { + /* Start a new list */ + *head = new_prop; + } + else { + /* Add to the existing list */ + prev->next = new_prop; + } + + return MQTT_CODE_SUCCESS; +} #endif /* WOLFMQTT_V5 */ static int MqttPacket_HandleNetError(MqttClient *client, int rc) diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index ac996767d..03a1d27bb 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -395,7 +395,7 @@ WOLFMQTT_API int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping); #ifdef WOLFMQTT_V5 /*! \brief Encodes and sends the MQTT Authentication Request packet and - waits for the Ping Response packet + waits for the broker AUTH packet * \note This is a blocking function that will wait for MqttNet.read * \param client Pointer to MqttClient structure * \param auth Pointer to MqttAuth structure @@ -410,8 +410,10 @@ WOLFMQTT_API int MqttClient_Auth( /*! \brief Add a new property. * Allocate a property structure and add it to the head of the list pointed to by head. To be used prior to calling packet command. + Properties added using this method use the internal stack, and must be + freed using MqttClient_PropsFree after the operation is complete. * \param head Pointer-pointer to a property structure - * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG + * \return pointer to new MqttProp or NULL */ WOLFMQTT_API MqttProp* MqttClient_PropsAdd( MqttProp **head); @@ -424,8 +426,19 @@ WOLFMQTT_API MqttProp* MqttClient_PropsAdd( */ WOLFMQTT_API int MqttClient_PropsFree( MqttProp *head); -#endif +/*! \brief Add a new property. + * Pass in a pointer to a locally allocated property structure and add it to + the head of the list pointed to by head. To be used prior to calling packet + command. + * \param head Pointer-pointer to a property structure + * \param new_prop Pointer to new property structure to be added. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG + */ +WOLFMQTT_API int MqttClient_PropsAdd_ex( + MqttProp **head, MqttProp *new_prop); + +#endif /*! \brief Encodes and sends the MQTT Disconnect packet (no response) * \note This is a non-blocking function that will try and send using diff --git a/wolfmqtt/mqtt_packet.h b/wolfmqtt/mqtt_packet.h index 73e6e0a96..35614c789 100644 --- a/wolfmqtt/mqtt_packet.h +++ b/wolfmqtt/mqtt_packet.h @@ -709,6 +709,7 @@ WOLFMQTT_LOCAL int MqttDecode_Props(MqttPacketType packet, MqttProp** props, WOLFMQTT_LOCAL int MqttProps_Init(void); WOLFMQTT_LOCAL int MqttProps_ShutDown(void); WOLFMQTT_LOCAL MqttProp* MqttProps_Add(MqttProp **head); +WOLFMQTT_LOCAL int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop); WOLFMQTT_LOCAL int MqttProps_Free(MqttProp *head); WOLFMQTT_LOCAL MqttProp* MqttProps_FindType(MqttProp *head, MqttPropertyType type);