Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

By default keep mutex locked if we tried to write #374

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 68 additions & 12 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,32 @@

#include "wolfmqtt/mqtt_client.h"

/* DOCUMENTED BUILD OPTIONS:
*
* WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on
* client struct, write and read. When a pending response is needed its added
* to a linked list and if another thread reads the expected response it is
* flagged, so the other thread knows it completed.
*
* WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE,
* which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the
* transport (socket) has no data.
*
* WOLFMQTT_V5: Enables MQTT v5.0 support
*
* WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to
* allow unlock if no data was sent/received. Note the TLS stack typically
* requires an attempt to write to continue with same write, not different.
* By default if we attempt a write we keep the mutex locked and return
* MQTT_CODE_CONTINUE
*
* WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the
* user. Example: wm_SemInit
*
* WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code.
*/


/* Private functions */

/* forward declarations */
Expand Down Expand Up @@ -221,7 +247,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
MQTT_TRACE_MSG("Warning, recv already locked!");
rc = MQTT_CODE_ERROR_SYSTEM;
}
/* detect if a write is already in progress */
/* detect if a read is already in progress */
if (wm_SemLock(&client->lockClient) == 0) {
if (client->read.total > 0) {
MQTT_TRACE_MSG("Partial read in progress!");
Expand Down Expand Up @@ -692,7 +718,8 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf,
#ifdef WOLFMQTT_DISCONNECT_CB
/* Call disconnect callback with reason code */
if ((packet_obj != NULL) && client->disconnect_cb) {
client->disconnect_cb(client, p_disc->reason_code, client->disconnect_ctx);
client->disconnect_cb(client, p_disc->reason_code,
client->disconnect_ctx);
}
#endif
#else
Expand Down Expand Up @@ -1087,8 +1114,9 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
if (rc <= 0) {
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE &&
(client->packet.stat > MQTT_PK_BEGIN ||
client->read.total > 0)) {
(client->packet.stat > MQTT_PK_BEGIN ||
client->read.total > 0)
) {
/* advance state, since we received some data */
mms_stat->read = MQTT_MSG_HEADER;
}
Expand Down Expand Up @@ -1580,7 +1608,11 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
/* Send connect packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -1986,9 +2018,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
/* Send publish packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0)
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
#endif
client->write.len = 0; /* reset len, so publish chunk resets */

Expand Down Expand Up @@ -2175,7 +2212,11 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe)
/* Send subscribe packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2276,7 +2317,11 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe)
/* Send unsubscribe packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2369,7 +2414,11 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping)
/* Send ping req packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2461,8 +2510,11 @@ int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect)
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
/* if disconnect context avail allow partial write in non-blocking mode */
if (p_disconnect != NULL &&
rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down Expand Up @@ -2544,7 +2596,11 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth)
/* Send authentication packet */
rc = MqttPacket_Write(client, client->tx_buf, xfer);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) {
if (rc == MQTT_CODE_CONTINUE
#ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
&& client->write.total > 0
#endif
) {
/* keep send locked and return early */
return rc;
}
Expand Down
Loading