From e753456bd104c2b5957bf5a450bb3ebe9726165e Mon Sep 17 00:00:00 2001 From: Eric Blankenhorn Date: Tue, 19 Dec 2023 12:55:34 -0600 Subject: [PATCH] Med fixes --- examples/mqttnet.c | 3 +- examples/mqttport.h | 2 +- examples/mqttsimple/mqttsimple.c | 8 +- examples/multithread/multithread.c | 125 ++++++++++++++------------- examples/sn-client/sn-client_qos-1.c | 5 -- examples/sn-client/sn-multithread.c | 17 ++-- src/mqtt_client.c | 47 +++++----- 7 files changed, 113 insertions(+), 94 deletions(-) diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 3f811b74f..4532409dc 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -704,7 +704,8 @@ static int NetWrite(void *context, const byte* buf, int buf_len, #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); - setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv)); + (void)setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, + sizeof(tv)); #endif rc = (int)SOCK_SEND(sock->fd, buf, buf_len, 0); diff --git a/examples/mqttport.h b/examples/mqttport.h index afec2263d..1061901b3 100644 --- a/examples/mqttport.h +++ b/examples/mqttport.h @@ -270,7 +270,7 @@ extern "C" { #ifndef GET_SOCK_ERROR #define GET_SOCK_ERROR(f,s,o,e) \ socklen_t len = sizeof(so_error); \ - getsockopt((f), (s), (o), &(e), &len) + (void)getsockopt((f), (s), (o), &(e), &len) #endif #ifndef SOCK_EQ_ERROR #define SOCK_EQ_ERROR(e) (((e) == EWOULDBLOCK) || ((e) == EAGAIN)) diff --git a/examples/mqttsimple/mqttsimple.c b/examples/mqttsimple/mqttsimple.c index 5b2089ba0..f508512d0 100644 --- a/examples/mqttsimple/mqttsimple.c +++ b/examples/mqttsimple/mqttsimple.c @@ -134,7 +134,7 @@ static int socket_get_error(int sockFd) { int so_error = 0; socklen_t len = sizeof(so_error); - getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len); + (void)getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len); return so_error; } @@ -221,7 +221,8 @@ static int mqtt_net_read(void *context, byte* buf, int buf_len, int timeout_ms) /* Setup timeout */ setup_timeout(&tv, timeout_ms); - setsockopt(*pSockFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)); + (void)setsockopt(*pSockFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, + sizeof(tv)); /* Loop until buf_len has been read, error or timeout */ while (bytes < buf_len) { @@ -256,7 +257,8 @@ static int mqtt_net_write(void *context, const byte* buf, int buf_len, /* Setup timeout */ setup_timeout(&tv, timeout_ms); - setsockopt(*pSockFd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv)); + (void)setsockopt(*pSockFd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, + sizeof(tv)); rc = (int)send(*pSockFd, buf, buf_len, 0); if (rc < 0) { diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 8a6ee52c9..7a9b02d42 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -82,27 +82,30 @@ static MQTTCtx gMqttCtx; static word16 mqtt_get_packetid_threadsafe(void) { - word16 packet_id; - wm_SemLock(&mtLock); - packet_id = mqtt_get_packetid(); - wm_SemUnlock(&mtLock); + word16 packet_id = 0; + if (wm_SemLock(&mtLock) == 0) { + packet_id = mqtt_get_packetid(); + wm_SemUnlock(&mtLock); + } return packet_id; } static void mqtt_stop_set(void) { - wm_SemLock(&mtLock); - PRINTF("MQTT Stopping"); - mStopRead = 1; - wm_SemUnlock(&mtLock); + if (wm_SemLock(&mtLock) == 0) { + PRINTF("MQTT Stopping"); + mStopRead = 1; + wm_SemUnlock(&mtLock); + } } static int mqtt_stop_get(void) { - int rc; - wm_SemLock(&mtLock); - rc = mStopRead; - wm_SemUnlock(&mtLock); + int rc = 0; + if (wm_SemLock(&mtLock) == 0) { + rc = mStopRead; + wm_SemUnlock(&mtLock); + } return rc; } @@ -164,47 +167,48 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, MQTTCtx* mqttCtx = (MQTTCtx*)client->ctx; (void)mqttCtx; - wm_SemLock(&mtLock); - if (msg_new) { - /* Determine min size to dump */ - len = msg->topic_name_len; + if (wm_SemLock(&mtLock) == 0) { + if (msg_new) { + /* Determine min size to dump */ + len = msg->topic_name_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->topic_name, len); + buf[len] = '\0'; /* Make sure its null terminated */ + + /* Print incoming message */ + PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", + buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, + msg->buffer_pos); + } + + /* Print message payload */ + len = msg->buffer_len; if (len > PRINT_BUFFER_SIZE) { len = PRINT_BUFFER_SIZE; } - XMEMCPY(buf, msg->topic_name, len); + XMEMCPY(buf, msg->buffer, len); buf[len] = '\0'; /* Make sure its null terminated */ + PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s", + msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); - /* Print incoming message */ - PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", - buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos); - } - - /* Print message payload */ - len = msg->buffer_len; - if (len > PRINT_BUFFER_SIZE) { - len = PRINT_BUFFER_SIZE; - } - XMEMCPY(buf, msg->buffer, len); - buf[len] = '\0'; /* Make sure its null terminated */ - PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s", - msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); - - if (msg_done) { - /* for test mode: count the number of messages received */ - if (mqttCtx->test_mode) { - if (msg->buffer_pos + msg->buffer_len == - (word32)sizeof(mTestMessage) && - XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, - msg->buffer_len) == 0) - { - mNumMsgsRecvd++; + if (msg_done) { + /* for test mode: count the number of messages received */ + if (mqttCtx->test_mode) { + if (msg->buffer_pos + msg->buffer_len == + (word32)sizeof(mTestMessage) && + XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, + msg->buffer_len) == 0) + { + mNumMsgsRecvd++; + } } - } - PRINTF("MQTT Message: Done"); + PRINTF("MQTT Message: Done"); + } + wm_SemUnlock(&mtLock); } - wm_SemUnlock(&mtLock); - /* Return negative to terminate publish processing */ return MQTT_CODE_SUCCESS; } @@ -266,7 +270,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx) wm_SemFree(&mtLock); client_exit(mqttCtx); } - wm_SemLock(&pingSignal); /* default to locked */ + if (wm_SemLock(&pingSignal) != 0) { /* default to locked */ + client_exit(mqttCtx); + } PRINTF("MQTT Client: QoS %d, Use TLS %d", mqttCtx->qos, mqttCtx->use_tls); @@ -469,19 +475,20 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx) { int isDone = 0; /* check if we are in test mode and done */ - wm_SemLock(&mtLock); - if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && - mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && - mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) - #ifdef WOLFMQTT_NONBLOCK - && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) - #endif - ) { + if (wm_SemLock(&mtLock) == 0) { + if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && + mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && + mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) + #ifdef WOLFMQTT_NONBLOCK + && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) + #endif + ) { + wm_SemUnlock(&mtLock); + mqtt_stop_set(); + isDone = 1; /* done */ + } wm_SemUnlock(&mtLock); - mqtt_stop_set(); - isDone = 1; /* done */ } - wm_SemUnlock(&mtLock); return isDone; } @@ -661,7 +668,9 @@ static void *ping_task(void *param) word32 startSec; do { - wm_SemLock(&pingSignal); + if (wm_SemLock(&pingSignal) != 0) { + break; + } if (mqtt_stop_get()) { break; } diff --git a/examples/sn-client/sn-client_qos-1.c b/examples/sn-client/sn-client_qos-1.c index ed57b09b2..cb89db29e 100644 --- a/examples/sn-client/sn-client_qos-1.c +++ b/examples/sn-client/sn-client_qos-1.c @@ -111,11 +111,6 @@ int sn_testQoSn1(MQTTCtx *mqttCtx) } } - /* Check for error */ - if (rc != MQTT_CODE_SUCCESS) { - goto disconn; - } - disconn: rc = MqttClient_NetDisconnect(&mqttCtx->client); diff --git a/examples/sn-client/sn-multithread.c b/examples/sn-client/sn-multithread.c index 7ca310201..84348d494 100644 --- a/examples/sn-client/sn-multithread.c +++ b/examples/sn-client/sn-multithread.c @@ -79,10 +79,11 @@ static MQTTCtx gMqttCtx; static word16 mqtt_get_packetid_threadsafe(void) { - word16 packet_id; - wm_SemLock(&packetIdLock); - packet_id = mqtt_get_packetid(); - wm_SemUnlock(&packetIdLock); + word16 packet_id = 0; + if (wm_SemLock(&packetIdLock) == 0) { + packet_id = mqtt_get_packetid(); + wm_SemUnlock(&packetIdLock); + } return packet_id; } @@ -243,7 +244,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx) wm_SemFree(&packetIdLock); client_exit(mqttCtx); } - wm_SemLock(&pingSignal); /* default to locked */ + if (wm_SemLock(&pingSignal) != 0) { /* default to locked */ + client_exit(mqttCtx); + } PRINTF("MQTT-SN Client: QoS %d, Use TLS %d", mqttCtx->qos, mqttCtx->use_tls); @@ -518,7 +521,9 @@ static void *ping_task(void *param) XMEMSET(&ping, 0, sizeof(ping)); do { - wm_SemLock(&pingSignal); + if (wm_SemLock(&pingSignal) != 0) + break; + if (mStopRead) break; diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 4f1aa7fdd..f20753fdf 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -2844,6 +2844,7 @@ int MqttClient_NetDisconnect(MqttClient *client) { #ifdef WOLFMQTT_MULTITHREAD MqttPendResp *tmpResp; + int rc; #endif if (client == NULL) { @@ -2852,24 +2853,28 @@ int MqttClient_NetDisconnect(MqttClient *client) #ifdef WOLFMQTT_MULTITHREAD /* Get client lock on to ensure no other threads are active */ - wm_SemLock(&client->lockClient); - -#ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("Net Disconnect: Removing pending responses"); -#endif - for (tmpResp = client->firstPendResp; - tmpResp != NULL; - tmpResp = tmpResp->next) { + rc = wm_SemLock(&client->lockClient); + if (rc == 0) { #ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", - tmpResp, tmpResp->packet_obj, - MqttPacket_TypeDesc(tmpResp->packet_type), - tmpResp->packet_type, tmpResp->packet_id, - tmpResp->packetProcessing, tmpResp->packetDone); + PRINTF("Net Disconnect: Removing pending responses"); #endif - MqttClient_RespList_Remove(client, tmpResp); + for (tmpResp = client->firstPendResp; + tmpResp != NULL; + tmpResp = tmpResp->next) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", + tmpResp, tmpResp->packet_obj, + MqttPacket_TypeDesc(tmpResp->packet_type), + tmpResp->packet_type, tmpResp->packet_id, + tmpResp->packetProcessing, tmpResp->packetDone); + #endif + MqttClient_RespList_Remove(client, tmpResp); + } + wm_SemUnlock(&client->lockClient); + } + else { + return rc; } - wm_SemUnlock(&client->lockClient); #endif return MqttSocket_Disconnect(client); @@ -3029,14 +3034,16 @@ word32 MqttClient_Flags(MqttClient *client, word32 mask, word32 flags) if (client != NULL) { #ifdef WOLFMQTT_MULTITHREAD /* Get client lock on to ensure no other threads are active */ - wm_SemLock(&client->lockClient); + if (wm_SemLock(&client->lockClient) == 0) #endif - client->flags &= ~mask; - client->flags |= flags; - ret = client->flags; + { + client->flags &= ~mask; + client->flags |= flags; + ret = client->flags; #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockClient); + wm_SemUnlock(&client->lockClient); #endif + } } return ret; }