Skip to content

Commit

Permalink
By default keep mutex locked if we tried to write. The wolfSSL TLS en…
Browse files Browse the repository at this point in the history
…gine requires an SSL_Write that returns WANT_WRITE to be called with the same buffer/sz, not a different one, even if no data was sent. If user wants to enable the feature anyways they can use `WOLFMQTT_ALLOW_NODATA_UNLOCK`. Only the write that has this logic as the issue doesn't exist for an SSL_Read.
  • Loading branch information
dgarske committed Nov 24, 2023
1 parent 53d541f commit ecf47f8
Showing 1 changed file with 68 additions and 12 deletions.
80 changes: 68 additions & 12 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,32 @@

#include "wolfmqtt/mqtt_client.h"

/* DOCUMENTED BUILD OPTIONS:
*
* WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on
* client struct, write and read. When a pending response is needed its added
* to a linked list and if another thread reads the expected response it is
* flagged, so the other thread knows it completed.
*
* WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE,
* which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the
* transport (socket) has no data.
*
* WOLFMQTT_V5: Enables MQTT v5.0 support
*
* WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to
* allow unlock if no data was sent/received. Note the TLS stack typically
* requires an attempt to write to continue with same write, not different.
* By default if we attempt a write we keep the mutex locked and return
* MQTT_CODE_CONTINUE
*
* WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the
* user. Example: wm_SemInit
*
* WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code.
*/


/* Private functions */

/* forward declarations */
Expand Down Expand Up @@ -221,7 +247,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
MQTT_TRACE_MSG("Warning, recv already locked!");
rc = MQTT_CODE_ERROR_SYSTEM;
}
/* detect if a write is already in progress */
/* detect if a read is already in progress */
if (wm_SemLock(&client->lockClient) == 0) {
if (client->read.total > 0) {
MQTT_TRACE_MSG("Partial read in progress!");
Expand Down Expand Up @@ -692,7 +718,8 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf,
#ifdef WOLFMQTT_DISCONNECT_CB
/* Call disconnect callback with reason code */
if ((packet_obj != NULL) && client->disconnect_cb) {
client->disconnect_cb(client, p_disc->reason_code, client->disconnect_ctx);
client->disconnect_cb(client, p_disc->reason_code,
client->disconnect_ctx);
}
#endif
#else
Expand Down Expand Up @@ -1087,8 +1114,9 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
if (rc <= 0) {
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE &&
(client->packet.stat > MQTT_PK_BEGIN ||
client->read.total > 0)) {
(client->packet.stat > MQTT_PK_BEGIN ||
client->read.total > 0)
) {
/* advance state, since we received some data */
mms_stat->read = MQTT_MSG_HEADER;
}
Expand Down Expand Up @@ -1580,7 +1608,11 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
/* Send connect packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -1986,9 +2018,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
/* Send publish packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0)
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
#endif
client->write.len = 0; /* reset len, so publish chunk resets */

Expand Down Expand Up @@ -2175,7 +2212,11 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe)
/* Send subscribe packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2276,7 +2317,11 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe)
/* Send unsubscribe packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2369,7 +2414,11 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping)
/* Send ping req packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2461,8 +2510,11 @@ int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect)
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
/* if disconnect context avail allow partial write in non-blocking mode */
if (p_disconnect != NULL &&
rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2544,7 +2596,11 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth)
/* Send authentication packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down

0 comments on commit ecf47f8

Please sign in to comment.