Skip to content

Commit

Permalink
Use counting semaphore condvar implementation in FreeRTOS-Plus-TCP port
Browse files Browse the repository at this point in the history
Co-Authored-By: Alexander Bushnev <[email protected]>
  • Loading branch information
bjsowa and sashacmc committed Dec 7, 2024
1 parent fa8b1ee commit a5e66e8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 80 deletions.
10 changes: 9 additions & 1 deletion include/zenoh-pico/system/platform/freertos_plus_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ typedef struct {
} _z_task_t;

typedef SemaphoreHandle_t _z_mutex_t;
typedef void *_z_condvar_t;
typedef struct {
SemaphoreHandle_t mutex;
SemaphoreHandle_t sem;
int waiters;
#if (configSUPPORT_STATIC_ALLOCATION == 1)
StaticSemaphore_t mutex_buffer;
StaticSemaphore_t sem_buffer;
#endif /* SUPPORT_STATIC_ALLOCATION */
} _z_condvar_t;
#endif // Z_MULTI_THREAD == 1

typedef TickType_t z_clock_t;
Expand Down
128 changes: 49 additions & 79 deletions src/system/freertos_plus_tcp/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,111 +157,81 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { return xSemaphoreTakeRecursive(*m,
z_result_t _z_mutex_unlock(_z_mutex_t *m) { return xSemaphoreGiveRecursive(*m) == pdTRUE ? 0 : -1; }

/*------------------ CondVar ------------------*/
typedef struct waiter_t {
SemaphoreHandle_t sem;
struct waiter_t *prev;
struct waiter_t *next;
bool in_list;
} waiter_t;

typedef struct condvar_t {
waiter_t *wait_list;
} condvar_t;

static void add_wait_list(waiter_t **wait_list, waiter_t *waiter) {
if (*wait_list == NULL) {
*wait_list = waiter;
waiter->next = waiter;
waiter->prev = waiter;
} else {
waiter_t *first = *wait_list;
waiter_t *last = (*wait_list)->prev;

waiter->next = first;
waiter->prev = last;

first->prev = waiter;
last->next = waiter;
}
waiter->in_list = true;
}

static void remove_wait_list(waiter_t **wait_list, waiter_t *waiter) {
waiter_t *prev = waiter->prev;
waiter_t *next = waiter->next;

prev->next = waiter->next;
next->prev = waiter->prev;
*wait_list = waiter->next;

if (*wait_list == waiter) {
*wait_list = NULL;
z_result_t _z_condvar_init(_z_condvar_t *cv) {
if (!cv) {
return _Z_ERR_GENERIC;
}

waiter->next = NULL;
waiter->prev = NULL;
waiter->in_list = false;
}
#if (configSUPPORT_STATIC_ALLOCATION == 1)
cv->mutex = xSemaphoreCreateRecursiveMutexStatic(&cv->mutex_buffer);
cv->sem = xSemaphoreCreateCountingStatic((UBaseType_t)(~0), 0, &cv->sem_buffer);
#else
cv->mutex = xSemaphoreCreateMutex();
cv->sem = xSemaphoreCreateCounting((UBaseType_t)(~0), 0);
#endif /* SUPPORT_STATIC_ALLOCATION */
cv->waiters = 0;

z_result_t _z_condvar_init(_z_condvar_t *cv) {
*cv = (condvar_t *)z_malloc(sizeof(condvar_t));
if (*cv == NULL) {
return -1;
if (!cv->mutex || !cv->sem) {
return _Z_ERR_GENERIC;
}
((condvar_t *)*cv)->wait_list = NULL;
return 0;
return _Z_RES_OK;
}

z_result_t _z_condvar_drop(_z_condvar_t *cv) {
z_free(*cv);
return 0;
if (!cv) {
return _Z_ERR_GENERIC;
}
vSemaphoreDelete(cv->sem);
vSemaphoreDelete(cv->mutex);
return _Z_RES_OK;
}

z_result_t _z_condvar_signal(_z_condvar_t *cv) {
condvar_t *cond_var = (condvar_t *)*cv;
if (!cv) {
return _Z_ERR_GENERIC;
}

if (cond_var->wait_list != NULL) {
xSemaphoreGive(cond_var->wait_list->sem);
remove_wait_list(&cond_var->wait_list, cond_var->wait_list);
xSemaphoreTake(cv->mutex, portMAX_DELAY);
if (cv->waiters > 0) {
xSemaphoreGive(cv->sem);
cv->waiters--;
}
xSemaphoreGive(cv->mutex);

return 0;
return _Z_RES_OK;
}

z_result_t _z_condvar_signal_all(_z_condvar_t *cv) {
condvar_t *cond_var = (condvar_t *)*cv;
if (!cv) {
return _Z_ERR_GENERIC;
}

while (cond_var->wait_list != NULL) {
xSemaphoreGive(cond_var->wait_list->sem);
remove_wait_list(&cond_var->wait_list, cond_var->wait_list);
xSemaphoreTake(cv->mutex, portMAX_DELAY);
while (cv->waiters > 0) {
xSemaphoreGive(cv->sem);
cv->waiters--;
}
xSemaphoreGive(cv->mutex);

return 0;
return _Z_RES_OK;
}

z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) {
condvar_t *cond_var = (condvar_t *)*cv;
SemaphoreHandle_t mutex = *m;
if (!cv || !m) {
return _Z_ERR_GENERIC;
}

waiter_t current_thread;
#if (configSUPPORT_STATIC_ALLOCATION == 1)
StaticSemaphore_t current_thread_sem_buffer;
current_thread.sem = xSemaphoreCreateCountingStatic(1, 0, &current_thread_sem_buffer);
#else
current_thread.sem = xSemaphoreCreateCounting(1, 0);
#endif
add_wait_list(&cond_var->wait_list, &current_thread);
xSemaphoreTake(cv->mutex, portMAX_DELAY);
cv->waiters++;
xSemaphoreGive(cv->mutex);

xSemaphoreGiveRecursive(mutex);
xSemaphoreTake(current_thread.sem, portMAX_DELAY);
xSemaphoreTakeRecursive(mutex, portMAX_DELAY);
_z_mutex_unlock(m);

if (current_thread.in_list) {
remove_wait_list(&cond_var->wait_list, &current_thread);
}
xSemaphoreTake(cv->sem, portMAX_DELAY);

vSemaphoreDelete(current_thread.sem);
return 0;
_z_mutex_lock(m);

return _Z_RES_OK;
}
#endif // Z_MULTI_THREAD == 1

Expand Down

0 comments on commit a5e66e8

Please sign in to comment.