diff --git a/.github/workflows/fsanitize-check.yml b/.github/workflows/fsanitize-check.yml index 543e55420..073496907 100644 --- a/.github/workflows/fsanitize-check.yml +++ b/.github/workflows/fsanitize-check.yml @@ -2,7 +2,7 @@ name: fsanitize check Test on: push: - branches: [ '*' ] + branches: [ 'master', 'main', 'release/**' ] pull_request: branches: [ '*' ] @@ -88,4 +88,4 @@ jobs: - name: Show logs on failure if: failure() || cancelled() run: | - more test-suite.log + cat test-suite.log diff --git a/.github/workflows/macos-check.yml b/.github/workflows/macos-check.yml index c748881f8..b79b5af59 100644 --- a/.github/workflows/macos-check.yml +++ b/.github/workflows/macos-check.yml @@ -2,7 +2,7 @@ name: macOS Build Test on: push: - branches: [ '*' ] + branches: [ 'master', 'main', 'release/**' ] pull_request: branches: [ '*' ] diff --git a/.github/workflows/ubuntu-check.yml b/.github/workflows/ubuntu-check.yml index 811233988..cd1ecf65b 100644 --- a/.github/workflows/ubuntu-check.yml +++ b/.github/workflows/ubuntu-check.yml @@ -2,7 +2,7 @@ name: Ubuntu Build Test on: push: - branches: [ '*' ] + branches: [ 'master', 'main', 'release/**' ] pull_request: branches: [ '*' ] @@ -59,13 +59,13 @@ jobs: run: ./configure - name: wolfmqtt make run: make + # Note: this will run the external tests for this CI only - name: wolfmqtt make check run: make check - env: - WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 - - name: wolfmqtt configure with SN Enabled + env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 run: ./configure --enable-sn - name: wolfmqtt make run: make @@ -73,6 +73,8 @@ jobs: run: make check - name: wolfmqtt configure with Non-Block + env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 run: ./configure --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" - name: wolfmqtt make run: make @@ -80,6 +82,8 @@ jobs: run: make check - name: wolfmqtt configure with Non-Block and Multi-threading + env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 run: ./configure --enable-mt --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" - name: wolfmqtt make run: make @@ -87,6 +91,8 @@ jobs: run: make check - name: configure with Multi-threading and WOLFMQTT_DYN_PROP + env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 run: ./configure --enable-mt CFLAGS="-DWOLFMQTT_DYN_PROP" - name: make run: make @@ -97,4 +103,4 @@ jobs: - name: Show logs on failure if: failure() || cancelled() run: | - more test-suite.log + cat test-suite.log diff --git a/.github/workflows/windows-check.yml b/.github/workflows/windows-check.yml index 287d7c8f8..5b9bed0e5 100644 --- a/.github/workflows/windows-check.yml +++ b/.github/workflows/windows-check.yml @@ -2,7 +2,7 @@ name: Windows Build Test on: push: - branches: [ '*' ] + branches: [ 'master', 'main', 'release/**' ] pull_request: branches: [ '*' ] diff --git a/examples/aws/awsiot.c b/examples/aws/awsiot.c index 5f8d701de..cd77c3807 100644 --- a/examples/aws/awsiot.c +++ b/examples/aws/awsiot.c @@ -54,6 +54,7 @@ /* Locals */ static int mStopRead = 0; +static int mTestDone = 0; /* Configuration */ #define APP_HARDWARE "wolf_aws_iot_demo" @@ -66,8 +67,9 @@ static int mStopRead = 0; #define AWSIOT_KEEP_ALIVE_SEC DEFAULT_KEEP_ALIVE_SEC #define AWSIOT_CMD_TIMEOUT_MS DEFAULT_CMD_TIMEOUT_MS -#define AWSIOT_SUBSCRIBE_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update/delta" #define AWSIOT_PUBLISH_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update" +#define AWSIOT_SUBSCRIBE_TOPIC AWSIOT_PUBLISH_TOPIC + #define AWSIOT_PUBLISH_MSG_SZ 400 @@ -293,6 +295,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, if (msg_done) { PRINTF("MQTT Message: Done"); + if (mqttCtx->test_mode) { + mTestDone = 1; + } } /* Return negative to terminate publish processing */ @@ -626,13 +631,6 @@ int awsiot_test(MQTTCtx *mqttCtx) mqttCtx->stat = WMQ_WAIT_MSG; do { - /* check for test mode or stop */ - if (mStopRead || mqttCtx->test_mode) { - rc = MQTT_CODE_SUCCESS; - PRINTF("MQTT Exiting..."); - break; - } - /* Try and read packet */ rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms); @@ -646,8 +644,17 @@ int awsiot_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } + + /* check for test mode or stop */ + if (mStopRead || mTestDone) { + rc = MQTT_CODE_SUCCESS; + mqttCtx->stat = WMQ_DISCONNECT; + PRINTF("MQTT Exiting..."); + break; + } + #ifdef WOLFMQTT_ENABLE_STDIN_CAP - else if (rc == MQTT_CODE_STDIN_WAKE) { + if (rc == MQTT_CODE_STDIN_WAKE) { /* Get data from STDIO */ XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) { @@ -672,8 +679,9 @@ int awsiot_test(MQTTCtx *mqttCtx) MqttClient_ReturnCodeToString(rc), rc); } } + else #endif - else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (rc == MQTT_CODE_ERROR_TIMEOUT) { /* Keep Alive */ PRINTF("Keep-alive timeout, sending ping"); @@ -838,7 +846,7 @@ int awsiot_test(MQTTCtx *mqttCtx) #ifdef ENABLE_AWSIOT_EXAMPLE do { rc = awsiot_test(&mqttCtx); - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); #else diff --git a/examples/azure/azureiothub.c b/examples/azure/azureiothub.c index 5b6fbbbef..463e67256 100644 --- a/examples/azure/azureiothub.c +++ b/examples/azure/azureiothub.c @@ -67,6 +67,7 @@ /* Locals */ static int mStopRead = 0; +static int mTestDone = 0; /* Configuration */ /* Reference: @@ -160,6 +161,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, if (msg_done) { PRINTF("MQTT Message: Done"); + if (mqttCtx->test_mode) { + mTestDone = 1; + } } /* Return negative to terminate publish processing */ @@ -449,13 +453,6 @@ int azureiothub_test(MQTTCtx *mqttCtx) mqttCtx->stat = WMQ_WAIT_MSG; do { - /* check for test mode or stop */ - if (mStopRead || mqttCtx->test_mode) { - rc = MQTT_CODE_SUCCESS; - PRINTF("MQTT Exiting..."); - break; - } - /* Try and read packet */ rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms); @@ -469,8 +466,17 @@ int azureiothub_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } + + /* check for test mode or stop */ + if (mStopRead || mTestDone) { + rc = MQTT_CODE_SUCCESS; + mqttCtx->stat = WMQ_DISCONNECT; + PRINTF("MQTT Exiting..."); + goto disconn; + } + #ifdef WOLFMQTT_ENABLE_STDIN_CAP - else if (rc == MQTT_CODE_STDIN_WAKE) { + if (rc == MQTT_CODE_STDIN_WAKE) { /* Get data from STDIO */ XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) { @@ -493,38 +499,53 @@ int azureiothub_test(MQTTCtx *mqttCtx) MqttClient_ReturnCodeToString(rc), rc); } } + else #endif - else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (rc == MQTT_CODE_ERROR_TIMEOUT) { /* Keep Alive */ PRINTF("Keep-alive timeout, sending ping"); - - rc = MqttClient_Ping_ex(&mqttCtx->client, &mqttCtx->ping); - if (rc == MQTT_CODE_CONTINUE) { - return rc; - } - else if (rc != MQTT_CODE_SUCCESS) { - PRINTF("MQTT Ping Keep Alive Error: %s (%d)", - MqttClient_ReturnCodeToString(rc), rc); - break; - } + mqttCtx->stat = WMQ_PING; + break; } else if (rc != MQTT_CODE_SUCCESS) { /* There was an error */ PRINTF("MQTT Message Wait: %s (%d)", MqttClient_ReturnCodeToString(rc), rc); - break; + goto disconn; } } while (1); + } + FALL_THROUGH; - /* Check for error */ - if (rc != MQTT_CODE_SUCCESS) { - goto disconn; + case WMQ_PING: + { + mqttCtx->stat = WMQ_PING; + + rc = MqttClient_Ping_ex(&mqttCtx->client, &mqttCtx->ping); + if (rc == MQTT_CODE_CONTINUE) { + return rc; + } + else if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Ping Keep Alive Error: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + break; + } + + if (mqttCtx->test_mode) { + PRINTF("MQTT Ping Done, exiting for test mode"); + mTestDone = 1; + } + else { + mqttCtx->stat = WMQ_WAIT_MSG; + break; } } FALL_THROUGH; case WMQ_DISCONNECT: { + mqttCtx->stat = WMQ_DISCONNECT; + /* Disconnect */ rc = MqttClient_Disconnect(&mqttCtx->client); if (rc == MQTT_CODE_CONTINUE) { @@ -559,7 +580,6 @@ int azureiothub_test(MQTTCtx *mqttCtx) } case WMQ_UNSUB: /* not used */ - case WMQ_PING: default: rc = MQTT_CODE_ERROR_STAT; goto exit; @@ -659,7 +679,7 @@ int azureiothub_test(MQTTCtx *mqttCtx) #ifdef ENABLE_AZUREIOTHUB_EXAMPLE do { rc = azureiothub_test(&mqttCtx); - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); #else diff --git a/examples/firmware/fwclient.c b/examples/firmware/fwclient.c index 126105e93..b28914ef7 100644 --- a/examples/firmware/fwclient.c +++ b/examples/firmware/fwclient.c @@ -59,6 +59,7 @@ /* Locals */ static int mStopRead = 0; +static int mTestDone = 0; static byte* mFwBuf; @@ -162,13 +163,13 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, /* Verify this message is for the firmware topic */ if (msg_new && - XMEMCMP(msg->topic_name, FIRMWARE_TOPIC_NAME, + XSTRNCMP(msg->topic_name, mqttCtx->topic_name, msg->topic_name_len) == 0 && !mFwBuf) { /* Allocate buffer for entire message */ /* Note: On an embedded system this could just be a write to flash. - If writting to flash change FIRMWARE_MAX_BUFFER to match + If writing to flash change FIRMWARE_MAX_BUFFER to match block size */ mFwBuf = (byte*)WOLFMQTT_MALLOC(msg->total_len); if (mFwBuf == NULL) { @@ -193,7 +194,7 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, /* for test mode stop client */ if (mqttCtx->test_mode) { - mStopRead = 1; + mTestDone = 1; } } } @@ -325,8 +326,6 @@ int fwclient_test(MQTTCtx *mqttCtx) mqttCtx->subscribe.packet_id = mqtt_get_packetid(); mqttCtx->subscribe.topic_count = 1; mqttCtx->subscribe.topics = mqttCtx->topics; - mqttCtx->topics[0].topic_filter = FIRMWARE_TOPIC_NAME; - mqttCtx->topics[0].qos = mqttCtx->qos; } FALL_THROUGH; @@ -365,13 +364,6 @@ int fwclient_test(MQTTCtx *mqttCtx) rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms); - /* check for test mode */ - if (mStopRead) { - rc = MQTT_CODE_SUCCESS; - PRINTF("MQTT Exiting..."); - break; - } - #ifdef WOLFMQTT_NONBLOCK /* Track elapsed time with no activity and trigger timeout */ rc = mqtt_check_timeout(rc, &mqttCtx->start_sec, @@ -382,7 +374,20 @@ int fwclient_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } - else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + + /* check for test mode */ + if (mStopRead || mTestDone) { + rc = MQTT_CODE_SUCCESS; + mqttCtx->stat = WMQ_DISCONNECT; + PRINTF("MQTT Exiting..."); + break; + } + + if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (mqttCtx->test_mode) { + PRINTF("Timeout in test mode, exit early!"); + mTestDone = 1; + } /* Keep Alive */ PRINTF("Keep-alive timeout, sending ping"); @@ -523,7 +528,9 @@ int fwclient_test(MQTTCtx *mqttCtx) /* init defaults */ mqtt_init_ctx(&mqttCtx); mqttCtx.app_name = "fwclient"; - mqttCtx.client_id = FIRMWARE_CLIIENT_ID; + mqttCtx.client_id = mqtt_append_random(FIRMWARE_CLIIENT_ID, + (word32)XSTRLEN(FIRMWARE_CLIIENT_ID)); + mqttCtx.dynamicClientId = 1; mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; mqttCtx.qos = FIRMWARE_MQTT_QOS; mqttCtx.pub_file = FIRMWARE_DEF_SAVE_AS; @@ -548,7 +555,7 @@ int fwclient_test(MQTTCtx *mqttCtx) #ifdef ENABLE_FIRMWARE_EXAMPLE do { rc = fwclient_test(&mqttCtx); - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); #else diff --git a/examples/firmware/fwpush.c b/examples/firmware/fwpush.c index ae1672d6f..c16bbb77e 100644 --- a/examples/firmware/fwpush.c +++ b/examples/firmware/fwpush.c @@ -121,12 +121,11 @@ static int mqtt_publish_cb(MqttPublish *publish) { /* read a buffer of data from the file */ bytes_read = fread(publish->buffer, 1, publish->buffer_len, cbData->fp); - if (bytes_read != 0) { - ret = (int)bytes_read; - } + ret = (int)bytes_read; } if (cbData->fp && feof(cbData->fp)) { fclose(cbData->fp); + cbData->fp = NULL; } } } @@ -404,8 +403,8 @@ int fwpush_test(MQTTCtx *mqttCtx) } /* Calculate the total payload length and store the FirmwareHeader, - signature, and key in fwpushCBdata structure to be used by the - callback. */ + * signature, and key in FwpushCBdata structure to be used by the + * callback. */ cbData = (FwpushCBdata*)WOLFMQTT_MALLOC(sizeof(FwpushCBdata)); if (cbData == NULL) { rc = MQTT_CODE_ERROR_OUT_OF_BUFFER; @@ -418,7 +417,7 @@ int fwpush_test(MQTTCtx *mqttCtx) (int*)&mqttCtx->publish.total_len); /* The publish->ctx is available for use by the application to pass - data to the callback routine. */ + * data to the callback routine. */ mqttCtx->publish.ctx = cbData; if (rc != 0) { @@ -454,6 +453,8 @@ int fwpush_test(MQTTCtx *mqttCtx) case WMQ_DISCONNECT: { + mqttCtx->stat = WMQ_DISCONNECT; + /* Disconnect */ rc = MqttClient_Disconnect(&mqttCtx->client); if (rc == MQTT_CODE_CONTINUE) { @@ -505,6 +506,7 @@ int fwpush_test(MQTTCtx *mqttCtx) if (rc != MQTT_CODE_CONTINUE) { if (cbData) { + if (cbData->fp) fclose(cbData->fp); if (cbData->data) WOLFMQTT_FREE(cbData->data); WOLFMQTT_FREE(cbData); } @@ -561,7 +563,9 @@ int fwpush_test(MQTTCtx *mqttCtx) /* init defaults */ mqtt_init_ctx(&mqttCtx); mqttCtx.app_name = "fwpush"; - mqttCtx.client_id = FIRMWARE_PUSH_CLIENT_ID; + mqttCtx.client_id = mqtt_append_random(FIRMWARE_PUSH_CLIENT_ID, + (word32)XSTRLEN(FIRMWARE_PUSH_CLIENT_ID)); + mqttCtx.dynamicClientId = 1; mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; mqttCtx.qos = FIRMWARE_MQTT_QOS; mqttCtx.pub_file = FIRMWARE_PUSH_DEF_FILE; @@ -586,7 +590,7 @@ int fwpush_test(MQTTCtx *mqttCtx) #ifdef ENABLE_FIRMWARE_EXAMPLE do { rc = fwpush_test(&mqttCtx); - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); #else diff --git a/examples/firmware/fwpush.h b/examples/firmware/fwpush.h index 1f590a67c..270b593bc 100644 --- a/examples/firmware/fwpush.h +++ b/examples/firmware/fwpush.h @@ -33,7 +33,7 @@ typedef struct FwpushCBdata_s { const char *filename; byte *data; FILE *fp; -}FwpushCBdata; +} FwpushCBdata; /* Exposed functions */ int fwpush_test(MQTTCtx *mqttCtx); diff --git a/examples/mqttclient/mqttclient.c b/examples/mqttclient/mqttclient.c index e8dee1912..8c65813a2 100644 --- a/examples/mqttclient/mqttclient.c +++ b/examples/mqttclient/mqttclient.c @@ -497,11 +497,10 @@ int mqttclient_test(MQTTCtx *mqttCtx) #endif /* This loop allows payloads larger than the buffer to be sent by - repeatedly calling publish. - */ + * repeatedly calling publish. */ do { rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - } while(rc == MQTT_CODE_PUB_CONTINUE); + } while (rc == MQTT_CODE_PUB_CONTINUE || rc == MQTT_CODE_CONTINUE); if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); diff --git a/examples/mqttexample.c b/examples/mqttexample.c index b94380eff..fdfa604a6 100644 --- a/examples/mqttexample.c +++ b/examples/mqttexample.c @@ -154,40 +154,56 @@ static int mqtt_get_rand(byte* data, word32 len) return ret; } -#ifndef TEST_RAND_SZ -#define TEST_RAND_SZ 4 -#endif -static char* mqtt_append_random(const char* inStr, word32 inLen) +int mqtt_fill_random_hexstr(char* buf, word32 bufLen) { - int rc; + int rc = 0; + word32 pos = 0, sz, i; const char kHexChar[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; - byte rndBytes[TEST_RAND_SZ], rndHexStr[TEST_RAND_SZ*2]; - char *tmp = NULL; - - rc = mqtt_get_rand(rndBytes, (word32)sizeof(rndBytes)); - if (rc == 0) { - /* Convert random to hex string */ - int i; - for (i=0; i<(int)sizeof(rndBytes); i++) { - byte in = rndBytes[i]; - rndHexStr[(i*2)] = kHexChar[in >> 4]; - rndHexStr[(i*2)+1] = kHexChar[in & 0xf]; + byte rndBytes[32]; /* fill up to x bytes at a time */ + + while (rc == 0 && pos < bufLen) { + sz = bufLen - pos; + if (sz > (int)sizeof(rndBytes)) + sz = (int)sizeof(rndBytes); + sz /= 2; /* 1 byte expands to 2 bytes */ + + rc = mqtt_get_rand(rndBytes, sz); + if (rc == 0) { + /* Convert random to hex string */ + for (i=0; i> 4]; + buf[pos + (i*2)+1] = kHexChar[in & 0xf]; + } + pos += sz*2; } } - if (rc == 0) { - /* Allocate topic name and client id */ - tmp = (char*)WOLFMQTT_MALLOC(inLen + 1 + sizeof(rndHexStr) + 1); - if (tmp == NULL) { - rc = MQTT_CODE_ERROR_MEMORY; - } + return rc; +} + +#ifndef TEST_RAND_SZ +#define TEST_RAND_SZ 4 +#endif +char* mqtt_append_random(const char* inStr, word32 inLen) +{ + int rc = 0; + char *tmp; + + tmp = (char*)WOLFMQTT_MALLOC(inLen + 1 + (TEST_RAND_SZ*2) + 1); + if (tmp == NULL) { + rc = MQTT_CODE_ERROR_MEMORY; } if (rc == 0) { /* Format: inStr + `_` randhex + null term */ XMEMCPY(tmp, inStr, inLen); tmp[inLen] = '_'; - XMEMCPY(tmp + inLen + 1, rndHexStr, sizeof(rndHexStr)); - tmp[inLen + 1 + sizeof(rndHexStr)] = '\0'; + rc = mqtt_fill_random_hexstr(tmp + inLen + 1, (TEST_RAND_SZ*2)); + tmp[inLen + 1 + (TEST_RAND_SZ*2)] = '\0'; /* null term */ + } + if (rc != 0) { + WOLFMQTT_FREE(tmp); + tmp = NULL; } return tmp; } @@ -347,7 +363,6 @@ int mqtt_parse_args(MQTTCtx* mqttCtx, int argc, char** argv) break; case 'T': - mqttCtx->test_mode = 1; break; @@ -407,7 +422,7 @@ int mqtt_parse_args(MQTTCtx* mqttCtx, int argc, char** argv) /* Remove SNI functionality for sn-client */ if(!XSTRNCMP(mqttCtx->app_name, "sn-client", 10)){ - #ifdef HAVE_SNI + #ifdef HAVE_SNI useSNI=0; #endif } diff --git a/examples/mqttexample.h b/examples/mqttexample.h index 9372a04fa..315f7e186 100644 --- a/examples/mqttexample.h +++ b/examples/mqttexample.h @@ -200,6 +200,9 @@ word16 mqtt_get_packetid(void); int mqtt_check_timeout(int rc, word32* start_sec, word32 timeout_sec); #endif +int mqtt_fill_random_hexstr(char* buf, word32 bufLen); +char* mqtt_append_random(const char* inStr, word32 inLen); + int mqtt_file_load(const char* filePath, byte** fileBuf, int *fileLen); #ifdef WOLFSSL_ENCRYPTED_KEYS diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 92ce00692..b00365b95 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -33,19 +33,22 @@ #include +#ifdef WOLFMQTT_MULTITHREAD + /* Configuration */ -/* Maximum size for network read/write callbacks. There is also a v5 define that - describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ -#define MAX_BUFFER_SIZE 1024 -#define TEST_MESSAGE "test00" /* Number of publish tasks. Each will send a unique message to the broker. */ #define NUM_PUB_TASKS 10 +/* Maximum size for network read/write callbacks. There is also a v5 define that + describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#define MAX_BUFFER_SIZE 1024 -#ifdef WOLFMQTT_MULTITHREAD +/* Total size of test message to build */ +#define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */ /* Locals */ +static char mTestMessage[TEST_MESSAGE_SIZE]; static int mStopRead = 0; static int mNumMsgsRecvd; static int mNumMsgsDone; @@ -100,7 +103,8 @@ static int mqtt_stop_get(void) return rc; } -static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, int packet_type) +static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, + int packet_type, word32 timeoutMs) { /* check for test mode */ if (mqtt_stop_get()) { @@ -114,12 +118,9 @@ static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, int packet PRINTF("Test cancel by setting early timeout"); return MQTT_CODE_ERROR_TIMEOUT; } - else -#else - (void)packet_type; #endif /* Track elapsed time with no activity and trigger timeout */ - rc = mqtt_check_timeout(rc, startSec, mqttCtx->cmd_timeout_ms/1000); + rc = mqtt_check_timeout(rc, startSec, timeoutMs/1000); /* check return code */ if (rc == MQTT_CODE_CONTINUE) { @@ -128,11 +129,13 @@ static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, int packet usleep(100*1000); #endif } -#else - (void)packet_type; - (void)startSec; +#endif /* WOLFMQTT_NONBLOCK */ + (void)mqttCtx; -#endif + (void)startSec; + (void)packet_type; + (void)timeoutMs; + return rc; } @@ -170,17 +173,6 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, /* Print incoming message */ PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos); - - /* for test mode: count the number of TEST_MESSAGE matches received */ - if (mqttCtx->test_mode) { - if (XSTRLEN(TEST_MESSAGE) == msg->buffer_len && - /* Only compare the "test" part */ - XSTRNCMP(TEST_MESSAGE, (char*)msg->buffer, - msg->buffer_len-2) == 0) - { - mNumMsgsRecvd++; - } - } } /* Print message payload */ @@ -194,6 +186,17 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); if (msg_done) { + /* for test mode: count the number of messages received */ + if (mqttCtx->test_mode) { + if (msg->buffer_pos + msg->buffer_len == + (word32)sizeof(mTestMessage) && + XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, + msg->buffer_len) == 0) + { + mNumMsgsRecvd++; + } + } + PRINTF("MQTT Message: Done"); } wm_SemUnlock(&mtLock); @@ -311,7 +314,8 @@ static int multithread_test_init(MQTTCtx *mqttCtx) do { rc = MqttClient_NetConnect(&mqttCtx->client, mqttCtx->host, mqttCtx->port, DEFAULT_CON_TIMEOUT_MS, mqttCtx->use_tls, mqtt_tls_cb); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT, + DEFAULT_CON_TIMEOUT_MS); } while (rc == MQTT_CODE_CONTINUE); PRINTF("MQTT Socket Connect: %s (%d)", @@ -348,7 +352,8 @@ static int multithread_test_init(MQTTCtx *mqttCtx) startSec = 0; do { rc = MqttClient_Connect(&mqttCtx->client, &mqttCtx->connect); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT, + DEFAULT_CON_TIMEOUT_MS); } while (rc == MQTT_CODE_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, @@ -421,7 +426,8 @@ static void *subscribe_task(void *param) do { rc = MqttClient_Subscribe(&mqttCtx->client, &mqttCtx->subscribe); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_SUBSCRIBE); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_SUBSCRIBE, + mqttCtx->cmd_timeout_ms); } while (rc == MQTT_CODE_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, @@ -499,12 +505,14 @@ static void *waitMessage_task(void *param) ){ cmd_timeout_ms = 1000; /* short timeout */ } + /* Try and read packet */ rc = MqttClient_WaitMessage(&mqttCtx->client, cmd_timeout_ms); if (mqttCtx->test_mode && rc == MQTT_CODE_ERROR_TIMEOUT) { rc = 0; } - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY, + cmd_timeout_ms); /* check return code */ if (rc == MQTT_CODE_CONTINUE) { @@ -528,8 +536,10 @@ static void *waitMessage_task(void *param) mqttCtx->publish.packet_id = mqtt_get_packetid_threadsafe(); mqttCtx->publish.buffer = mqttCtx->rx_buf; mqttCtx->publish.total_len = (word16)rc; - rc = MqttClient_Publish(&mqttCtx->client, - &mqttCtx->publish); + do { + rc = MqttClient_Publish(&mqttCtx->client, + &mqttCtx->publish); + } while (rc == MQTT_CODE_CONTINUE); PRINTF("MQTT Publish: Topic %s, %s (%d)", mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); @@ -538,6 +548,7 @@ static void *waitMessage_task(void *param) #endif else if (rc == MQTT_CODE_ERROR_TIMEOUT) { if (mqttCtx->test_mode) { + mqtt_stop_set(); /* timeout in test mode should exit */ PRINTF("MQTT Exiting timeout..."); break; @@ -571,7 +582,6 @@ static void *publish_task(void *param) #endif { int rc; - char buf[7]; MQTTCtx *mqttCtx = (MQTTCtx*)param; MqttPublish publish; word32 startSec = 0; @@ -583,15 +593,13 @@ static void *publish_task(void *param) publish.duplicate = 0; publish.topic_name = mqttCtx->topic_name; publish.packet_id = mqtt_get_packetid_threadsafe(); - XSTRNCPY(buf, TEST_MESSAGE, sizeof(buf)); - buf[4] = '0' + ((publish.packet_id / 10) % 10); - buf[5] = '0' + (publish.packet_id % 10); - publish.buffer = (byte*)buf; - publish.total_len = (word16)XSTRLEN(buf); + publish.buffer = (byte*)mTestMessage; + publish.total_len = sizeof(mTestMessage); do { rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH, + mqttCtx->cmd_timeout_ms); } while (rc == MQTT_CODE_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish); @@ -630,9 +638,11 @@ static void *ping_task(void *param) startSec = 0; XMEMSET(&ping, 0, sizeof(ping)); + do { rc = MqttClient_Ping_ex(&mqttCtx->client, &ping); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PING_REQ); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PING_REQ, + mqttCtx->cmd_timeout_ms); } while (rc == MQTT_CODE_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&ping); @@ -663,7 +673,8 @@ static int unsubscribe_do(MQTTCtx *mqttCtx) /* Unsubscribe Topics */ do { rc = MqttClient_Unsubscribe(&mqttCtx->client, &mqttCtx->unsubscribe); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_UNSUBSCRIBE); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_UNSUBSCRIBE, + mqttCtx->cmd_timeout_ms); } while (rc == MQTT_CODE_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, @@ -681,6 +692,12 @@ int multithread_test(MQTTCtx *mqttCtx) int rc = 0, i, threadCount = 0; THREAD_T threadList[NUM_PUB_TASKS+3]; + /* Build test message */ + rc = mqtt_fill_random_hexstr(mTestMessage, (word32)sizeof(mTestMessage)); + if (rc != 0) { + return rc; + } + rc = multithread_test_init(mqttCtx); if (rc == 0) { if (THREAD_CREATE(&threadList[threadCount++], subscribe_task, mqttCtx)) { @@ -705,6 +722,7 @@ int multithread_test(MQTTCtx *mqttCtx) PRINTF("THREAD_CREATE failed: %d", errno); return -1; } + /* Create threads that publish unique messages */ for (i = 0; i < NUM_PUB_TASKS; i++) { if (THREAD_CREATE(&threadList[threadCount++], publish_task, mqttCtx)) { diff --git a/examples/nbclient/nbclient.c b/examples/nbclient/nbclient.c index 487cad893..cf337310d 100644 --- a/examples/nbclient/nbclient.c +++ b/examples/nbclient/nbclient.c @@ -33,6 +33,7 @@ /* Locals */ static int mStopRead = 0; +static int mTestDone = 0; /* Configuration */ @@ -84,7 +85,7 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, XSTRNCMP(DEFAULT_MESSAGE, (char*)msg->buffer, msg->buffer_len) == 0) { - mStopRead = 1; + mTestDone = 1; } } } @@ -467,13 +468,6 @@ int mqttclient_test(MQTTCtx *mqttCtx) rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms); - /* check for test mode */ - if (mStopRead) { - rc = MQTT_CODE_SUCCESS; - PRINTF("MQTT Exiting..."); - break; - } - /* Track elapsed time with no activity and trigger timeout */ rc = mqtt_check_timeout(rc, &mqttCtx->start_sec, mqttCtx->cmd_timeout_ms/1000); @@ -482,8 +476,17 @@ int mqttclient_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } + + /* check for test mode */ + if (mStopRead || mTestDone) { + rc = MQTT_CODE_SUCCESS; + mqttCtx->stat = WMQ_DISCONNECT; + PRINTF("MQTT Exiting..."); + break; + } + #ifdef WOLFMQTT_ENABLE_STDIN_CAP - else if (rc == MQTT_CODE_STDIN_WAKE) { + if (rc == MQTT_CODE_STDIN_WAKE) { XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) @@ -504,8 +507,9 @@ int mqttclient_test(MQTTCtx *mqttCtx) return rc; } } + else #endif - else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (rc == MQTT_CODE_ERROR_TIMEOUT) { /* Need to send keep-alive ping */ PRINTF("Keep-alive timeout, sending ping"); rc = MQTT_CODE_CONTINUE; @@ -715,7 +719,7 @@ int mqttclient_test(MQTTCtx *mqttCtx) #ifdef WOLFSSL_ASYNC_CRYPT wolfSSL_AsyncPoll(mqttCtx.client.tls.ssl, WOLF_POLL_FLAG_CHECK_HW); #endif - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); #else diff --git a/examples/pub-sub/mqtt-pub.c b/examples/pub-sub/mqtt-pub.c index d14029e92..b994cb68c 100644 --- a/examples/pub-sub/mqtt-pub.c +++ b/examples/pub-sub/mqtt-pub.c @@ -374,11 +374,10 @@ int pub_client(MQTTCtx *mqttCtx) #endif /* This loop allows payloads larger than the buffer to be sent by - repeatedly calling publish. - */ + * repeatedly calling publish. */ do { rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - } while(rc == MQTT_CODE_PUB_CONTINUE); + } while (rc == MQTT_CODE_PUB_CONTINUE); if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); diff --git a/examples/wiot/wiot.c b/examples/wiot/wiot.c index a68cc69b4..9c4e59da8 100644 --- a/examples/wiot/wiot.c +++ b/examples/wiot/wiot.c @@ -38,6 +38,7 @@ /* Locals */ static int mStopRead = 0; +static int mTestDone = 0; /* Configuration */ #define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */ @@ -101,7 +102,7 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, if (mqttCtx->test_mode) { if (XSTRLEN(TEST_MESSAGE) == msg->buffer_len && XSTRNCMP(TEST_MESSAGE, (char*)msg->buffer, msg->buffer_len) == 0) { - mStopRead = 1; + mTestDone = 1; } } } @@ -276,16 +277,28 @@ int wiot_test(MQTTCtx *mqttCtx) rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms); + #ifdef WOLFMQTT_NONBLOCK + /* Track elapsed time with no activity and trigger timeout */ + rc = mqtt_check_timeout(rc, &mqttCtx->start_sec, + mqttCtx->cmd_timeout_ms/1000); + #endif + + /* check return code */ + if (rc == MQTT_CODE_CONTINUE) { + return rc; + } + /* check for test mode */ - if (mStopRead) { + if (mStopRead || mTestDone) { rc = MQTT_CODE_SUCCESS; + mqttCtx->stat = WMQ_DISCONNECT; PRINTF("MQTT Exiting..."); break; } /* check return code */ #ifdef WOLFMQTT_ENABLE_STDIN_CAP - else if (rc == MQTT_CODE_STDIN_WAKE) { + if (rc == MQTT_CODE_STDIN_WAKE) { XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) { rc = (int)XSTRLEN((char*)mqttCtx->rx_buf); @@ -306,8 +319,9 @@ int wiot_test(MQTTCtx *mqttCtx) MqttClient_ReturnCodeToString(rc), rc); } } + else #endif - else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (rc == MQTT_CODE_ERROR_TIMEOUT) { /* Keep Alive */ PRINTF("Keep-alive timeout, sending ping"); @@ -437,7 +451,7 @@ int main(int argc, char** argv) do { rc = wiot_test(&mqttCtx); - } while (rc == MQTT_CODE_CONTINUE); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); mqtt_free_ctx(&mqttCtx); diff --git a/scripts/awsiot.test b/scripts/awsiot.test index a153dcf41..ce7e90472 100755 --- a/scripts/awsiot.test +++ b/scripts/awsiot.test @@ -11,7 +11,6 @@ else def_args="-T -C 2000" # Run with TLS and QoS 0-1 - ./examples/aws/awsiot $def_args -t -q 0 $1 RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nAWS IoT MQTT Client failed! TLS=On, QoS=0" && exit 1 diff --git a/scripts/azureiothub.test b/scripts/azureiothub.test index 37c94d193..9616946a5 100755 --- a/scripts/azureiothub.test +++ b/scripts/azureiothub.test @@ -8,10 +8,11 @@ if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS"; then echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set, won't run" else + # Use short timeout here, since we can't get a publish response to complete test + # So use the timeout and ping response to complete test def_args="-T -C 2000" # Run with TLS and QoS 0-1 - ./examples/azure/azureiothub $def_args -t -q 0 $1 RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nAzureIotHub MQTT Client failed! TLS=On, QoS=0" && exit 1 diff --git a/scripts/firmware.test b/scripts/firmware.test index 6f2ce615f..52926617b 100755 --- a/scripts/firmware.test +++ b/scripts/firmware.test @@ -28,7 +28,7 @@ do_cleanup() { [ ! -x ./examples/firmware/fwpush ] && echo -e "\n\nMQTT Example fwpush doesn't exist" && exit 1 [ ! -x ./examples/firmware/fwclient ] && echo -e "\n\nMQTT Example fwclient doesn't exist" && exit 1 -def_args="-t -T -C 2000" +def_args="-t -T -C 5000 -n wolfMQTT/example/firmware_$((RANDOM))" filein=./examples/publish.dat fileout=./examples/publish.dat.trs @@ -54,16 +54,19 @@ fi grep -F -e 'ENABLE_MQTT_TLS' ./wolfmqtt/options.h ENABLE_MQTT_TLS=$? +# Start firmware client +./examples/firmware/fwclient $def_args -f $fileout $1 & +client_result=$? +[ $client_result -ne 0 ] && echo -e "\n\nMQTT Example fwclient failed!" && do_cleanup "-1" + +# give some time for the client to connect and wait (it starts a new session) +sleep 0.5 + # Start firmware push ./examples/firmware/fwpush $def_args -r -f $filein $1 server_result=$? [ $server_result -ne 0 ] && echo -e "\n\nMQTT Example fwpush failed!" && do_cleanup "-1" -# Start firmware client -./examples/firmware/fwclient $def_args -f $fileout $1 -client_result=$? -[ $client_result -ne 0 ] && echo -e "\n\nMQTT Example fwclient failed!" && do_cleanup "-1" - if [ $ENABLE_MQTT_TLS -ne 1 ]; then # Compare files echo "Comparing files" @@ -74,7 +77,7 @@ fi # End broker do_cleanup "0" - + echo -e "\n\nFirmware Example MQTT Client Tests Passed" exit 0 diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 0a5091bb8..fe3c05f76 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1233,8 +1233,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, rc = MqttPacket_Write(client, client->tx_buf, client->write.len); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) - break; + if (rc == MQTT_CODE_CONTINUE) { + /* keep send mutex locked and return to caller */ + return rc; + } #endif if (rc == client->write.len) { rc = 0; /* success */ @@ -1312,7 +1314,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (!waitMatchFound) { /* if we get here, then the we are still waiting for a packet */ mms_stat->read = MQTT_MSG_BEGIN; - MQTT_TRACE_MSG("Wait Again"); #ifdef WOLFMQTT_NONBLOCK /* for non-blocking return with code continue instead of waiting again * if called with packet type and id of 'any' */ @@ -1320,6 +1321,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, return MQTT_CODE_CONTINUE; } #endif + MQTT_TRACE_MSG("Wait Again"); goto wait_again; } @@ -1679,13 +1681,20 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, if (client == NULL || publish == NULL) return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); - if (pubCb) { + if (pubCb) { /* use publish callback to get data */ word32 tmp_len = publish->buffer_len; do { - /* Use the callback to get payload */ - if ((client->write.len = pubCb(publish)) < 0) { - return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK); + /* use the client->write.len to handle non-blocking re-entry when + * new publish callback data is needed */ + if (client->write.len == 0) { + /* Use the callback to get payload */ + if ((client->write.len = pubCb(publish)) < 0) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish callback error %d", client->write.len); + #endif + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK); + } } if ((word32)client->write.len < publish->buffer_len) { @@ -1715,6 +1724,7 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, publish->buffer_pos += publish->intBuf_pos; publish->intBuf_pos = 0; + client->write.len = 0; /* reset current write len */ } while (publish->buffer_pos < publish->total_len); } @@ -1746,8 +1756,17 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, /* Check if we are done sending publish message */ if (publish->buffer_pos < publish->buffer_len) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish Write: not done (%d remain)", + publish->buffer_len - publish->buffer_pos); + #endif return MQTT_CODE_PUB_CONTINUE; } + #ifdef WOLFMQTT_DEBUG_CLIENT + else { + PRINTF("Publish Write: done"); + } + #endif #else do { rc = MqttPacket_Write(client, client->tx_buf, client->write.len); @@ -1779,6 +1798,11 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, /* If transferring more chunks */ publish->buffer_pos += publish->intBuf_pos; if (publish->buffer_pos < publish->total_len) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish Write: chunk (%d remain)", + publish->total_len - publish->buffer_pos); + #endif + /* Build next payload to send */ client->write.len = (publish->total_len - publish->buffer_pos); if (client->write.len > client->tx_buf_len) { @@ -1786,6 +1810,11 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, } rc = MQTT_CODE_PUB_CONTINUE; } + #ifdef WOLFMQTT_DEBUG_CLIENT + else { + PRINTF("Publish Write: chunked done"); + } + #endif } } return rc; @@ -1897,6 +1926,9 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, return rc; } + /* reset client->write.len */ + client->write.len = 0; + /* advance state */ publish->stat.write = MQTT_MSG_PAYLOAD; } @@ -1906,7 +1938,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, { rc = MqttClient_Publish_WritePayload(client, publish, pubCb); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) + if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) return rc; #endif #ifdef WOLFMQTT_MULTITHREAD @@ -2562,6 +2594,7 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Write Lock"); #endif + client->write.pos = 0; /* reset current write position */ mms_stat->isWriteLocked = 0; wm_SemUnlock(&client->lockSend); } diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index ce6b2cefb..f8888d254 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -123,6 +123,15 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, { int rc; +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testNbWriteAlt = 0; + if (!testNbWriteAlt) { + testNbWriteAlt = 1; + return MQTT_CODE_CONTINUE; + } + testNbWriteAlt = 0; +#endif + #ifdef ENABLE_MQTT_TLS if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) { client->tls.timeout_ms = timeout_ms; @@ -133,8 +142,11 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, int error = wolfSSL_get_error(client->tls.ssl, 0); #endif #ifdef WOLFMQTT_DEBUG_SOCKET - if (error != WOLFSSL_ERROR_WANT_WRITE && - error != WC_PENDING_E) { + if (error != WOLFSSL_ERROR_WANT_WRITE + #ifdef WOLFSSL_ASYNC_CRYPT + && error != WC_PENDING_E + #endif + ) { PRINTF("MqttSocket_WriteDo: SSL Error=%d (rc %d, sockrc %d)", error, rc, client->tls.sockRcWrite); } @@ -152,6 +164,19 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, else #endif /* ENABLE_MQTT_TLS */ { + #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testSmallerWrite = 0; + if (!testSmallerWrite) { + if (buf_len > 100) { + buf_len /= 2; + } + testSmallerWrite = 1; + } + else { + testSmallerWrite = 0; + } + #endif + rc = client->net->write(client->net->context, buf, buf_len, timeout_ms); } @@ -221,12 +246,12 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, int rc; #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbAlt = 0; - if (!testNbAlt) { - testNbAlt = 1; + static int testNbReadAlt = 0; + if (!testNbReadAlt) { + testNbReadAlt = 1; return MQTT_CODE_CONTINUE; } - testNbAlt = 0; + testNbReadAlt = 0; #endif #ifdef ENABLE_MQTT_TLS @@ -235,12 +260,13 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, client->tls.sockRcRead = 0; /* init value */ rc = wolfSSL_read(client->tls.ssl, (char*)buf, buf_len); if (rc < 0) { - #if defined(WOLFMQTT_DEBUG_SOCKET) || defined(WOLFSSL_ASYNC_CRYPT) int error = wolfSSL_get_error(client->tls.ssl, 0); - #endif #ifdef WOLFMQTT_DEBUG_SOCKET - if (error != WOLFSSL_ERROR_WANT_READ && - error != WC_PENDING_E) { + if (error != WOLFSSL_ERROR_WANT_READ + #ifdef WOLFSSL_ASYNC_CRYPT + && error != WC_PENDING_E + #endif + ) { PRINTF("MqttSocket_ReadDo: SSL Error=%d (rc %d, sockrc %d)", error, rc, client->tls.sockRcRead); } @@ -252,7 +278,12 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, if (error == WC_PENDING_E) { rc = MQTT_CODE_CONTINUE; } + else #endif + /* used with compatibility layer to communicate peer close */ + if (error == WOLFSSL_ERROR_ZERO_RETURN) { + rc = MQTT_CODE_ERROR_NETWORK; + } } } else @@ -485,9 +516,12 @@ int MqttSocket_Connect(MqttClient *client, const char* host, word16 port, int errnum = 0; if (client->tls.ssl) { errnum = wolfSSL_get_error(client->tls.ssl, 0); - if (errnum == WOLFSSL_ERROR_WANT_READ || - errnum == WOLFSSL_ERROR_WANT_WRITE || - errnum == WC_PENDING_E) { + if ( errnum == WOLFSSL_ERROR_WANT_READ + || errnum == WOLFSSL_ERROR_WANT_WRITE + #ifdef WOLFSSL_ASYNC_CRYPT + || errnum == WC_PENDING_E + #endif + ) { return MQTT_CODE_CONTINUE; } #ifdef WOLFMQTT_DEBUG_SOCKET