Skip to content

Commit

Permalink
Improve logic for detection of active read or write. Add test case fo…
Browse files Browse the repository at this point in the history
…r trying to send multiple publishes in the same thread.
  • Loading branch information
dgarske committed Nov 27, 2023
1 parent 6fdcee6 commit 0336347
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 59 deletions.
72 changes: 44 additions & 28 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
/* Configuration */

/* Number of publish tasks. Each will send a unique message to the broker. */
#define NUM_PUB_TASKS 10
#define NUM_PUB_TASKS 5
#define NUM_PUB_PER_TASK 2

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
Expand Down Expand Up @@ -470,7 +471,8 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
Expand Down Expand Up @@ -598,37 +600,51 @@ static DWORD WINAPI publish_task( LPVOID param )
static void *publish_task(void *param)
#endif
{
int rc;
int rc[NUM_PUB_PER_TASK], i;
MQTTCtx *mqttCtx = (MQTTCtx*)param;
MqttPublish publish;
word32 startSec = 0;

/* Publish Topic */
XMEMSET(&publish, 0, sizeof(MqttPublish));
publish.retain = 0;
publish.qos = mqttCtx->qos;
publish.duplicate = 0;
publish.topic_name = mqttCtx->topic_name;
publish.packet_id = mqtt_get_packetid_threadsafe();
publish.buffer = (byte*)mTestMessage;
publish.total_len = sizeof(mTestMessage);
MqttPublish publish[NUM_PUB_PER_TASK];
word32 startSec[NUM_PUB_PER_TASK];

/* Build publish */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
/* Publish Topic */
XMEMSET(&publish[i], 0, sizeof(MqttPublish));
publish[i].retain = 0;
publish[i].qos = mqttCtx->qos;
publish[i].duplicate = 0;
publish[i].topic_name = mqttCtx->topic_name;
publish[i].packet_id = mqtt_get_packetid_threadsafe();
publish[i].buffer = (byte*)mTestMessage;
publish[i].total_len = sizeof(mTestMessage);

rc[i] = MQTT_CODE_CONTINUE;
startSec[i] = 0;
}

do {
rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL);
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH,
mqttCtx->cmd_timeout_ms);
} while (rc == MQTT_CODE_CONTINUE);
if (rc != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish);
/* Send until != continue */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
while (rc[i] == MQTT_CODE_CONTINUE) {
rc[i] = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish[i],
NULL);
rc[i] = check_response(mqttCtx, rc[i], &startSec[i],
MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms);
}
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
publish.topic_name,
MqttClient_ReturnCodeToString(rc), rc);
/* Report result */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
if (rc[i] != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish[i]);
}

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
publish[i].topic_name,
MqttClient_ReturnCodeToString(rc[i]), rc[i]);

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
}

THREAD_EXIT(0);
}
Expand Down
106 changes: 75 additions & 31 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg);
#elif defined(__MACH__)

/* Apple style dispatch semaphore */
int wm_SemInit(wm_Sem *s){
int wm_SemInit(wm_Sem *s) {
/* dispatch_release() fails hard, with Trace/BPT trap signal, if the
* sem's internal count is less than the value passed in with
* dispatch_semaphore_create(). work around this by initializing
Expand All @@ -92,7 +92,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg);

return 0;
}
int wm_SemFree(wm_Sem *s){
int wm_SemFree(wm_Sem *s) {
if ((s == NULL) ||
(s->sem == NULL))
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
Expand Down Expand Up @@ -194,38 +194,52 @@ static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat)
{
int rc = MQTT_CODE_SUCCESS;

#ifdef WOLFMQTT_MULTITHREAD
#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK)
#ifdef WOLFMQTT_DEBUG_CLIENT
#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK)
#ifdef WOLFMQTT_DEBUG_CLIENT
if (stat->isWriteActive) {
MQTT_TRACE_MSG("Warning, send already locked!");
rc = MQTT_CODE_ERROR_SYSTEM;
}
#endif /* WOLFMQTT_DEBUG_CLIENT */
#ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK
#endif
#ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK
/* detect if a write is already in progress */
if (wm_SemLock(&client->lockClient) == 0) {
if (client->write.total > 0) {
#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
if (client->write.isActive) {
MQTT_TRACE_MSG("Partial write in progress!");
rc = MQTT_CODE_CONTINUE; /* can't write yet */
}
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}
#endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */
if (rc != 0) {
#endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */
if (rc != MQTT_CODE_SUCCESS) {
return rc;
}
#endif
#endif

#ifdef WOLFMQTT_MULTITHREAD
rc = wm_SemLock(&client->lockSend);
#endif /* WOLFMQTT_MULTITHREAD */
if (rc == 0) {
#endif
if (rc == MQTT_CODE_SUCCESS) {
stat->isWriteActive = 1;

#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
client->write.isActive = 1;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}

MQTT_TRACE_MSG("lockSend");
}

(void)client;

return rc;
}
static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat)
Expand All @@ -237,8 +251,16 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat)
}
#endif

/* reset write */
XMEMSET(&client->write, 0, sizeof(client->write));
#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
/* reset write */
XMEMSET(&client->write, 0, sizeof(client->write));
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}

if (stat->isWriteActive) {
MQTT_TRACE_MSG("unlockSend");
Expand All @@ -253,31 +275,47 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
{
int rc = MQTT_CODE_SUCCESS;

#ifdef WOLFMQTT_MULTITHREAD
#ifdef WOLFMQTT_DEBUG_CLIENT
#ifdef WOLFMQTT_DEBUG_CLIENT
if (stat->isReadActive) {
MQTT_TRACE_MSG("Warning, recv already locked!");
rc = MQTT_CODE_ERROR_SYSTEM;
}
/* detect if a read is already in progress */
if (wm_SemLock(&client->lockClient) == 0) {
if (client->read.total > 0) {
#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
if (client->read.isActive) {
MQTT_TRACE_MSG("Partial read in progress!");
rc = MQTT_CODE_CONTINUE; /* can't read yet */
}
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}
if (rc != 0)
return rc;
#endif /* WOLFMQTT_DEBUG_CLIENT */
#endif /* WOLFMQTT_DEBUG_CLIENT */

#ifdef WOLFMQTT_MULTITHREAD
rc = wm_SemLock(&client->lockRecv);
#endif /* WOLFMQTT_MULTITHREAD */
if (rc == 0) {
#endif
if (rc == MQTT_CODE_SUCCESS) {
stat->isReadActive = 1;

#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
client->read.isActive = 1;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}

MQTT_TRACE_MSG("lockRecv");
}
(void)client;

return rc;
}
static void MqttReadStop(MqttClient* client, MqttMsgStat* stat)
Expand All @@ -289,12 +327,20 @@ static void MqttReadStop(MqttClient* client, MqttMsgStat* stat)
}
#endif

/* reset read */
XMEMSET(&client->read, 0, sizeof(client->read));
#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
/* reset read */
XMEMSET(&client->read, 0, sizeof(client->read));
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}

if (stat->isReadActive) {
stat->isReadActive = 0;
MQTT_TRACE_MSG("unlockRecv");
stat->isReadActive = 0;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockRecv);
#endif
Expand Down Expand Up @@ -2738,14 +2784,12 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg)
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Cancel Read Lock");
#endif
mms_stat->isReadActive = 0;
MqttReadStop(client, mms_stat);
}
if (mms_stat->isWriteActive) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Cancel Write Lock");
#endif
mms_stat->isWriteActive = 0;
MqttWriteStop(client, mms_stat);
}

Expand Down
3 changes: 3 additions & 0 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ typedef struct _MqttSk {
int pos; /* position inside current buffer */
int len; /* length of current segment being sent */
int total; /* number bytes sent or received */

/* status bit for if client read or write is active */
byte isActive:1;
} MqttSk;

#ifdef WOLFMQTT_DISCONNECT_CB
Expand Down

0 comments on commit 0336347

Please sign in to comment.