Skip to content

Commit

Permalink
Fixed issue with QoS2 on received publish ACK getting skipped if writ…
Browse files Browse the repository at this point in the history
…e is already locked.
  • Loading branch information
dgarske committed Nov 27, 2023
1 parent 8a0cafe commit bc7ae0a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
61 changes: 29 additions & 32 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,12 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
/* mark read active */
client->read.isActive = 1;

/* reset the packet state used by MqttPacket_Read */
client->packet.stat = MQTT_PK_BEGIN;

#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
Expand Down Expand Up @@ -1155,9 +1160,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
return rc;
}

/* reset the packet state used by MqttPacket_Read */
client->packet.stat = MQTT_PK_BEGIN;

mms_stat->read = MQTT_MSG_WAIT;
}
FALL_THROUGH;
Expand Down Expand Up @@ -1311,6 +1313,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
if (rc >= 0) {
rc = MQTT_CODE_SUCCESS;
}
else {
/* error, break */
break;
}

#ifdef WOLFMQTT_MULTITHREAD
if (pendResp) {
Expand All @@ -1327,39 +1333,21 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
}
#endif /* WOLFMQTT_MULTITHREAD */

/* are we sending ACK or done with message? */
/* Determine if we are sending ACK or done */
if (MqttIsPubRespPacket(resp.packet_type)) {
/* if we get here, then we are sending an ACK */
mms_stat->read = MQTT_MSG_ACK;
}
else {
mms_stat->read = MQTT_MSG_BEGIN;
mms_stat->ack = MQTT_MSG_WAIT;

/* setup ACK in shared context */
XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp));
#ifdef WOLFMQTT_V5
client->packetAck.protocol_level = client->protocol_level;
#endif
}

/* done reading */
MqttReadStop(client, mms_stat);

/* if error, leave */
if (rc != MQTT_CODE_SUCCESS) {
break;
}

/* if not sending an ACK, we are done */
if (!MqttIsPubRespPacket(resp.packet_type)) {
break;
}

/* Flag write active / lock mutex */
if ((rc = MqttWriteStart(client, mms_stat)) != 0) {
break;
}

/* setup ACK in shared context */
XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp));
#ifdef WOLFMQTT_V5
client->packetAck.protocol_level = client->protocol_level;
#endif

mms_stat->ack = MQTT_MSG_ACK;
break;
}

Expand All @@ -1382,10 +1370,19 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
switch (mms_stat->ack)
{
case MQTT_MSG_BEGIN:
case MQTT_MSG_WAIT:
/* wait for read to set ack */
break;

case MQTT_MSG_WAIT:
{
/* Flag write active / lock mutex */
if ((rc = MqttWriteStart(client, mms_stat)) != 0) {
break;
}
mms_stat->ack = MQTT_MSG_ACK;
}
FALL_THROUGH;

case MQTT_MSG_ACK:
{
/* send ack */
Expand Down Expand Up @@ -1453,7 +1450,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
#endif

/* no data read or ack done, then reset state */
if (mms_stat->read == MQTT_MSG_WAIT || mms_stat->read == MQTT_MSG_ACK) {
if (mms_stat->read == MQTT_MSG_WAIT) {
mms_stat->read = MQTT_MSG_BEGIN;
}

Expand Down
2 changes: 1 addition & 1 deletion wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ enum MqttClientFlags {
WOLFMQTT_API word32 MqttClient_Flags(struct _MqttClient *client, word32 mask, word32 flags);

typedef enum _MqttPkStat {
MQTT_PK_BEGIN,
MQTT_PK_BEGIN = 0,
MQTT_PK_READ_HEAD,
MQTT_PK_READ
} MqttPkStat;
Expand Down

0 comments on commit bc7ae0a

Please sign in to comment.