Skip to content

Commit

Permalink
Med fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
embhorn committed Dec 19, 2023
1 parent 906e574 commit e753456
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 94 deletions.
3 changes: 2 additions & 1 deletion examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,8 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv));
(void)setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
sizeof(tv));
#endif

rc = (int)SOCK_SEND(sock->fd, buf, buf_len, 0);
Expand Down
2 changes: 1 addition & 1 deletion examples/mqttport.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ extern "C" {
#ifndef GET_SOCK_ERROR
#define GET_SOCK_ERROR(f,s,o,e) \
socklen_t len = sizeof(so_error); \
getsockopt((f), (s), (o), &(e), &len)
(void)getsockopt((f), (s), (o), &(e), &len)
#endif
#ifndef SOCK_EQ_ERROR
#define SOCK_EQ_ERROR(e) (((e) == EWOULDBLOCK) || ((e) == EAGAIN))
Expand Down
8 changes: 5 additions & 3 deletions examples/mqttsimple/mqttsimple.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ static int socket_get_error(int sockFd)
{
int so_error = 0;
socklen_t len = sizeof(so_error);
getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len);
(void)getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &so_error, &len);
return so_error;
}

Expand Down Expand Up @@ -221,7 +221,8 @@ static int mqtt_net_read(void *context, byte* buf, int buf_len, int timeout_ms)

/* Setup timeout */
setup_timeout(&tv, timeout_ms);
setsockopt(*pSockFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv));
(void)setsockopt(*pSockFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,
sizeof(tv));

/* Loop until buf_len has been read, error or timeout */
while (bytes < buf_len) {
Expand Down Expand Up @@ -256,7 +257,8 @@ static int mqtt_net_write(void *context, const byte* buf, int buf_len,

/* Setup timeout */
setup_timeout(&tv, timeout_ms);
setsockopt(*pSockFd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv));
(void)setsockopt(*pSockFd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
sizeof(tv));

rc = (int)send(*pSockFd, buf, buf_len, 0);
if (rc < 0) {
Expand Down
125 changes: 67 additions & 58 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,30 @@ static MQTTCtx gMqttCtx;

static word16 mqtt_get_packetid_threadsafe(void)
{
word16 packet_id;
wm_SemLock(&mtLock);
packet_id = mqtt_get_packetid();
wm_SemUnlock(&mtLock);
word16 packet_id = 0;
if (wm_SemLock(&mtLock) == 0) {
packet_id = mqtt_get_packetid();
wm_SemUnlock(&mtLock);
}
return packet_id;
}

static void mqtt_stop_set(void)
{
wm_SemLock(&mtLock);
PRINTF("MQTT Stopping");
mStopRead = 1;
wm_SemUnlock(&mtLock);
if (wm_SemLock(&mtLock) == 0) {
PRINTF("MQTT Stopping");
mStopRead = 1;
wm_SemUnlock(&mtLock);
}
}

static int mqtt_stop_get(void)
{
int rc;
wm_SemLock(&mtLock);
rc = mStopRead;
wm_SemUnlock(&mtLock);
int rc = 0;
if (wm_SemLock(&mtLock) == 0) {
rc = mStopRead;
wm_SemUnlock(&mtLock);
}
return rc;
}

Expand Down Expand Up @@ -164,47 +167,48 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
MQTTCtx* mqttCtx = (MQTTCtx*)client->ctx;
(void)mqttCtx;

wm_SemLock(&mtLock);
if (msg_new) {
/* Determine min size to dump */
len = msg->topic_name_len;
if (wm_SemLock(&mtLock) == 0) {
if (msg_new) {
/* Determine min size to dump */
len = msg->topic_name_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->topic_name, len);
buf[len] = '\0'; /* Make sure its null terminated */

/* Print incoming message */
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len,
msg->buffer_pos);
}

/* Print message payload */
len = msg->buffer_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->topic_name, len);
XMEMCPY(buf, msg->buffer, len);
buf[len] = '\0'; /* Make sure its null terminated */
PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s",
msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf);

/* Print incoming message */
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos);
}

/* Print message payload */
len = msg->buffer_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->buffer, len);
buf[len] = '\0'; /* Make sure its null terminated */
PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s",
msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf);

if (msg_done) {
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (msg->buffer_pos + msg->buffer_len ==
(word32)sizeof(mTestMessage) &&
XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer,
msg->buffer_len) == 0)
{
mNumMsgsRecvd++;
if (msg_done) {
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (msg->buffer_pos + msg->buffer_len ==
(word32)sizeof(mTestMessage) &&
XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer,
msg->buffer_len) == 0)
{
mNumMsgsRecvd++;
}
}
}

PRINTF("MQTT Message: Done");
PRINTF("MQTT Message: Done");
}
wm_SemUnlock(&mtLock);
}
wm_SemUnlock(&mtLock);

/* Return negative to terminate publish processing */
return MQTT_CODE_SUCCESS;
}
Expand Down Expand Up @@ -266,7 +270,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx)
wm_SemFree(&mtLock);
client_exit(mqttCtx);
}
wm_SemLock(&pingSignal); /* default to locked */
if (wm_SemLock(&pingSignal) != 0) { /* default to locked */
client_exit(mqttCtx);
}

PRINTF("MQTT Client: QoS %d, Use TLS %d", mqttCtx->qos,
mqttCtx->use_tls);
Expand Down Expand Up @@ -469,19 +475,20 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
{
int isDone = 0;
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
) {
if (wm_SemLock(&mtLock) == 0) {
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
) {
wm_SemUnlock(&mtLock);
mqtt_stop_set();
isDone = 1; /* done */
}
wm_SemUnlock(&mtLock);
mqtt_stop_set();
isDone = 1; /* done */
}
wm_SemUnlock(&mtLock);
return isDone;
}

Expand Down Expand Up @@ -661,7 +668,9 @@ static void *ping_task(void *param)
word32 startSec;

do {
wm_SemLock(&pingSignal);
if (wm_SemLock(&pingSignal) != 0) {
break;
}
if (mqtt_stop_get()) {
break;
}
Expand Down
5 changes: 0 additions & 5 deletions examples/sn-client/sn-client_qos-1.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ int sn_testQoSn1(MQTTCtx *mqttCtx)
}
}

/* Check for error */
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}

disconn:

rc = MqttClient_NetDisconnect(&mqttCtx->client);
Expand Down
17 changes: 11 additions & 6 deletions examples/sn-client/sn-multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ static MQTTCtx gMqttCtx;

static word16 mqtt_get_packetid_threadsafe(void)
{
word16 packet_id;
wm_SemLock(&packetIdLock);
packet_id = mqtt_get_packetid();
wm_SemUnlock(&packetIdLock);
word16 packet_id = 0;
if (wm_SemLock(&packetIdLock) == 0) {
packet_id = mqtt_get_packetid();
wm_SemUnlock(&packetIdLock);
}
return packet_id;
}

Expand Down Expand Up @@ -243,7 +244,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx)
wm_SemFree(&packetIdLock);
client_exit(mqttCtx);
}
wm_SemLock(&pingSignal); /* default to locked */
if (wm_SemLock(&pingSignal) != 0) { /* default to locked */
client_exit(mqttCtx);
}

PRINTF("MQTT-SN Client: QoS %d, Use TLS %d", mqttCtx->qos,
mqttCtx->use_tls);
Expand Down Expand Up @@ -518,7 +521,9 @@ static void *ping_task(void *param)
XMEMSET(&ping, 0, sizeof(ping));

do {
wm_SemLock(&pingSignal);
if (wm_SemLock(&pingSignal) != 0)
break;

if (mStopRead)
break;

Expand Down
47 changes: 27 additions & 20 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2844,6 +2844,7 @@ int MqttClient_NetDisconnect(MqttClient *client)
{
#ifdef WOLFMQTT_MULTITHREAD
MqttPendResp *tmpResp;
int rc;
#endif

if (client == NULL) {
Expand All @@ -2852,24 +2853,28 @@ int MqttClient_NetDisconnect(MqttClient *client)

#ifdef WOLFMQTT_MULTITHREAD
/* Get client lock on to ensure no other threads are active */
wm_SemLock(&client->lockClient);

#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Net Disconnect: Removing pending responses");
#endif
for (tmpResp = client->firstPendResp;
tmpResp != NULL;
tmpResp = tmpResp->next) {
rc = wm_SemLock(&client->lockClient);
if (rc == 0) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
tmpResp, tmpResp->packet_obj,
MqttPacket_TypeDesc(tmpResp->packet_type),
tmpResp->packet_type, tmpResp->packet_id,
tmpResp->packetProcessing, tmpResp->packetDone);
PRINTF("Net Disconnect: Removing pending responses");
#endif
MqttClient_RespList_Remove(client, tmpResp);
for (tmpResp = client->firstPendResp;
tmpResp != NULL;
tmpResp = tmpResp->next) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
tmpResp, tmpResp->packet_obj,
MqttPacket_TypeDesc(tmpResp->packet_type),
tmpResp->packet_type, tmpResp->packet_id,
tmpResp->packetProcessing, tmpResp->packetDone);
#endif
MqttClient_RespList_Remove(client, tmpResp);
}
wm_SemUnlock(&client->lockClient);
}
else {
return rc;
}
wm_SemUnlock(&client->lockClient);
#endif

return MqttSocket_Disconnect(client);
Expand Down Expand Up @@ -3029,14 +3034,16 @@ word32 MqttClient_Flags(MqttClient *client, word32 mask, word32 flags)
if (client != NULL) {
#ifdef WOLFMQTT_MULTITHREAD
/* Get client lock on to ensure no other threads are active */
wm_SemLock(&client->lockClient);
if (wm_SemLock(&client->lockClient) == 0)
#endif
client->flags &= ~mask;
client->flags |= flags;
ret = client->flags;
{
client->flags &= ~mask;
client->flags |= flags;
ret = client->flags;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
wm_SemUnlock(&client->lockClient);
#endif
}
}
return ret;
}

0 comments on commit e753456

Please sign in to comment.