diff --git a/examples/aws/awsiot.c b/examples/aws/awsiot.c index cd77c3807..8ae5516c7 100644 --- a/examples/aws/awsiot.c +++ b/examples/aws/awsiot.c @@ -60,7 +60,9 @@ static int mTestDone = 0; #define APP_HARDWARE "wolf_aws_iot_demo" #define APP_FIRMWARE_VERSION LIBWOLFMQTT_VERSION_STRING +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 512 /* Maximum size for network read/write callbacks */ +#endif #define AWSIOT_HOST "a2dujmi05ideo2-ats.iot.us-west-2.amazonaws.com" #define AWSIOT_DEVICE_ID "demoDevice" #define AWSIOT_QOS MQTT_QOS_1 @@ -615,8 +617,9 @@ int awsiot_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -674,8 +677,8 @@ int awsiot_test(MQTTCtx *mqttCtx) mqttCtx->publish.buffer = (byte*)mqttCtx->app_ctx; mqttCtx->publish.total_len = (word32)XSTRLEN((char*)mqttCtx->app_ctx); rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/azure/azureiothub.c b/examples/azure/azureiothub.c index 463e67256..80189cbc7 100644 --- a/examples/azure/azureiothub.c +++ b/examples/azure/azureiothub.c @@ -76,7 +76,9 @@ static int mTestDone = 0; * https://azure.microsoft.com/en-us/documentation/articles/iot-hub-sas-tokens/#using-sas-tokens-as-a-device * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */ +#endif #define AZURE_API_VERSION "?api-version=2018-06-30" #define AZURE_HOST "wolfMQTT.azure-devices.net" #define AZURE_DEVICE_ID "demoDevice" @@ -437,8 +439,9 @@ int azureiothub_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -494,8 +497,8 @@ int azureiothub_test(MQTTCtx *mqttCtx) mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/firmware/fwclient.c b/examples/firmware/fwclient.c index 351eaf4b4..22259fd8d 100644 --- a/examples/firmware/fwclient.c +++ b/examples/firmware/fwclient.c @@ -55,7 +55,9 @@ #include "examples/mqttnet.h" /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET +#endif /* Locals */ static int mStopRead = 0; diff --git a/examples/firmware/fwpush.c b/examples/firmware/fwpush.c index 7b2ee2ad6..fbefbdc90 100644 --- a/examples/firmware/fwpush.c +++ b/examples/firmware/fwpush.c @@ -56,7 +56,9 @@ #include "examples/mqttnet.h" /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET +#endif /* Locals */ static int mStopRead = 0; @@ -453,8 +455,8 @@ int fwpush_test(MQTTCtx *mqttCtx) return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; diff --git a/examples/mqttclient/mqttclient.c b/examples/mqttclient/mqttclient.c index 8c65813a2..7869a18ff 100644 --- a/examples/mqttclient/mqttclient.c +++ b/examples/mqttclient/mqttclient.c @@ -34,7 +34,9 @@ static int mStopRead = 0; /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -504,10 +506,12 @@ int mqttclient_test(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); #ifdef WOLFMQTT_V5 if (mqttCtx->qos > 0) { @@ -572,8 +576,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/mqttnet.c b/examples/mqttnet.c index acd9d3340..0a73a2f30 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -32,6 +32,10 @@ typedef struct MulticastCtx { } MulticastCtx; #endif +#ifndef WOLFMQTT_TEST_NONBLOCK_TIMES +#define WOLFMQTT_TEST_NONBLOCK_TIMES 1 +#endif + /* Private functions */ /* -------------------------------------------------------------------------- */ @@ -643,11 +647,16 @@ static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) { SocketContext *sock = (SocketContext*)context; + MQTTCtx* mqttCtx; int rc; SOERROR_T so_error = 0; #ifndef WOLFMQTT_NO_TIMEOUT struct timeval tv; #endif +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testNbWriteAlt = 0; + static int testSmallerWrite = 0; +#endif if (context == NULL || buf == NULL || buf_len <= 0) { return MQTT_CODE_ERROR_BAD_ARG; @@ -656,6 +665,27 @@ static int NetWrite(void *context, const byte* buf, int buf_len, if (sock->fd == SOCKET_INVALID) return MQTT_CODE_ERROR_BAD_ARG; + mqttCtx = sock->mqttCtx; + (void)mqttCtx; + +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + if (mqttCtx->useNonBlockMode) { + if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbWriteAlt++; + return MQTT_CODE_CONTINUE; + } + testNbWriteAlt = 0; + if (!testSmallerWrite) { + if (buf_len > 2) + buf_len /= 2; + testSmallerWrite = 1; + } + else { + testSmallerWrite = 0; + } + } +#endif + #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); @@ -706,6 +736,10 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, fd_set errfds; struct timeval tv; #endif +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testNbReadAlt = 0; + static int testSmallerRead = 0; +#endif if (context == NULL || buf == NULL || buf_len <= 0) { return MQTT_CODE_ERROR_BAD_ARG; @@ -721,6 +755,24 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, mqttCtx = sock->mqttCtx; (void)mqttCtx; +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + if (mqttCtx->useNonBlockMode) { + if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbReadAlt++; + return MQTT_CODE_CONTINUE; + } + testNbReadAlt = 0; + if (!testSmallerRead) { + if (buf_len > 2) + buf_len /= 2; + testSmallerRead = 1; + } + else { + testSmallerRead = 0; + } + } +#endif + #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); diff --git a/examples/mqttsimple/mqttsimple.c b/examples/mqttsimple/mqttsimple.c index 5fa115b53..814c8f370 100644 --- a/examples/mqttsimple/mqttsimple.c +++ b/examples/mqttsimple/mqttsimple.c @@ -424,8 +424,9 @@ int mqttsimple_test(void) if (rc != MQTT_CODE_SUCCESS) { goto exit; } - PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s", - mqttObj.publish.topic_name, mqttObj.publish.qos, mqttObj.publish.buffer); + PRINTF("MQTT Publish: Topic %s, ID %d, Qos %d, Message %s", + mqttObj.publish.topic_name, mqttObj.publish.packet_id, + mqttObj.publish.qos, mqttObj.publish.buffer); /* Wait for messages */ while (1) { diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 4114d4549..8a6ee52c9 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -38,11 +38,14 @@ /* Configuration */ /* Number of publish tasks. Each will send a unique message to the broker. */ -#define NUM_PUB_TASKS 10 +#define NUM_PUB_TASKS 5 +#define NUM_PUB_PER_TASK 2 /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif /* Total size of test message to build */ #define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */ @@ -468,7 +471,8 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx) /* check if we are in test mode and done */ wm_SemLock(&mtLock); if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && - mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS + mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && + mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) #ifdef WOLFMQTT_NONBLOCK && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) #endif @@ -555,8 +559,8 @@ static void *waitMessage_task(void *param) MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&mqttCtx->publish); } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } @@ -596,37 +600,51 @@ static DWORD WINAPI publish_task( LPVOID param ) static void *publish_task(void *param) #endif { - int rc; + int rc[NUM_PUB_PER_TASK], i; MQTTCtx *mqttCtx = (MQTTCtx*)param; - MqttPublish publish; - word32 startSec = 0; - - /* Publish Topic */ - XMEMSET(&publish, 0, sizeof(MqttPublish)); - publish.retain = 0; - publish.qos = mqttCtx->qos; - publish.duplicate = 0; - publish.topic_name = mqttCtx->topic_name; - publish.packet_id = mqtt_get_packetid_threadsafe(); - publish.buffer = (byte*)mTestMessage; - publish.total_len = sizeof(mTestMessage); + MqttPublish publish[NUM_PUB_PER_TASK]; + word32 startSec[NUM_PUB_PER_TASK]; + + /* Build publish */ + for (i=0; iqos; + publish[i].duplicate = 0; + publish[i].topic_name = mqttCtx->topic_name; + publish[i].packet_id = mqtt_get_packetid_threadsafe(); + publish[i].buffer = (byte*)mTestMessage; + publish[i].total_len = sizeof(mTestMessage); + + rc[i] = MQTT_CODE_CONTINUE; + startSec[i] = 0; + } - do { - rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH, - mqttCtx->cmd_timeout_ms); - } while (rc == MQTT_CODE_CONTINUE); - if (rc != MQTT_CODE_SUCCESS) { - MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish); + /* Send until != continue */ + for (i=0; iclient, &publish[i], + NULL); + rc[i] = check_response(mqttCtx, rc[i], &startSec[i], + MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms); + } } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - publish.topic_name, - MqttClient_ReturnCodeToString(rc), rc); + /* Report result */ + for (i=0; iclient, (MqttObject*)&publish[i]); + } - wm_SemLock(&mtLock); - mNumMsgsDone++; - wm_SemUnlock(&mtLock); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + publish[i].topic_name, publish[i].packet_id, + MqttClient_ReturnCodeToString(rc[i]), rc[i]); + + wm_SemLock(&mtLock); + mNumMsgsDone++; + wm_SemUnlock(&mtLock); + } THREAD_EXIT(0); } diff --git a/examples/nbclient/nbclient.c b/examples/nbclient/nbclient.c index cf337310d..786c12e70 100644 --- a/examples/nbclient/nbclient.c +++ b/examples/nbclient/nbclient.c @@ -38,7 +38,9 @@ static int mTestDone = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -444,10 +446,12 @@ int mqttclient_test(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; diff --git a/examples/pub-sub/mqtt-pub.c b/examples/pub-sub/mqtt-pub.c index 467fccee5..40c1fc0a3 100644 --- a/examples/pub-sub/mqtt-pub.c +++ b/examples/pub-sub/mqtt-pub.c @@ -30,8 +30,10 @@ /* Configuration */ /* Maximum size for network read/write callbacks. There is also a v5 define that - describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ + * describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -381,10 +383,12 @@ int pub_client(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } if (mqttCtx->debug_on) { - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } if (rc != MQTT_CODE_SUCCESS) { diff --git a/examples/pub-sub/mqtt-sub.c b/examples/pub-sub/mqtt-sub.c index cbace383c..84d791e10 100644 --- a/examples/pub-sub/mqtt-sub.c +++ b/examples/pub-sub/mqtt-sub.c @@ -33,8 +33,10 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. There is also a v5 define that - describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ + * describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 diff --git a/examples/sn-client/sn-client.c b/examples/sn-client/sn-client.c index e60c1f5af..15b5a9bf1 100644 --- a/examples/sn-client/sn-client.c +++ b/examples/sn-client/sn-client.c @@ -36,7 +36,9 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "test" #define SHORT_TOPIC_NAME "s1" @@ -215,11 +217,11 @@ int sn_test(MQTTCtx *mqttCtx) /* Send Connect and wait for Connect Ack */ rc = SN_Client_Connect(&mqttCtx->client, connect); - if (rc != MQTT_CODE_SUCCESS) { - PRINTF("MQTT-SN Connect: %s (%d)", - MqttClient_ReturnCodeToString(rc), rc); - goto disconn; - } + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT-SN Connect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + goto disconn; + } /* Validate Connect Ack info */ PRINTF("....MQTT-SN Connect Ack: Return Code %u", diff --git a/examples/sn-client/sn-client_qos-1.c b/examples/sn-client/sn-client_qos-1.c index cee87110a..ed57b09b2 100644 --- a/examples/sn-client/sn-client_qos-1.c +++ b/examples/sn-client/sn-client_qos-1.c @@ -40,7 +40,9 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "QoS-1 test message" char SHORT_TOPIC_NAME[] = {1}; diff --git a/examples/sn-client/sn-multithread.c b/examples/sn-client/sn-multithread.c index 4e7d934ee..7ca310201 100644 --- a/examples/sn-client/sn-multithread.c +++ b/examples/sn-client/sn-multithread.c @@ -36,7 +36,9 @@ /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "test00" /* Number of publish tasks. Each will send a unique message to the broker. */ #define NUM_PUB_TASKS 10 diff --git a/examples/wiot/wiot.c b/examples/wiot/wiot.c index 9c4e59da8..28a06aa41 100644 --- a/examples/wiot/wiot.c +++ b/examples/wiot/wiot.c @@ -41,7 +41,9 @@ static int mStopRead = 0; static int mTestDone = 0; /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */ +#endif /* Undefine if using an IBM WIOT Platform account that you created. */ #define WIOT_USE_QUICKSTART @@ -263,8 +265,9 @@ int wiot_test(MQTTCtx *mqttCtx) rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -314,8 +317,8 @@ int wiot_test(MQTTCtx *mqttCtx) mqttCtx->publish.buffer = mqttCtx->rx_buf; mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/src/mqtt_client.c b/src/mqtt_client.c index f284b0207..4f1aa7fdd 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -26,6 +26,32 @@ #include "wolfmqtt/mqtt_client.h" +/* DOCUMENTED BUILD OPTIONS: + * + * WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on + * client struct, write and read. When a pending response is needed its added + * to a linked list and if another thread reads the expected response it is + * flagged, so the other thread knows it completed. + * + * WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE, + * which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the + * transport (socket) has no data. + * + * WOLFMQTT_V5: Enables MQTT v5.0 support + * + * WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to + * allow unlock if no data was sent/received. Note the TLS stack typically + * requires an attempt to write to continue with same write, not different. + * By default if we attempt a write we keep the mutex locked and return + * MQTT_CODE_CONTINUE + * + * WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the + * user. Example: wm_SemInit + * + * WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code. + */ + + /* Private functions */ /* forward declarations */ @@ -50,7 +76,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); #elif defined(__MACH__) /* Apple style dispatch semaphore */ - int wm_SemInit(wm_Sem *s){ + int wm_SemInit(wm_Sem *s) { /* dispatch_release() fails hard, with Trace/BPT trap signal, if the * sem's internal count is less than the value passed in with * dispatch_semaphore_create(). work around this by initializing @@ -66,7 +92,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); return 0; } - int wm_SemFree(wm_Sem *s){ + int wm_SemFree(wm_Sem *s) { if ((s == NULL) || (s->sem == NULL)) return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); @@ -168,26 +194,52 @@ static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_MULTITHREAD - #ifdef WOLFMQTT_DEBUG_CLIENT +#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) + #ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isWriteActive) { MQTT_TRACE_MSG("Warning, send already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } - if (rc != 0) { + #endif + #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK + /* detect if a write is already in progress */ + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + if (client->write.isActive) { + MQTT_TRACE_MSG("Partial write in progress!"); + rc = MQTT_CODE_CONTINUE; /* can't write yet */ + } + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } + #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ + if (rc != MQTT_CODE_SUCCESS) { return rc; } - #endif /* WOLFMQTT_DEBUG_CLIENT */ +#endif +#ifdef WOLFMQTT_MULTITHREAD rc = wm_SemLock(&client->lockSend); -#endif /* WOLFMQTT_MULTITHREAD */ - if (rc == 0) { +#endif + if (rc == MQTT_CODE_SUCCESS) { stat->isWriteActive = 1; + + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + client->write.isActive = 1; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } + MQTT_TRACE_MSG("lockSend"); } - (void)client; - return rc; } static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) @@ -199,8 +251,16 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) } #endif - /* reset write */ - XMEMSET(&client->write, 0, sizeof(client->write)); +#ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) +#endif + { + /* reset write */ + XMEMSET(&client->write, 0, sizeof(client->write)); + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } if (stat->isWriteActive) { MQTT_TRACE_MSG("unlockSend"); @@ -215,31 +275,52 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_MULTITHREAD - #ifdef WOLFMQTT_DEBUG_CLIENT +#ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isReadActive) { MQTT_TRACE_MSG("Warning, recv already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } - /* detect if a write is already in progress */ - if (wm_SemLock(&client->lockClient) == 0) { - if (client->read.total > 0) { + /* detect if a read is already in progress */ + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + if (client->read.isActive) { MQTT_TRACE_MSG("Partial read in progress!"); rc = MQTT_CODE_CONTINUE; /* can't read yet */ } + #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockClient); + #endif } if (rc != 0) return rc; - #endif /* WOLFMQTT_DEBUG_CLIENT */ +#endif /* WOLFMQTT_DEBUG_CLIENT */ +#ifdef WOLFMQTT_MULTITHREAD rc = wm_SemLock(&client->lockRecv); -#endif /* WOLFMQTT_MULTITHREAD */ - if (rc == 0) { +#endif + if (rc == MQTT_CODE_SUCCESS) { stat->isReadActive = 1; + + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + /* mark read active */ + client->read.isActive = 1; + + /* reset the packet state used by MqttPacket_Read */ + client->packet.stat = MQTT_PK_BEGIN; + + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } + MQTT_TRACE_MSG("lockRecv"); } - (void)client; + return rc; } static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) @@ -251,12 +332,20 @@ static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) } #endif - /* reset read */ - XMEMSET(&client->read, 0, sizeof(client->read)); +#ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) +#endif + { + /* reset read */ + XMEMSET(&client->read, 0, sizeof(client->read)); + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } if (stat->isReadActive) { - stat->isReadActive = 0; MQTT_TRACE_MSG("unlockRecv"); + stat->isReadActive = 0; #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockRecv); #endif @@ -692,7 +781,8 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf, #ifdef WOLFMQTT_DISCONNECT_CB /* Call disconnect callback with reason code */ if ((packet_obj != NULL) && client->disconnect_cb) { - client->disconnect_cb(client, p_disc->reason_code, client->disconnect_ctx); + client->disconnect_cb(client, p_disc->reason_code, + client->disconnect_ctx); } #endif #else @@ -1070,9 +1160,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, return rc; } - /* reset the packet state used by MqttPacket_Read */ - client->packet.stat = MQTT_PK_BEGIN; - mms_stat->read = MQTT_MSG_WAIT; } FALL_THROUGH; @@ -1087,8 +1174,9 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc <= 0) { #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE && - (client->packet.stat > MQTT_PK_BEGIN || - client->read.total > 0)) { + (client->packet.stat > MQTT_PK_BEGIN || + client->read.total > 0) + ) { /* advance state, since we received some data */ mms_stat->read = MQTT_MSG_HEADER; } @@ -1225,6 +1313,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc >= 0) { rc = MQTT_CODE_SUCCESS; } + else { + /* error, break */ + break; + } #ifdef WOLFMQTT_MULTITHREAD if (pendResp) { @@ -1241,39 +1333,21 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } #endif /* WOLFMQTT_MULTITHREAD */ - /* are we sending ACK or done with message? */ + /* Determine if we are sending ACK or done */ if (MqttIsPubRespPacket(resp.packet_type)) { + /* if we get here, then we are sending an ACK */ mms_stat->read = MQTT_MSG_ACK; - } - else { - mms_stat->read = MQTT_MSG_BEGIN; + mms_stat->ack = MQTT_MSG_WAIT; + + /* setup ACK in shared context */ + XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); + #ifdef WOLFMQTT_V5 + client->packetAck.protocol_level = client->protocol_level; + #endif } /* done reading */ MqttReadStop(client, mms_stat); - - /* if error, leave */ - if (rc != MQTT_CODE_SUCCESS) { - break; - } - - /* if not sending an ACK, we are done */ - if (!MqttIsPubRespPacket(resp.packet_type)) { - break; - } - - /* Flag write active / lock mutex */ - if ((rc = MqttWriteStart(client, mms_stat)) != 0) { - break; - } - - /* setup ACK in shared context */ - XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); - #ifdef WOLFMQTT_V5 - client->packetAck.protocol_level = client->protocol_level; - #endif - - mms_stat->ack = MQTT_MSG_ACK; break; } @@ -1296,10 +1370,19 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, switch (mms_stat->ack) { case MQTT_MSG_BEGIN: - case MQTT_MSG_WAIT: /* wait for read to set ack */ break; + case MQTT_MSG_WAIT: + { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, mms_stat)) != 0) { + break; + } + mms_stat->ack = MQTT_MSG_ACK; + } + FALL_THROUGH; + case MQTT_MSG_ACK: { /* send ack */ @@ -1367,7 +1450,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #endif /* no data read or ack done, then reset state */ - if (mms_stat->read == MQTT_MSG_WAIT || mms_stat->read == MQTT_MSG_ACK) { + if (mms_stat->read == MQTT_MSG_WAIT) { mms_stat->read = MQTT_MSG_BEGIN; } @@ -1580,7 +1663,11 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) /* Send connect packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -1986,9 +2073,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, /* Send publish packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; + } #endif client->write.len = 0; /* reset len, so publish chunk resets */ @@ -2175,7 +2267,11 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) /* Send subscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2276,7 +2372,11 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) /* Send unsubscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2369,7 +2469,11 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) /* Send ping req packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2461,8 +2565,11 @@ int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect) rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK /* if disconnect context avail allow partial write in non-blocking mode */ - if (p_disconnect != NULL && - rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2544,7 +2651,11 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) /* Send authentication packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2670,14 +2781,12 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Read Lock"); #endif - mms_stat->isReadActive = 0; MqttReadStop(client, mms_stat); } if (mms_stat->isWriteActive) { #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Write Lock"); #endif - mms_stat->isWriteActive = 0; MqttWriteStop(client, mms_stat); } diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 1b9fd0008..5f105ab3d 100644 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -1984,7 +1984,9 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len, int i; client->packet.stat = MQTT_PK_READ_HEAD; - for (i = 0; i < MQTT_PACKET_MAX_LEN_BYTES; i++) { + for (i = (client->packet.header_len - MQTT_PACKET_HEADER_MIN_SIZE); + i < MQTT_PACKET_MAX_LEN_BYTES; + i++) { /* Check if another byte is needed */ if ((header->len[i] & MQTT_PACKET_LEN_ENCODE_MASK) == 0) { /* Variable byte length can be determined */ diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index 299577e80..7fa70caf9 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -42,11 +42,6 @@ #undef WOLFMQTT_DEBUG_SOCKET #endif -/* #define WOLFMQTT_TEST_NONBLOCK */ -#ifdef WOLFMQTT_TEST_NONBLOCK - #define WOLFMQTT_TEST_NONBLOCK_TIMES 1 -#endif - /* lwip */ #ifdef WOLFSSL_LWIP #undef read @@ -129,15 +124,6 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, { int rc; -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbWriteAlt = 0; - if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { - testNbWriteAlt++; - return MQTT_CODE_CONTINUE; - } - testNbWriteAlt = 0; -#endif - #ifdef ENABLE_MQTT_TLS if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) { client->tls.timeout_ms = timeout_ms; @@ -170,18 +156,6 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, else #endif /* ENABLE_MQTT_TLS */ { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerWrite = 0; - if (!testSmallerWrite) { - if (buf_len > 1) - buf_len /= 2; - testSmallerWrite = 1; - } - else { - testSmallerWrite = 0; - } - #endif - rc = client->net->write(client->net->context, buf, buf_len, timeout_ms); } @@ -252,15 +226,6 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, { int rc; -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbReadAlt = 0; - if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { - testNbReadAlt++; - return MQTT_CODE_CONTINUE; - } - testNbReadAlt = 0; -#endif - #ifdef ENABLE_MQTT_TLS if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) { client->tls.timeout_ms = timeout_ms; @@ -296,17 +261,6 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, else #endif /* ENABLE_MQTT_TLS */ { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerRead = 0; - if (!testSmallerRead) { - if (buf_len > 1) - buf_len /= 2; - testSmallerRead = 1; - } - else { - testSmallerRead = 0; - } - #endif rc = client->net->read(client->net->context, buf, buf_len, timeout_ms); } diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index e8ad0299c..ddb70afe1 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -115,7 +115,7 @@ enum MqttClientFlags { WOLFMQTT_API word32 MqttClient_Flags(struct _MqttClient *client, word32 mask, word32 flags); typedef enum _MqttPkStat { - MQTT_PK_BEGIN, + MQTT_PK_BEGIN = 0, MQTT_PK_READ_HEAD, MQTT_PK_READ } MqttPkStat; @@ -131,6 +131,9 @@ typedef struct _MqttSk { int pos; /* position inside current buffer */ int len; /* length of current segment being sent */ int total; /* number bytes sent or received */ + + /* status bit for if client read or write is active */ + byte isActive:1; } MqttSk; #ifdef WOLFMQTT_DISCONNECT_CB