Skip to content

Commit

Permalink
* Fix for non-blocking publish with payload larger than maximum TX bu…
Browse files Browse the repository at this point in the history
…ffer. ZD 16769

* Fix to make sure ctrl+c is honored during a want read/write case.
* Fix the firmware update example to properly synchronize publish and use a unique topic name.
* Refactor of the read/write and improved handling for partial write use-cases.
* Improve multi-thread test message to use larger size (> tx but) and in test mode check for actual message.
* Improve remote test done logic.
  • Loading branch information
dgarske committed Oct 26, 2023
1 parent 02782ea commit 72643e1
Show file tree
Hide file tree
Showing 21 changed files with 314 additions and 165 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/fsanitize-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: fsanitize check Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down Expand Up @@ -88,4 +88,4 @@ jobs:
- name: Show logs on failure
if: failure() || cancelled()
run: |
more test-suite.log
cat test-suite.log
2 changes: 1 addition & 1 deletion .github/workflows/macos-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: macOS Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/ubuntu-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Ubuntu Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down Expand Up @@ -59,13 +59,13 @@ jobs:
run: ./configure
- name: wolfmqtt make
run: make
# Note: this will run the external tests for this CI only
- name: wolfmqtt make check
run: make check

env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1

- name: wolfmqtt configure with SN Enabled
env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1
run: ./configure --enable-sn
- name: wolfmqtt make
run: make
Expand Down Expand Up @@ -97,4 +97,4 @@ jobs:
- name: Show logs on failure
if: failure() || cancelled()
run: |
more test-suite.log
cat test-suite.log
2 changes: 1 addition & 1 deletion .github/workflows/windows-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Windows Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down
30 changes: 19 additions & 11 deletions examples/aws/awsiot.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

/* Locals */
static int mStopRead = 0;
static int mTestDone = 0;

/* Configuration */
#define APP_HARDWARE "wolf_aws_iot_demo"
Expand All @@ -66,8 +67,9 @@ static int mStopRead = 0;
#define AWSIOT_KEEP_ALIVE_SEC DEFAULT_KEEP_ALIVE_SEC
#define AWSIOT_CMD_TIMEOUT_MS DEFAULT_CMD_TIMEOUT_MS

#define AWSIOT_SUBSCRIBE_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update/delta"
#define AWSIOT_PUBLISH_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update"
#define AWSIOT_SUBSCRIBE_TOPIC AWSIOT_PUBLISH_TOPIC


#define AWSIOT_PUBLISH_MSG_SZ 400

Expand Down Expand Up @@ -293,6 +295,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

if (msg_done) {
PRINTF("MQTT Message: Done");
if (mqttCtx->test_mode) {
mTestDone = 1;
}
}

/* Return negative to terminate publish processing */
Expand Down Expand Up @@ -626,13 +631,6 @@ int awsiot_test(MQTTCtx *mqttCtx)
mqttCtx->stat = WMQ_WAIT_MSG;

do {
/* check for test mode or stop */
if (mStopRead || mqttCtx->test_mode) {
rc = MQTT_CODE_SUCCESS;
PRINTF("MQTT Exiting...");
break;
}

/* Try and read packet */
rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms);

Expand All @@ -646,8 +644,17 @@ int awsiot_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}

/* check for test mode or stop */
if (mStopRead || mTestDone) {
rc = MQTT_CODE_SUCCESS;
mqttCtx->stat = WMQ_DISCONNECT;
PRINTF("MQTT Exiting...");
break;
}

#ifdef WOLFMQTT_ENABLE_STDIN_CAP
else if (rc == MQTT_CODE_STDIN_WAKE) {
if (rc == MQTT_CODE_STDIN_WAKE) {
/* Get data from STDIO */
XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE);
if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) {
Expand All @@ -672,8 +679,9 @@ int awsiot_test(MQTTCtx *mqttCtx)
MqttClient_ReturnCodeToString(rc), rc);
}
}
else
#endif
else if (rc == MQTT_CODE_ERROR_TIMEOUT) {
if (rc == MQTT_CODE_ERROR_TIMEOUT) {
/* Keep Alive */
PRINTF("Keep-alive timeout, sending ping");

Expand Down Expand Up @@ -838,7 +846,7 @@ int awsiot_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_AWSIOT_EXAMPLE
do {
rc = awsiot_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
34 changes: 24 additions & 10 deletions examples/azure/azureiothub.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

/* Locals */
static int mStopRead = 0;
static int mTestDone = 0;

/* Configuration */
/* Reference:
Expand Down Expand Up @@ -160,6 +161,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

if (msg_done) {
PRINTF("MQTT Message: Done");
if (mqttCtx->test_mode) {
mTestDone = 1;
}
}

/* Return negative to terminate publish processing */
Expand Down Expand Up @@ -449,13 +453,6 @@ int azureiothub_test(MQTTCtx *mqttCtx)
mqttCtx->stat = WMQ_WAIT_MSG;

do {
/* check for test mode or stop */
if (mStopRead || mqttCtx->test_mode) {
rc = MQTT_CODE_SUCCESS;
PRINTF("MQTT Exiting...");
break;
}

/* Try and read packet */
rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms);

Expand All @@ -469,8 +466,17 @@ int azureiothub_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}

/* check for test mode or stop */
if (mStopRead || mTestDone) {
rc = MQTT_CODE_SUCCESS;
mqttCtx->stat = WMQ_DISCONNECT;
PRINTF("MQTT Exiting...");
break;
}

#ifdef WOLFMQTT_ENABLE_STDIN_CAP
else if (rc == MQTT_CODE_STDIN_WAKE) {
if (rc == MQTT_CODE_STDIN_WAKE) {
/* Get data from STDIO */
XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE);
if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) {
Expand All @@ -493,8 +499,9 @@ int azureiothub_test(MQTTCtx *mqttCtx)
MqttClient_ReturnCodeToString(rc), rc);
}
}
else
#endif
else if (rc == MQTT_CODE_ERROR_TIMEOUT) {
if (rc == MQTT_CODE_ERROR_TIMEOUT) {
/* Keep Alive */
PRINTF("Keep-alive timeout, sending ping");

Expand All @@ -507,6 +514,13 @@ int azureiothub_test(MQTTCtx *mqttCtx)
MqttClient_ReturnCodeToString(rc), rc);
break;
}
else {
if (mqttCtx->test_mode) {
PRINTF("MQTT Ping Done, exiting for test mode");
mTestDone = 1;
break;
}
}
}
else if (rc != MQTT_CODE_SUCCESS) {
/* There was an error */
Expand Down Expand Up @@ -659,7 +673,7 @@ int azureiothub_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_AZUREIOTHUB_EXAMPLE
do {
rc = azureiothub_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
37 changes: 22 additions & 15 deletions examples/firmware/fwclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

/* Locals */
static int mStopRead = 0;
static int mTestDone = 0;
static byte* mFwBuf;


Expand Down Expand Up @@ -162,13 +163,13 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

/* Verify this message is for the firmware topic */
if (msg_new &&
XMEMCMP(msg->topic_name, FIRMWARE_TOPIC_NAME,
XSTRNCMP(msg->topic_name, mqttCtx->topic_name,
msg->topic_name_len) == 0 &&
!mFwBuf)
{
/* Allocate buffer for entire message */
/* Note: On an embedded system this could just be a write to flash.
If writting to flash change FIRMWARE_MAX_BUFFER to match
If writing to flash change FIRMWARE_MAX_BUFFER to match
block size */
mFwBuf = (byte*)WOLFMQTT_MALLOC(msg->total_len);
if (mFwBuf == NULL) {
Expand All @@ -193,7 +194,7 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

/* for test mode stop client */
if (mqttCtx->test_mode) {
mStopRead = 1;
mTestDone = 1;
}
}
}
Expand Down Expand Up @@ -325,8 +326,6 @@ int fwclient_test(MQTTCtx *mqttCtx)
mqttCtx->subscribe.packet_id = mqtt_get_packetid();
mqttCtx->subscribe.topic_count = 1;
mqttCtx->subscribe.topics = mqttCtx->topics;
mqttCtx->topics[0].topic_filter = FIRMWARE_TOPIC_NAME;
mqttCtx->topics[0].qos = mqttCtx->qos;
}
FALL_THROUGH;

Expand Down Expand Up @@ -365,13 +364,6 @@ int fwclient_test(MQTTCtx *mqttCtx)
rc = MqttClient_WaitMessage(&mqttCtx->client,
mqttCtx->cmd_timeout_ms);

/* check for test mode */
if (mStopRead) {
rc = MQTT_CODE_SUCCESS;
PRINTF("MQTT Exiting...");
break;
}

#ifdef WOLFMQTT_NONBLOCK
/* Track elapsed time with no activity and trigger timeout */
rc = mqtt_check_timeout(rc, &mqttCtx->start_sec,
Expand All @@ -382,7 +374,20 @@ int fwclient_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
else if (rc == MQTT_CODE_ERROR_TIMEOUT) {

/* check for test mode */
if (mStopRead || mTestDone) {
rc = MQTT_CODE_SUCCESS;
mqttCtx->stat = WMQ_DISCONNECT;
PRINTF("MQTT Exiting...");
break;
}

if (rc == MQTT_CODE_ERROR_TIMEOUT) {
if (mqttCtx->test_mode) {
PRINTF("Timeout in test mode, exit early!");
mTestDone = 1;
}
/* Keep Alive */
PRINTF("Keep-alive timeout, sending ping");

Expand Down Expand Up @@ -523,7 +528,9 @@ int fwclient_test(MQTTCtx *mqttCtx)
/* init defaults */
mqtt_init_ctx(&mqttCtx);
mqttCtx.app_name = "fwclient";
mqttCtx.client_id = FIRMWARE_CLIIENT_ID;
mqttCtx.client_id = mqtt_append_random(FIRMWARE_CLIIENT_ID,
(word32)XSTRLEN(FIRMWARE_CLIIENT_ID));
mqttCtx.dynamicClientId = 1;
mqttCtx.topic_name = FIRMWARE_TOPIC_NAME;
mqttCtx.qos = FIRMWARE_MQTT_QOS;
mqttCtx.pub_file = FIRMWARE_DEF_SAVE_AS;
Expand All @@ -548,7 +555,7 @@ int fwclient_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_FIRMWARE_EXAMPLE
do {
rc = fwclient_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
18 changes: 10 additions & 8 deletions examples/firmware/fwpush.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ static int mqtt_publish_cb(MqttPublish *publish) {
/* read a buffer of data from the file */
bytes_read = fread(publish->buffer, 1, publish->buffer_len,
cbData->fp);
if (bytes_read != 0) {
ret = (int)bytes_read;
}
ret = (int)bytes_read;
}
if (cbData->fp && feof(cbData->fp)) {
fclose(cbData->fp);
cbData->fp = NULL;
}
}
}
Expand Down Expand Up @@ -404,8 +403,8 @@ int fwpush_test(MQTTCtx *mqttCtx)
}

/* Calculate the total payload length and store the FirmwareHeader,
signature, and key in fwpushCBdata structure to be used by the
callback. */
* signature, and key in FwpushCBdata structure to be used by the
* callback. */
cbData = (FwpushCBdata*)WOLFMQTT_MALLOC(sizeof(FwpushCBdata));
if (cbData == NULL) {
rc = MQTT_CODE_ERROR_OUT_OF_BUFFER;
Expand All @@ -418,7 +417,7 @@ int fwpush_test(MQTTCtx *mqttCtx)
(int*)&mqttCtx->publish.total_len);

/* The publish->ctx is available for use by the application to pass
data to the callback routine. */
* data to the callback routine. */
mqttCtx->publish.ctx = cbData;

if (rc != 0) {
Expand Down Expand Up @@ -505,6 +504,7 @@ int fwpush_test(MQTTCtx *mqttCtx)

if (rc != MQTT_CODE_CONTINUE) {
if (cbData) {
if (cbData->fp) fclose(cbData->fp);
if (cbData->data) WOLFMQTT_FREE(cbData->data);
WOLFMQTT_FREE(cbData);
}
Expand Down Expand Up @@ -561,7 +561,9 @@ int fwpush_test(MQTTCtx *mqttCtx)
/* init defaults */
mqtt_init_ctx(&mqttCtx);
mqttCtx.app_name = "fwpush";
mqttCtx.client_id = FIRMWARE_PUSH_CLIENT_ID;
mqttCtx.client_id = mqtt_append_random(FIRMWARE_PUSH_CLIENT_ID,
(word32)XSTRLEN(FIRMWARE_PUSH_CLIENT_ID));
mqttCtx.dynamicClientId = 1;
mqttCtx.topic_name = FIRMWARE_TOPIC_NAME;
mqttCtx.qos = FIRMWARE_MQTT_QOS;
mqttCtx.pub_file = FIRMWARE_PUSH_DEF_FILE;
Expand All @@ -586,7 +588,7 @@ int fwpush_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_FIRMWARE_EXAMPLE
do {
rc = fwpush_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
Loading

0 comments on commit 72643e1

Please sign in to comment.