From bc7ae0af80ce2053e1e0992eb5e1f51bd3344896 Mon Sep 17 00:00:00 2001 From: David Garske Date: Mon, 27 Nov 2023 14:08:44 -0800 Subject: [PATCH] Fixed issue with QoS2 on received publish ACK getting skipped if write is already locked. --- src/mqtt_client.c | 61 ++++++++++++++++++++---------------------- wolfmqtt/mqtt_client.h | 2 +- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 4a4294dba..4f1aa7fdd 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -307,7 +307,12 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) if (wm_SemLock(&client->lockClient) == 0) #endif { + /* mark read active */ client->read.isActive = 1; + + /* reset the packet state used by MqttPacket_Read */ + client->packet.stat = MQTT_PK_BEGIN; + #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockClient); #endif @@ -1155,9 +1160,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, return rc; } - /* reset the packet state used by MqttPacket_Read */ - client->packet.stat = MQTT_PK_BEGIN; - mms_stat->read = MQTT_MSG_WAIT; } FALL_THROUGH; @@ -1311,6 +1313,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc >= 0) { rc = MQTT_CODE_SUCCESS; } + else { + /* error, break */ + break; + } #ifdef WOLFMQTT_MULTITHREAD if (pendResp) { @@ -1327,39 +1333,21 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } #endif /* WOLFMQTT_MULTITHREAD */ - /* are we sending ACK or done with message? */ + /* Determine if we are sending ACK or done */ if (MqttIsPubRespPacket(resp.packet_type)) { + /* if we get here, then we are sending an ACK */ mms_stat->read = MQTT_MSG_ACK; - } - else { - mms_stat->read = MQTT_MSG_BEGIN; + mms_stat->ack = MQTT_MSG_WAIT; + + /* setup ACK in shared context */ + XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); + #ifdef WOLFMQTT_V5 + client->packetAck.protocol_level = client->protocol_level; + #endif } /* done reading */ MqttReadStop(client, mms_stat); - - /* if error, leave */ - if (rc != MQTT_CODE_SUCCESS) { - break; - } - - /* if not sending an ACK, we are done */ - if (!MqttIsPubRespPacket(resp.packet_type)) { - break; - } - - /* Flag write active / lock mutex */ - if ((rc = MqttWriteStart(client, mms_stat)) != 0) { - break; - } - - /* setup ACK in shared context */ - XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); - #ifdef WOLFMQTT_V5 - client->packetAck.protocol_level = client->protocol_level; - #endif - - mms_stat->ack = MQTT_MSG_ACK; break; } @@ -1382,10 +1370,19 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, switch (mms_stat->ack) { case MQTT_MSG_BEGIN: - case MQTT_MSG_WAIT: /* wait for read to set ack */ break; + case MQTT_MSG_WAIT: + { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, mms_stat)) != 0) { + break; + } + mms_stat->ack = MQTT_MSG_ACK; + } + FALL_THROUGH; + case MQTT_MSG_ACK: { /* send ack */ @@ -1453,7 +1450,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #endif /* no data read or ack done, then reset state */ - if (mms_stat->read == MQTT_MSG_WAIT || mms_stat->read == MQTT_MSG_ACK) { + if (mms_stat->read == MQTT_MSG_WAIT) { mms_stat->read = MQTT_MSG_BEGIN; } diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index a8bcd7d5f..ddb70afe1 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -115,7 +115,7 @@ enum MqttClientFlags { WOLFMQTT_API word32 MqttClient_Flags(struct _MqttClient *client, word32 mask, word32 flags); typedef enum _MqttPkStat { - MQTT_PK_BEGIN, + MQTT_PK_BEGIN = 0, MQTT_PK_READ_HEAD, MQTT_PK_READ } MqttPkStat;