Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for non-blocking with larger payload and improvements to the test and examples #373

Merged
merged 9 commits into from
Nov 28, 2023
11 changes: 7 additions & 4 deletions examples/aws/awsiot.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ static int mTestDone = 0;
#define APP_HARDWARE "wolf_aws_iot_demo"
#define APP_FIRMWARE_VERSION LIBWOLFMQTT_VERSION_STRING

#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 512 /* Maximum size for network read/write callbacks */
#endif
#define AWSIOT_HOST "a2dujmi05ideo2-ats.iot.us-west-2.amazonaws.com"
#define AWSIOT_DEVICE_ID "demoDevice"
#define AWSIOT_QOS MQTT_QOS_1
Expand Down Expand Up @@ -615,8 +617,9 @@ int awsiot_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}
Expand Down Expand Up @@ -674,8 +677,8 @@ int awsiot_test(MQTTCtx *mqttCtx)
mqttCtx->publish.buffer = (byte*)mqttCtx->app_ctx;
mqttCtx->publish.total_len = (word32)XSTRLEN((char*)mqttCtx->app_ctx);
rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
11 changes: 7 additions & 4 deletions examples/azure/azureiothub.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ static int mTestDone = 0;
* https://azure.microsoft.com/en-us/documentation/articles/iot-hub-sas-tokens/#using-sas-tokens-as-a-device
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support
*/
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */
#endif
#define AZURE_API_VERSION "?api-version=2018-06-30"
#define AZURE_HOST "wolfMQTT.azure-devices.net"
#define AZURE_DEVICE_ID "demoDevice"
Expand Down Expand Up @@ -437,8 +439,9 @@ int azureiothub_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}
Expand Down Expand Up @@ -494,8 +497,8 @@ int azureiothub_test(MQTTCtx *mqttCtx)
mqttCtx->publish.total_len = (word16)rc;

rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
2 changes: 2 additions & 0 deletions examples/firmware/fwclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
#include "examples/mqttnet.h"

/* Configuration */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET
#endif

/* Locals */
static int mStopRead = 0;
Expand Down
6 changes: 4 additions & 2 deletions examples/firmware/fwpush.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
#include "examples/mqttnet.h"

/* Configuration */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET
#endif

/* Locals */
static int mStopRead = 0;
Expand Down Expand Up @@ -453,8 +455,8 @@ int fwpush_test(MQTTCtx *mqttCtx)
return rc;
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
Expand Down
12 changes: 8 additions & 4 deletions examples/mqttclient/mqttclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ static int mStopRead = 0;

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

#ifdef WOLFMQTT_PROPERTY_CB
#define MAX_CLIENT_ID_LEN 64
Expand Down Expand Up @@ -504,10 +506,12 @@ int mqttclient_test(MQTTCtx *mqttCtx)

if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) {
WOLFMQTT_FREE(mqttCtx->publish.buffer);
mqttCtx->publish.buffer = NULL;
mqttCtx->pub_file = NULL; /* don't try and send file again */
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
#ifdef WOLFMQTT_V5
if (mqttCtx->qos > 0) {
Expand Down Expand Up @@ -572,8 +576,8 @@ int mqttclient_test(MQTTCtx *mqttCtx)
mqttCtx->publish.total_len = (word16)rc;
rc = MqttClient_Publish(&mqttCtx->client,
&mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
52 changes: 52 additions & 0 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ typedef struct MulticastCtx {
} MulticastCtx;
#endif

#ifndef WOLFMQTT_TEST_NONBLOCK_TIMES
#define WOLFMQTT_TEST_NONBLOCK_TIMES 1
#endif

/* Private functions */

/* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -643,11 +647,16 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
int timeout_ms)
{
SocketContext *sock = (SocketContext*)context;
MQTTCtx* mqttCtx;
int rc;
SOERROR_T so_error = 0;
#ifndef WOLFMQTT_NO_TIMEOUT
struct timeval tv;
#endif
#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbWriteAlt = 0;
static int testSmallerWrite = 0;
#endif

if (context == NULL || buf == NULL || buf_len <= 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -656,6 +665,27 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
if (sock->fd == SOCKET_INVALID)
return MQTT_CODE_ERROR_BAD_ARG;

mqttCtx = sock->mqttCtx;
(void)mqttCtx;

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (mqttCtx->useNonBlockMode) {
if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbWriteAlt++;
return MQTT_CODE_CONTINUE;
}
testNbWriteAlt = 0;
if (!testSmallerWrite) {
if (buf_len > 2)
buf_len /= 2;
testSmallerWrite = 1;
}
else {
testSmallerWrite = 0;
}
}
#endif

#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
Expand Down Expand Up @@ -706,6 +736,10 @@ static int NetRead_ex(void *context, byte* buf, int buf_len,
fd_set errfds;
struct timeval tv;
#endif
#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbReadAlt = 0;
static int testSmallerRead = 0;
#endif

if (context == NULL || buf == NULL || buf_len <= 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -721,6 +755,24 @@ static int NetRead_ex(void *context, byte* buf, int buf_len,
mqttCtx = sock->mqttCtx;
(void)mqttCtx;

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (mqttCtx->useNonBlockMode) {
if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbReadAlt++;
return MQTT_CODE_CONTINUE;
}
testNbReadAlt = 0;
if (!testSmallerRead) {
if (buf_len > 2)
buf_len /= 2;
testSmallerRead = 1;
}
else {
testSmallerRead = 0;
}
}
#endif

#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
Expand Down
5 changes: 3 additions & 2 deletions examples/mqttsimple/mqttsimple.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,9 @@ int mqttsimple_test(void)
if (rc != MQTT_CODE_SUCCESS) {
goto exit;
}
PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s",
mqttObj.publish.topic_name, mqttObj.publish.qos, mqttObj.publish.buffer);
PRINTF("MQTT Publish: Topic %s, ID %d, Qos %d, Message %s",
mqttObj.publish.topic_name, mqttObj.publish.packet_id,
mqttObj.publish.qos, mqttObj.publish.buffer);

/* Wait for messages */
while (1) {
Expand Down
78 changes: 48 additions & 30 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
/* Configuration */

/* Number of publish tasks. Each will send a unique message to the broker. */
#define NUM_PUB_TASKS 10
#define NUM_PUB_TASKS 5
#define NUM_PUB_PER_TASK 2

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

/* Total size of test message to build */
#define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */
Expand Down Expand Up @@ -468,7 +471,8 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
Expand Down Expand Up @@ -555,8 +559,8 @@ static void *waitMessage_task(void *param)
MqttClient_CancelMessage(&mqttCtx->client,
(MqttObject*)&mqttCtx->publish);
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down Expand Up @@ -596,37 +600,51 @@ static DWORD WINAPI publish_task( LPVOID param )
static void *publish_task(void *param)
#endif
{
int rc;
int rc[NUM_PUB_PER_TASK], i;
MQTTCtx *mqttCtx = (MQTTCtx*)param;
MqttPublish publish;
word32 startSec = 0;

/* Publish Topic */
XMEMSET(&publish, 0, sizeof(MqttPublish));
publish.retain = 0;
publish.qos = mqttCtx->qos;
publish.duplicate = 0;
publish.topic_name = mqttCtx->topic_name;
publish.packet_id = mqtt_get_packetid_threadsafe();
publish.buffer = (byte*)mTestMessage;
publish.total_len = sizeof(mTestMessage);
MqttPublish publish[NUM_PUB_PER_TASK];
word32 startSec[NUM_PUB_PER_TASK];

/* Build publish */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
/* Publish Topic */
XMEMSET(&publish[i], 0, sizeof(MqttPublish));
publish[i].retain = 0;
publish[i].qos = mqttCtx->qos;
publish[i].duplicate = 0;
publish[i].topic_name = mqttCtx->topic_name;
publish[i].packet_id = mqtt_get_packetid_threadsafe();
publish[i].buffer = (byte*)mTestMessage;
publish[i].total_len = sizeof(mTestMessage);

rc[i] = MQTT_CODE_CONTINUE;
startSec[i] = 0;
}

do {
rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL);
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH,
mqttCtx->cmd_timeout_ms);
} while (rc == MQTT_CODE_CONTINUE);
if (rc != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish);
/* Send until != continue */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
while (rc[i] == MQTT_CODE_CONTINUE) {
rc[i] = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish[i],
NULL);
rc[i] = check_response(mqttCtx, rc[i], &startSec[i],
MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms);
}
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
publish.topic_name,
MqttClient_ReturnCodeToString(rc), rc);
/* Report result */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
if (rc[i] != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish[i]);
}

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
publish[i].topic_name, publish[i].packet_id,
MqttClient_ReturnCodeToString(rc[i]), rc[i]);

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
}

THREAD_EXIT(0);
}
Expand Down
8 changes: 6 additions & 2 deletions examples/nbclient/nbclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ static int mTestDone = 0;
/* Configuration */

/* Maximum size for network read/write callbacks. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

#ifdef WOLFMQTT_PROPERTY_CB
#define MAX_CLIENT_ID_LEN 64
Expand Down Expand Up @@ -444,10 +446,12 @@ int mqttclient_test(MQTTCtx *mqttCtx)

if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) {
WOLFMQTT_FREE(mqttCtx->publish.buffer);
mqttCtx->publish.buffer = NULL;
mqttCtx->pub_file = NULL; /* don't try and send file again */
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
Expand Down
Loading
Loading