diff --git a/src/mqtt_client.c b/src/mqtt_client.c index f284b0207..badcfdea4 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -26,6 +26,32 @@ #include "wolfmqtt/mqtt_client.h" +/* DOCUMENTED BUILD OPTIONS: + * + * WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on + * client struct, write and read. When a pending response is needed its added + * to a linked list and if another thread reads the expected response it is + * flagged, so the other thread knows it completed. + * + * WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE, + * which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the + * transport (socket) has no data. + * + * WOLFMQTT_V5: Enables MQTT v5.0 support + * + * WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to + * allow unlock if no data was sent/received. Note the TLS stack typically + * requires an attempt to write to continue with same write, not different. + * By default if we attempt a write we keep the mutex locked and return + * MQTT_CODE_CONTINUE + * + * WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the + * user. Example: wm_SemInit + * + * WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code. + */ + + /* Private functions */ /* forward declarations */ @@ -221,7 +247,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) MQTT_TRACE_MSG("Warning, recv already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } - /* detect if a write is already in progress */ + /* detect if a read is already in progress */ if (wm_SemLock(&client->lockClient) == 0) { if (client->read.total > 0) { MQTT_TRACE_MSG("Partial read in progress!"); @@ -692,7 +718,8 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf, #ifdef WOLFMQTT_DISCONNECT_CB /* Call disconnect callback with reason code */ if ((packet_obj != NULL) && client->disconnect_cb) { - client->disconnect_cb(client, p_disc->reason_code, client->disconnect_ctx); + client->disconnect_cb(client, p_disc->reason_code, + client->disconnect_ctx); } #endif #else @@ -1087,8 +1114,9 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc <= 0) { #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE && - (client->packet.stat > MQTT_PK_BEGIN || - client->read.total > 0)) { + (client->packet.stat > MQTT_PK_BEGIN || + client->read.total > 0) + ) { /* advance state, since we received some data */ mms_stat->read = MQTT_MSG_HEADER; } @@ -1580,7 +1608,11 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) /* Send connect packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -1986,9 +2018,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, /* Send publish packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; + } #endif client->write.len = 0; /* reset len, so publish chunk resets */ @@ -2175,7 +2212,11 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) /* Send subscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2276,7 +2317,11 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) /* Send unsubscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2369,7 +2414,11 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) /* Send ping req packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2461,8 +2510,11 @@ int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect) rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK /* if disconnect context avail allow partial write in non-blocking mode */ - if (p_disconnect != NULL && - rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2544,7 +2596,11 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) /* Send authentication packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; }