Skip to content

Commit

Permalink
* Fixes for non-blocking publish with payload larger than maximum TX …
Browse files Browse the repository at this point in the history
…buffer. ZD 16769

* Fixes for write non-blocking (would block) edge cases.
* Fix for write position on cancel of message in progress.
* Added check for zero return on read, which is a compatibility layer indicator for disconnect.
  • Loading branch information
dgarske committed Oct 27, 2023
1 parent 02782ea commit c7a4c7b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 10 deletions.
49 changes: 41 additions & 8 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1233,8 +1233,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
rc = MqttPacket_Write(client, client->tx_buf,
client->write.len);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE)
break;
if (rc == MQTT_CODE_CONTINUE) {
/* keep send mutex locked and return to caller */
return rc;
}
#endif
if (rc == client->write.len) {
rc = 0; /* success */
Expand Down Expand Up @@ -1312,14 +1314,14 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
if (!waitMatchFound) {
/* if we get here, then the we are still waiting for a packet */
mms_stat->read = MQTT_MSG_BEGIN;
MQTT_TRACE_MSG("Wait Again");
#ifdef WOLFMQTT_NONBLOCK
/* for non-blocking return with code continue instead of waiting again
* if called with packet type and id of 'any' */
if (wait_type == MQTT_PACKET_TYPE_ANY && wait_packet_id == 0) {
return MQTT_CODE_CONTINUE;
}
#endif
MQTT_TRACE_MSG("Wait Again");
goto wait_again;
}

Expand Down Expand Up @@ -1679,13 +1681,20 @@ static int MqttClient_Publish_WritePayload(MqttClient *client,
if (client == NULL || publish == NULL)
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);

if (pubCb) {
if (pubCb) { /* use publish callback to get data */
word32 tmp_len = publish->buffer_len;

do {
/* Use the callback to get payload */
if ((client->write.len = pubCb(publish)) < 0) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK);
/* use the client->write.len to handle non-blocking re-entry when
* new publish callback data is needed */
if (client->write.len == 0) {
/* Use the callback to get payload */
if ((client->write.len = pubCb(publish)) < 0) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Publish callback error %d", client->write.len);
#endif
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK);
}
}

if ((word32)client->write.len < publish->buffer_len) {
Expand Down Expand Up @@ -1715,6 +1724,7 @@ static int MqttClient_Publish_WritePayload(MqttClient *client,

publish->buffer_pos += publish->intBuf_pos;
publish->intBuf_pos = 0;
client->write.len = 0; /* reset current write len */

} while (publish->buffer_pos < publish->total_len);
}
Expand Down Expand Up @@ -1746,8 +1756,17 @@ static int MqttClient_Publish_WritePayload(MqttClient *client,

/* Check if we are done sending publish message */
if (publish->buffer_pos < publish->buffer_len) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Publish Write: not done (%d remain)",
publish->buffer_len - publish->buffer_pos);
#endif
return MQTT_CODE_PUB_CONTINUE;
}
#ifdef WOLFMQTT_DEBUG_CLIENT
else {
PRINTF("Publish Write: done");
}
#endif
#else
do {
rc = MqttPacket_Write(client, client->tx_buf, client->write.len);
Expand Down Expand Up @@ -1779,13 +1798,23 @@ static int MqttClient_Publish_WritePayload(MqttClient *client,
/* If transferring more chunks */
publish->buffer_pos += publish->intBuf_pos;
if (publish->buffer_pos < publish->total_len) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Publish Write: chunk (%d remain)",
publish->total_len - publish->buffer_pos);
#endif

/* Build next payload to send */
client->write.len = (publish->total_len - publish->buffer_pos);
if (client->write.len > client->tx_buf_len) {
client->write.len = client->tx_buf_len;
}
rc = MQTT_CODE_PUB_CONTINUE;
}
#ifdef WOLFMQTT_DEBUG_CLIENT
else {
PRINTF("Publish Write: chunked done");
}
#endif
}
}
return rc;
Expand Down Expand Up @@ -1897,6 +1926,9 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
return rc;
}

/* reset client->write.len */
client->write.len = 0;

/* advance state */
publish->stat.write = MQTT_MSG_PAYLOAD;
}
Expand All @@ -1906,7 +1938,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
{
rc = MqttClient_Publish_WritePayload(client, publish, pubCb);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE)
if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE)
return rc;
#endif
#ifdef WOLFMQTT_MULTITHREAD
Expand Down Expand Up @@ -2562,6 +2594,7 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg)
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Cancel Write Lock");
#endif
client->write.pos = 0; /* reset current write position */
mms_stat->isWriteLocked = 0;
wm_SemUnlock(&client->lockSend);
}
Expand Down
7 changes: 5 additions & 2 deletions src/mqtt_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,
client->tls.sockRcRead = 0; /* init value */
rc = wolfSSL_read(client->tls.ssl, (char*)buf, buf_len);
if (rc < 0) {
#if defined(WOLFMQTT_DEBUG_SOCKET) || defined(WOLFSSL_ASYNC_CRYPT)
int error = wolfSSL_get_error(client->tls.ssl, 0);
#endif
#ifdef WOLFMQTT_DEBUG_SOCKET
if (error != WOLFSSL_ERROR_WANT_READ &&
error != WC_PENDING_E) {
Expand All @@ -252,7 +250,12 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,
if (error == WC_PENDING_E) {
rc = MQTT_CODE_CONTINUE;
}
else
#endif
/* used with compatibility layer to communicate peer close */
if (error == WOLFSSL_ERROR_ZERO_RETURN) {
rc = MQTT_CODE_ERROR_NETWORK;
}
}
}
else
Expand Down

0 comments on commit c7a4c7b

Please sign in to comment.