diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 92ce00692..7c3d5c6b4 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -196,6 +196,25 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, if (msg_done) { PRINTF("MQTT Message: Done"); } + #ifdef WOLFMQTT_V5 + { + /* Properties can be checked in the message callback */ + MqttProp *prop = msg->props; + while (prop != NULL) + { + if (prop->type == MQTT_PROP_CONTENT_TYPE) { + PRINTF("Content type: %.*s", prop->data_str.len, + prop->data_str.str); + } + if (prop->type == MQTT_PROP_USER_PROP) { + PRINTF("User property: key=\"%.*s\", value=\"%.*s\"", + prop->data_str.len, prop->data_str.str, + prop->data_str2.len, prop->data_str2.str); + } + prop = prop->next; + } + } + #endif wm_SemUnlock(&mtLock); /* Return negative to terminate publish processing */ @@ -396,6 +415,9 @@ static void *subscribe_task(void *param) uint16_t i; MQTTCtx *mqttCtx = (MQTTCtx*)param; word32 startSec = 0; +#ifdef WOLFMQTT_V5 + MqttProp prop; +#endif /* Build list of topics */ XMEMSET(&mqttCtx->subscribe, 0, sizeof(MqttSubscribe)); @@ -406,10 +428,14 @@ static void *subscribe_task(void *param) #ifdef WOLFMQTT_V5 if (mqttCtx->subId_not_avail != 1) { /* Subscription Identifier */ - MqttProp* prop; - prop = MqttClient_PropsAdd(&mqttCtx->subscribe.props); - prop->type = MQTT_PROP_SUBSCRIPTION_ID; - prop->data_int = DEFAULT_SUB_ID; + XMEMSET(&prop, 0, sizeof(MqttProp)); + prop.type = MQTT_PROP_SUBSCRIPTION_ID; + prop.data_int = DEFAULT_SUB_ID; + rc = MqttClient_PropsAdd_ex(&mqttCtx->subscribe.props, &prop); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Subscribe property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + } } #endif @@ -441,12 +467,6 @@ static void *subscribe_task(void *param) } } -#ifdef WOLFMQTT_V5 - if (mqttCtx->subscribe.props != NULL) { - MqttClient_PropsFree(mqttCtx->subscribe.props); - } -#endif - THREAD_EXIT(0); } @@ -516,8 +536,6 @@ static void *waitMessage_task(void *param) if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) { - rc = (int)XSTRLEN((char*)mqttCtx->rx_buf); - /* Publish Topic */ mqttCtx->stat = WMQ_PUB; XMEMSET(&mqttCtx->publish, 0, sizeof(MqttPublish)); @@ -527,7 +545,8 @@ static void *waitMessage_task(void *param) mqttCtx->publish.topic_name = mqttCtx->topic_name; mqttCtx->publish.packet_id = mqtt_get_packetid_threadsafe(); mqttCtx->publish.buffer = mqttCtx->rx_buf; - mqttCtx->publish.total_len = (word16)rc; + mqttCtx->publish.total_len = + (word16)XSTRLEN((char*)mqttCtx->rx_buf); rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); PRINTF("MQTT Publish: Topic %s, %s (%d)", @@ -544,13 +563,19 @@ static void *waitMessage_task(void *param) } /* Keep Alive handled in ping thread */ + PRINTF("Keep-alive timeout"); + /* Signal keep alive thread */ wm_SemUnlock(&pingSignal); + + /* Allow ping_task to get scheduled */ + sleep(1); } else if (rc != MQTT_CODE_SUCCESS) { /* There was an error */ PRINTF("MQTT Message Wait Error: %s (%d)", MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); break; } startSec = 0; @@ -575,6 +600,10 @@ static void *publish_task(void *param) MQTTCtx *mqttCtx = (MQTTCtx*)param; MqttPublish publish; word32 startSec = 0; +#ifdef WOLFMQTT_V5 + MqttProp prop1; + MqttProp prop2; +#endif /* Publish Topic */ XMEMSET(&publish, 0, sizeof(MqttPublish)); @@ -588,6 +617,30 @@ static void *publish_task(void *param) buf[5] = '0' + (publish.packet_id % 10); publish.buffer = (byte*)buf; publish.total_len = (word16)XSTRLEN(buf); +#ifdef WOLFMQTT_V5 + /* Payload Format Indicator */ + XMEMSET(&prop1, 0, sizeof(MqttProp)); + prop1.type = MQTT_PROP_PAYLOAD_FORMAT_IND; + prop1.data_byte = 1; + rc = MqttClient_PropsAdd_ex(&publish.props, &prop1); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Publish property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); + } + + /* Content Type */ + XMEMSET(&prop2, 0, sizeof(MqttProp)); + prop2.type = MQTT_PROP_CONTENT_TYPE; + prop2.data_str.str = (char*)"wolf_type"; + prop2.data_str.len = (word16)XSTRLEN(prop2.data_str.str); + rc = MqttClient_PropsAdd_ex(&publish.props, &prop2); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Publish property add failure: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); + } +#endif do { rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); @@ -641,6 +694,7 @@ static void *ping_task(void *param) if (rc != MQTT_CODE_SUCCESS) { PRINTF("MQTT Ping Keep Alive Error: %s (%d)", MqttClient_ReturnCodeToString(rc), rc); + mqtt_stop_set(); break; } } while (!mqtt_stop_get()); diff --git a/src/mqtt_client.c b/src/mqtt_client.c index c37a443ff..f74f328ba 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1502,7 +1502,7 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) mc_connect->stat.write == MQTT_MSG_AUTH) { MqttAuth auth, *p_auth = &auth; - MqttProp* prop, *conn_prop; + MqttProp prop, *conn_prop; /* Find the AUTH property in the connect structure */ for (conn_prop = mc_connect->props; @@ -1526,14 +1526,14 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) p_auth->reason_code = MQTT_REASON_CONT_AUTH; /* Use the same authentication method property from connect */ - prop = MqttProps_Add(&p_auth->props); - prop->type = MQTT_PROP_AUTH_METHOD; - prop->data_str.str = conn_prop->data_str.str; - prop->data_str.len = conn_prop->data_str.len; + XMEMSET(&prop, 0, sizeof(MqttProp)); + rc = MqttProps_Add_ex(&p_auth->props, &prop); + prop.type = MQTT_PROP_AUTH_METHOD; + prop.data_str.str = conn_prop->data_str.str; + prop.data_str.len = conn_prop->data_str.len; /* Send the AUTH packet */ rc = MqttClient_Auth(client, p_auth); - MqttClient_PropsFree(p_auth->props); #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE) return rc; @@ -2471,6 +2471,11 @@ int MqttClient_PropsFree(MqttProp *head) return MqttProps_Free(head); } +int MqttClient_PropsAdd_ex(MqttProp **head, MqttProp *new_prop) +{ + return MqttProps_Add_ex(head, new_prop); +} + #endif /* WOLFMQTT_V5 */ int MqttClient_WaitMessage_ex(MqttClient *client, MqttObject* msg, diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 63f1d8f56..64f9a0c20 100644 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -500,6 +500,12 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf, break; } + #ifdef WOLFMQTT_MULTITHREAD + rc = wm_SemLock(&clientPropStack_lock); + if (rc != 0) { + break; + } + #endif /* Decode the Identifier */ rc = MqttDecode_Vbi(buf, (word32*)&cur_prop->type, (word32)(buf_len - (buf - pbuf))); @@ -638,6 +644,9 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf, break; } } + #ifdef WOLFMQTT_MULTITHREAD + (void)wm_SemUnlock(&clientPropStack_lock); + #endif }; if (rc < 0) { @@ -1904,6 +1913,34 @@ int MqttProps_Free(MqttProp *head) return ret; } +int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop) +{ + MqttProp *prev = NULL, *cur; + + if ((head == NULL) || (new_prop == NULL)) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); + } + + cur = *head; + + /* Find the end of the parameter list */ + while (cur != NULL) { + prev = cur; + cur = cur->next; + }; + + /* set placeholder until caller sets it to a real type */ + if (prev == NULL) { + /* Start a new list */ + *head = new_prop; + } + else { + /* Add to the existing list */ + prev->next = new_prop; + } + + return MQTT_CODE_SUCCESS; +} #endif /* WOLFMQTT_V5 */ static int MqttPacket_HandleNetError(MqttClient *client, int rc) diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index ac996767d..67b482d6d 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -410,8 +410,14 @@ WOLFMQTT_API int MqttClient_Auth( /*! \brief Add a new property. * Allocate a property structure and add it to the head of the list pointed to by head. To be used prior to calling packet command. + Properties added using this method use the internal stack, and must be + freed using MqttClient_PropsFree after the operation is complete. + + Note: + This API is not thread-safe, use MqttClient_PropsAdd_ex with multi-threaded + applications. * \param head Pointer-pointer to a property structure - * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG + * \return pointer to new MqttProp or NULL */ WOLFMQTT_API MqttProp* MqttClient_PropsAdd( MqttProp **head); @@ -424,8 +430,17 @@ WOLFMQTT_API MqttProp* MqttClient_PropsAdd( */ WOLFMQTT_API int MqttClient_PropsFree( MqttProp *head); -#endif +/*! \brief Add a new property. + * Allocate a property structure and add it to the head of the list + pointed to by head. To be used prior to calling packet command. + * \param head Pointer-pointer to a property structure + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG + */ +WOLFMQTT_API int MqttClient_PropsAdd_ex( + MqttProp **head, MqttProp *new_prop); + +#endif /*! \brief Encodes and sends the MQTT Disconnect packet (no response) * \note This is a non-blocking function that will try and send using diff --git a/wolfmqtt/mqtt_packet.h b/wolfmqtt/mqtt_packet.h index 73e6e0a96..35614c789 100644 --- a/wolfmqtt/mqtt_packet.h +++ b/wolfmqtt/mqtt_packet.h @@ -709,6 +709,7 @@ WOLFMQTT_LOCAL int MqttDecode_Props(MqttPacketType packet, MqttProp** props, WOLFMQTT_LOCAL int MqttProps_Init(void); WOLFMQTT_LOCAL int MqttProps_ShutDown(void); WOLFMQTT_LOCAL MqttProp* MqttProps_Add(MqttProp **head); +WOLFMQTT_LOCAL int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop); WOLFMQTT_LOCAL int MqttProps_Free(MqttProp *head); WOLFMQTT_LOCAL MqttProp* MqttProps_FindType(MqttProp *head, MqttPropertyType type);