From a3b42da914a5f8243a30e12eedf0d97d55b3d333 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Tue, 24 Sep 2024 20:41:19 +0800 Subject: [PATCH 01/10] sync remote restund/tree/turn_thread. --- modules/turn/alloc.c | 24 ++++++++++++- modules/turn/chan.c | 14 ++++++-- modules/turn/perm.c | 10 ++++-- modules/turn/turn.c | 86 ++++++++++++++++++++++++++++++++++++++++++-- modules/turn/turn.h | 7 +++- 5 files changed, 132 insertions(+), 9 deletions(-) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index 03bd813..daa59fc 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -52,6 +52,10 @@ static void destructor(void *arg) { struct allocation *al = arg; + mtx_lock(&turndp()->mutex); + list_unlink(&al->le_map); + mtx_unlock(&turndp()->mutex); + hash_flush(al->perms); mem_deref(al->perms); mem_deref(al->chans); @@ -60,8 +64,11 @@ static void destructor(void *arg) tmr_cancel(&al->tmr); mem_deref(al->username); mem_deref(al->cli_sock); + + /* @TODO check fd deref cleanup on turn worker thread */ mem_deref(al->rel_us); mem_deref(al->rsv_us); + turndp()->allocc_cur--; } @@ -90,13 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg) } } + mtx_lock(&al->mutex); perm = perm_find(al->perms, src); + mtx_unlock(&al->mutex); if (!perm) { ++al->dropc_rx; return; } + mtx_lock(&al->mutex); chan = chan_peer_find(al->chans, src); + mtx_unlock(&al->mutex); if (chan) { uint16_t len = mbuf_get_left(mb); size_t start; @@ -185,6 +196,14 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, break; } + /* Release fd for new thread and re_map*/ + udp_thread_detach(al->rel_us); + udp_thread_detach(al->rsv_us); + + mtx_lock(&turndp()->mutex); + list_append(&turndp()->re_map, &al->le_map, al); + mtx_unlock(&turndp()->mutex); + return (i == PORT_TRY_MAX) ? EADDRINUSE : err; } @@ -247,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, goto reply; } - restund_debug("turn: allocation already exists (%J)\n", src); + restund_warning("turn: allocation already exists (%J)\n", src); ++turnd->reply.scode_437; rerr = stun_ereply(proto, sock, src, 0, msg, 437, "Allocation TID Mismatch", @@ -351,6 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, al->cli_addr = *src; al->srv_addr = *dst; al->proto = proto; + mtx_init(&al->mutex, mtx_plain); sa_init(&al->rsv_addr, AF_UNSPEC); turndp()->allocc_tot++; turndp()->allocc_cur++; @@ -466,7 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al, lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0; lifetime = MIN(lifetime, turnd->lifetime_max); + mtx_lock(&al->mutex); tmr_start(&al->tmr, lifetime * 1000, timeout, al); + mtx_unlock(&al->mutex); restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime); diff --git a/modules/turn/chan.c b/modules/turn/chan.c index b8850a2..20c3b00 100644 --- a/modules/turn/chan.c +++ b/modules/turn/chan.c @@ -27,7 +27,7 @@ struct chan { struct le he_numb; struct le he_peer; struct sa peer; - const struct allocation *al; + struct allocation *al; time_t expires; uint16_t numb; }; @@ -50,8 +50,10 @@ static void destructor(void *arg) restund_debug("turn: allocation %p channel 0x%x %J destroyed\n", chan->al, chan->numb, &chan->peer); + mtx_lock(&chan->al->mutex); hash_unlink(&chan->he_numb); hash_unlink(&chan->he_peer); + mtx_unlock(&chan->al->mutex); } @@ -185,17 +187,19 @@ void chan_status(const struct chanlist *cl, struct mbuf *mb) static struct chan *chan_create(struct chanlist *cl, uint16_t numb, const struct sa *peer, - const struct allocation *al) + struct allocation *al) { struct chan *chan; - if (!cl || !peer) + if (!cl || !peer || !al) return NULL; chan = mem_zalloc(sizeof(*chan), destructor); if (!chan) return NULL; + mtx_lock(&al->mutex); + hash_append(cl->ht_numb, numb, &chan->he_numb, chan); hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan); @@ -204,6 +208,8 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb, chan->al = al; chan->expires = time(NULL) + CHAN_LIFETIME; + mtx_unlock(&al->mutex); + restund_debug("turn: allocation %p channel 0x%x %J created\n", chan->al, chan->numb, &chan->peer); @@ -216,7 +222,9 @@ static void chan_refresh(struct chan *chan) if (!chan) return; + mtx_lock(&chan->al->mutex); chan->expires = time(NULL) + CHAN_LIFETIME; + mtx_unlock(&chan->al->mutex); restund_debug("turn: allocation %p channel 0x%x %J refreshed\n", chan->al, chan->numb, &chan->peer); diff --git a/modules/turn/perm.c b/modules/turn/perm.c index fb2e72f..2ebf55e 100644 --- a/modules/turn/perm.c +++ b/modules/turn/perm.c @@ -19,7 +19,7 @@ struct perm { struct le he; struct sa peer; struct restund_trafstat ts; - const struct allocation *al; + struct allocation *al; time_t expires; time_t start; bool new; @@ -38,7 +38,9 @@ static void destructor(void *arg) struct perm *perm = arg; int err; + mtx_lock(&perm->al->mutex); hash_unlink(&perm->he); + mtx_unlock(&perm->al->mutex); restund_debug("turn: allocation %p permission %j destroyed " "(%llu/%llu %llu/%llu)\n", @@ -90,7 +92,7 @@ struct perm *perm_find(const struct hash *ht, const struct sa *peer) struct perm *perm_create(struct hash *ht, const struct sa *peer, - const struct allocation *al) + struct allocation *al) { const time_t now = time(NULL); struct perm *perm; @@ -102,6 +104,8 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer, if (!perm) return NULL; + mtx_lock(&al->mutex); + hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm); perm->peer = *peer; @@ -109,6 +113,8 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer, perm->expires = now + PERM_LIFETIME; perm->start = now; + mtx_unlock(&al->mutex); + restund_debug("turn: allocation %p permission %j created\n", al, peer); return perm; diff --git a/modules/turn/turn.c b/modules/turn/turn.c index ae7fb46..0e0d9b1 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -11,7 +11,8 @@ enum { - ALLOC_DEFAULT_BSIZE = 512, + ALLOC_DEFAULT_BSIZE = 1024, + TURN_THREADS = 4 }; @@ -23,7 +24,8 @@ struct tuple { static struct turnd turnd; - +static struct tmr timers[TURN_THREADS]; +static thrd_t tid[TURN_THREADS]; struct turnd *turndp(void) { @@ -335,6 +337,60 @@ static struct restund_cmdsub cmd_turnreply = { }; +static void tmr_handler(void *arg) +{ + struct tmr *tmr = arg; + struct le *le; + + mtx_lock(&turndp()->mutex); + if (!turndp()->run) + re_cancel(); + + /* Reassign one allocation by time */ + LIST_FOREACH(&turndp()->re_map, le) + { + struct allocation *al = le->data; + mtx_lock(&al->mutex); + udp_thread_attach(al->rel_us); + udp_thread_attach(al->rsv_us); + mtx_unlock(&al->mutex); + } + list_clear(&turndp()->re_map); + + mtx_unlock(&turndp()->mutex); + + tmr_start(tmr, 10, tmr_handler, tmr); +} + + +static int thread_handler(void *arg) +{ + struct tmr *tmr = arg; + int err; + + err = re_thread_init(); + if (err) { + restund_error("turn: re_thread_init failed %m\n", err); + return 0; + } + + fd_setsize(-1); + + tmr_start(tmr, 10, tmr_handler, tmr); + + err = re_main(NULL); + if (err) + restund_error("turn: re_main failed %m\n", err); + + tmr_cancel(tmr); + + tmr_debug(); + re_thread_close(); + + return 0; +} + + static int module_init(void) { uint32_t x, bsize = ALLOC_DEFAULT_BSIZE; @@ -406,6 +462,24 @@ static int module_init(void) goto out; } + list_init(&turnd.re_map); + + turnd.run = true; + err = mtx_init(&turnd.mutex, mtx_plain); + if (err) { + restund_error("turn: mtx_init err: %d\n", err); + goto out; + } + + for (int i = 0; i < TURN_THREADS; i++) { + err = thrd_create(&tid[i], thread_handler, + &timers[i]); + if (err) { + restund_error("turn: thrd_create err: %m\n", err); + goto out; + } + } + restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n", turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6, bsize); @@ -417,6 +491,14 @@ static int module_init(void) static int module_close(void) { + mtx_lock(&turnd.mutex); + turnd.run = false; + mtx_unlock(&turnd.mutex); + + for (int i = 0; i < TURN_THREADS; i++) { + thrd_join(tid[i], NULL); + } + hash_flush(turnd.ht_alloc); turnd.ht_alloc = mem_deref(turnd.ht_alloc); restund_cmd_unsubscribe(&cmd_turnreply); diff --git a/modules/turn/turn.h b/modules/turn/turn.h index a7f882d..2fc9fc5 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -17,6 +17,9 @@ struct turnd { uint32_t allocc_cur; uint32_t lifetime_max; uint32_t udp_sockbuf_size; + mtx_t mutex; + bool run; + struct list re_map; struct { uint64_t scode_400; @@ -35,6 +38,8 @@ struct chanlist; struct allocation { struct le he; + struct le le_map; + mtx_t mutex; struct tmr tmr; uint8_t tid[STUN_TID_SIZE]; struct sa cli_addr; @@ -73,7 +78,7 @@ struct perm; struct perm *perm_find(const struct hash *ht, const struct sa *addr); struct perm *perm_create(struct hash *ht, const struct sa *peer, - const struct allocation *al); + struct allocation *al); void perm_refresh(struct perm *perm); void perm_tx_stat(struct perm *perm, size_t bytc); void perm_rx_stat(struct perm *perm, size_t bytc); From ac7faf94968fed17557a85d3e12cd80b28259bec Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sat, 19 Oct 2024 00:33:56 +0800 Subject: [PATCH 02/10] turn: check fd deref cleanup on turn worker thread --- modules/turn/alloc.c | 49 +++++++++++++++++++++++++++----------------- modules/turn/turn.c | 29 +++++++++++++++++++++----- modules/turn/turn.h | 12 ++++++++--- 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index daa59fc..75a552a 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -65,9 +65,9 @@ static void destructor(void *arg) mem_deref(al->username); mem_deref(al->cli_sock); - /* @TODO check fd deref cleanup on turn worker thread */ - mem_deref(al->rel_us); - mem_deref(al->rsv_us); + mtx_lock(&turndp()->mutex); + list_append(&turndp()->rm_map, &al->uks->le, al->uks); + mtx_unlock(&turndp()->mutex); turndp()->allocc_cur--; } @@ -162,13 +162,13 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, for (i=0; irel_us, rel_addr, udp_recv, al); + err = udp_listen(&al->uks->rel_us, rel_addr, udp_recv, al); if (err) break; - err = udp_local_get(al->rel_us, &al->rel_addr); + err = udp_local_get(al->uks->rel_us, &al->rel_addr); if (err) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); break; } @@ -178,7 +178,7 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, restund_debug("turn: try#%u: %J\n", i, &al->rel_addr); if (sa_port(&al->rel_addr) & 0x1) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); continue; } @@ -188,20 +188,20 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, al->rsv_addr = al->rel_addr; sa_set_port(&al->rsv_addr, sa_port(&al->rel_addr) + 1); - err = udp_listen(&al->rsv_us, &al->rsv_addr, NULL, NULL); + err = udp_listen(&al->uks->rsv_us, &al->rsv_addr, NULL, NULL); if (err) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); continue; } break; } /* Release fd for new thread and re_map*/ - udp_thread_detach(al->rel_us); - udp_thread_detach(al->rsv_us); + udp_thread_detach(al->uks->rel_us); + udp_thread_detach(al->uks->rsv_us); mtx_lock(&turndp()->mutex); - list_append(&turndp()->re_map, &al->le_map, al); + list_append(&turndp()->re_map, &al->uks->le, al); mtx_unlock(&turndp()->mutex); return (i == PORT_TRY_MAX) ? EADDRINUSE : err; @@ -233,9 +233,9 @@ static int rsvt_listen(const struct hash *ht, struct allocation *al, if (!alr) return ENOENT; - al->rel_us = alr->rsv_us; - udp_handler_set(al->rel_us, udp_recv, al); - alr->rsv_us = NULL; + al->uks->rel_us = alr->uks->rsv_us; + udp_handler_set(al->uks->rel_us, udp_recv, al); + alr->uks->rsv_us = NULL; al->rel_addr = alr->rsv_addr; sa_init(&alr->rsv_addr, AF_UNSPEC); @@ -360,6 +360,17 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, STUN_ATTR_SOFTWARE, restund_software); goto out; } + + al->uks = mem_zalloc(sizeof(struct udp_socks), NULL); + if (!al->uks) { + restund_warning("turn: no memory for allocation udp socks\n"); + ++turnd->reply.scode_500; + rerr = stun_ereply(proto, sock, src, 0, msg, + 500, "Server Error", + ctx->key, ctx->keylen, ctx->fp, 1, + STUN_ATTR_SOFTWARE, restund_software); + goto out; + } hash_append(turnd->ht_alloc, sa_hash(src, SA_ALL), &al->he, al); tmr_start(&al->tmr, lifetime * 1000, timeout, al); @@ -416,9 +427,9 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, goto out; } - udp_rxbuf_presz_set(al->rel_us, 4); + udp_rxbuf_presz_set(al->uks->rel_us, 4); if (turndp()->udp_sockbuf_size > 0) - (void)udp_sockbuf_set(al->rel_us, turndp()->udp_sockbuf_size); + (void)udp_sockbuf_set(al->uks->rel_us, turndp()->udp_sockbuf_size); restund_debug("turn: allocation %p created %s/%J/%J - %J (%us)\n", al, stun_transp_name(al->proto), &al->cli_addr, @@ -427,7 +438,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, alx = al; reply: - if (alx->rsv_us) { + if (alx->uks->rsv_us) { rsv = (uint64_t)sa_hash(src, SA_ALL) << 32; rsv |= (uint64_t)sa_stunaf(&alx->rsv_addr) << 24; rsv += sa_port(&alx->rsv_addr); @@ -449,7 +460,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, STUN_ATTR_XOR_RELAY_ADDR, public ? &public_addr : &alx->rel_addr, STUN_ATTR_LIFETIME, &lifetime, - STUN_ATTR_RSV_TOKEN, alx->rsv_us ? &rsv : NULL, + STUN_ATTR_RSV_TOKEN, alx->uks->rsv_us ? &rsv : NULL, STUN_ATTR_XOR_MAPPED_ADDR, src, STUN_ATTR_SOFTWARE, restund_software); out: diff --git a/modules/turn/turn.c b/modules/turn/turn.c index 0e0d9b1..8feb48a 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -190,7 +190,7 @@ static bool indication_handler(struct restund_msgctx *ctx, int proto, if (restund_addr_is_blocked(psa)) err = EPERM; else - err = udp_send(al->rel_us, psa, &data->v.data); + err = udp_send(al->uks->rel_us, psa, &data->v.data); if (err) turnd.errc_tx++; else { @@ -243,7 +243,7 @@ static bool raw_handler(int proto, const struct sa *src, if (restund_addr_is_blocked(psa)) err = EPERM; else - err = udp_send(al->rel_us, psa, mb); + err = udp_send(al->uks->rel_us, psa, mb); if (err) turnd.errc_tx++; else { @@ -341,22 +341,41 @@ static void tmr_handler(void *arg) { struct tmr *tmr = arg; struct le *le; + int thrd_id; mtx_lock(&turndp()->mutex); if (!turndp()->run) re_cancel(); + thrd_id = GetThreadId(thrd_current()); + /* Reassign one allocation by time */ LIST_FOREACH(&turndp()->re_map, le) { - struct allocation *al = le->data; + struct allocation *al = list_ledata(le); mtx_lock(&al->mutex); - udp_thread_attach(al->rel_us); - udp_thread_attach(al->rsv_us); + udp_thread_attach(al->uks->rel_us); + udp_thread_attach(al->uks->rsv_us); + al->uks->thrd_id = thrd_id; + mtx_unlock(&al->mutex); } list_clear(&turndp()->re_map); + le = list_head(&turndp()->rm_map); + while (le) + { + struct udp_socks *uks = list_ledata(le); + le = le->next; + if (thrd_id == uks->thrd_id) { + udp_thread_detach(uks->rel_us); + udp_thread_detach(uks->rsv_us); + + list_unlink(&uks->le); + mem_deref(uks); + } + } + mtx_unlock(&turndp()->mutex); tmr_start(tmr, 10, tmr_handler, tmr); diff --git a/modules/turn/turn.h b/modules/turn/turn.h index 2fc9fc5..d0a1a04 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -20,6 +20,7 @@ struct turnd { mtx_t mutex; bool run; struct list re_map; + struct list rm_map; struct { uint64_t scode_400; @@ -36,9 +37,15 @@ struct turnd { struct chanlist; +struct udp_socks{ + struct le le; + int thrd_id; + struct udp_sock *rel_us; + struct udp_sock *rsv_us; +}; + struct allocation { struct le he; - struct le le_map; mtx_t mutex; struct tmr tmr; uint8_t tid[STUN_TID_SIZE]; @@ -47,8 +54,7 @@ struct allocation { struct sa rel_addr; struct sa rsv_addr; void *cli_sock; - struct udp_sock *rel_us; - struct udp_sock *rsv_us; + struct udp_socks *uks; char *username; struct hash *perms; struct chanlist *chans; From 119fd242cdd5aa3d08584cb9817325fce6fcc4e4 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sat, 19 Oct 2024 08:47:23 +0800 Subject: [PATCH 03/10] resolve memory leak --- modules/turn/alloc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index 75a552a..dd8e312 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -53,7 +53,7 @@ static void destructor(void *arg) struct allocation *al = arg; mtx_lock(&turndp()->mutex); - list_unlink(&al->le_map); + list_unlink(&al->uks->le); mtx_unlock(&turndp()->mutex); hash_flush(al->perms); From 8487ac037db3cea69dde1f47d63832092bf4b5c8 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sat, 19 Oct 2024 08:50:57 +0800 Subject: [PATCH 04/10] resolve memory leak --- modules/turn/turn.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/turn/turn.c b/modules/turn/turn.c index 8feb48a..bb04080 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -370,6 +370,8 @@ static void tmr_handler(void *arg) if (thrd_id == uks->thrd_id) { udp_thread_detach(uks->rel_us); udp_thread_detach(uks->rsv_us); + mem_deref(uks->rel_us); + mem_deref(uks->rsv_us); list_unlink(&uks->le); mem_deref(uks); From 227419a9e779a1fc0f0d53bf30c939d90d6c34ce Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sat, 19 Oct 2024 09:15:53 +0800 Subject: [PATCH 05/10] no need to handle before destructor --- modules/turn/alloc.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index dd8e312..f4eb50c 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -52,10 +52,6 @@ static void destructor(void *arg) { struct allocation *al = arg; - mtx_lock(&turndp()->mutex); - list_unlink(&al->uks->le); - mtx_unlock(&turndp()->mutex); - hash_flush(al->perms); mem_deref(al->perms); mem_deref(al->chans); From 03d2ae8dc08ae7d2051344c6dbf13b2420388072 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sat, 19 Oct 2024 11:14:34 +0800 Subject: [PATCH 06/10] turn: change the method used to threads equality --- modules/turn/turn.c | 14 ++++++-------- modules/turn/turn.h | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/modules/turn/turn.c b/modules/turn/turn.c index bb04080..bb1073c 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -341,14 +341,11 @@ static void tmr_handler(void *arg) { struct tmr *tmr = arg; struct le *le; - int thrd_id; mtx_lock(&turndp()->mutex); if (!turndp()->run) re_cancel(); - - thrd_id = GetThreadId(thrd_current()); - + /* Reassign one allocation by time */ LIST_FOREACH(&turndp()->re_map, le) { @@ -356,7 +353,7 @@ static void tmr_handler(void *arg) mtx_lock(&al->mutex); udp_thread_attach(al->uks->rel_us); udp_thread_attach(al->uks->rsv_us); - al->uks->thrd_id = thrd_id; + al->uks->thrd = thrd_current(); mtx_unlock(&al->mutex); } @@ -367,7 +364,8 @@ static void tmr_handler(void *arg) { struct udp_socks *uks = list_ledata(le); le = le->next; - if (thrd_id == uks->thrd_id) { + + if (thrd_equal(uks->thrd, thrd_current())) { udp_thread_detach(uks->rel_us); udp_thread_detach(uks->rsv_us); mem_deref(uks->rel_us); @@ -484,6 +482,7 @@ static int module_init(void) } list_init(&turnd.re_map); + list_init(&turnd.rm_map); turnd.run = true; err = mtx_init(&turnd.mutex, mtx_plain); @@ -493,8 +492,7 @@ static int module_init(void) } for (int i = 0; i < TURN_THREADS; i++) { - err = thrd_create(&tid[i], thread_handler, - &timers[i]); + err = thrd_create(&tid[i], thread_handler, &timers[i]); if (err) { restund_error("turn: thrd_create err: %m\n", err); goto out; diff --git a/modules/turn/turn.h b/modules/turn/turn.h index d0a1a04..b64453f 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -39,7 +39,7 @@ struct chanlist; struct udp_socks{ struct le le; - int thrd_id; + thrd_t *thrd; struct udp_sock *rel_us; struct udp_sock *rsv_us; }; From 6751ea75b4e528caba0dea2c30156d6bc04a5a1c Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Sun, 20 Oct 2024 20:23:55 +0800 Subject: [PATCH 07/10] modify the way to get thread id --- modules/turn/turn.c | 43 +++++++++++++++++++++++++++++++++++++++---- modules/turn/turn.h | 3 ++- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/modules/turn/turn.c b/modules/turn/turn.c index bb1073c..e856caa 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -32,6 +32,23 @@ struct turnd *turndp(void) return &turnd; } +static inline uintptr_t get_thread_id(void) +{ +#if defined(WIN32) + return (uintptr_t)GetCurrentThreadId(); +#elif defined(LINUX) + return (uintptr_t)syscall(SYS_gettid); +#elif defined(HAVE_PTHREAD) +#if defined(DARWIN) || defined(FREEBSD) || defined(OPENBSD) || \ + defined(NETBSD) || defined(DRAGONFLY) + return (uintptr_t)(void *)pthread_self(); +#else + return (uintptr_t)pthread_self(); +#endif +#else + return 0; +#endif +} static bool hash_cmp_handler(struct le *le, void *arg) { @@ -345,7 +362,7 @@ static void tmr_handler(void *arg) mtx_lock(&turndp()->mutex); if (!turndp()->run) re_cancel(); - + /* Reassign one allocation by time */ LIST_FOREACH(&turndp()->re_map, le) { @@ -353,8 +370,7 @@ static void tmr_handler(void *arg) mtx_lock(&al->mutex); udp_thread_attach(al->uks->rel_us); udp_thread_attach(al->uks->rsv_us); - al->uks->thrd = thrd_current(); - + al->uks->thrd_id = get_thread_id(); mtx_unlock(&al->mutex); } list_clear(&turndp()->re_map); @@ -365,7 +381,24 @@ static void tmr_handler(void *arg) struct udp_socks *uks = list_ledata(le); le = le->next; - if (thrd_equal(uks->thrd, thrd_current())) { + uint64_t jif = tmr_jiffies_usec(); + if (0 == turndp()->ts) + { + turndp()->ts = jif; + } + + if ((jif - turndp()->ts) > 1000000) // 1s + { + restund_error("no processing for a long time, check thread has exited."); + + mem_deref(uks->rel_us); + mem_deref(uks->rsv_us); + list_unlink(&uks->le); + mem_deref(uks); + turndp()->ts = 0; + } + + if (uks->thrd_id == get_thread_id()) { udp_thread_detach(uks->rel_us); udp_thread_detach(uks->rsv_us); mem_deref(uks->rel_us); @@ -373,6 +406,7 @@ static void tmr_handler(void *arg) list_unlink(&uks->le); mem_deref(uks); + turndp()->ts = 0; } } @@ -484,6 +518,7 @@ static int module_init(void) list_init(&turnd.re_map); list_init(&turnd.rm_map); + turnd.ts = 0; turnd.run = true; err = mtx_init(&turnd.mutex, mtx_plain); if (err) { diff --git a/modules/turn/turn.h b/modules/turn/turn.h index b64453f..a9a073b 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -21,6 +21,7 @@ struct turnd { bool run; struct list re_map; struct list rm_map; + uint64_t ts; struct { uint64_t scode_400; @@ -39,7 +40,7 @@ struct chanlist; struct udp_socks{ struct le le; - thrd_t *thrd; + uintptr_t thrd_id; struct udp_sock *rel_us; struct udp_sock *rsv_us; }; From be0b7d864d0e03fe47ad1bb7fdcc6fbabe142926 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Mon, 21 Oct 2024 22:01:32 +0800 Subject: [PATCH 08/10] alloc: Timeout handling when the terminal exits abnormally --- modules/turn/alloc.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index f4eb50c..19089a7 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -61,6 +61,7 @@ static void destructor(void *arg) mem_deref(al->username); mem_deref(al->cli_sock); + udp_handler_set(al->uks->rel_us, NULL, NULL); mtx_lock(&turndp()->mutex); list_append(&turndp()->rm_map, &al->uks->le, al->uks); mtx_unlock(&turndp()->mutex); @@ -85,6 +86,9 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg) struct chan *chan; int err; + if (!al) + return; + if (al->proto == IPPROTO_TCP) { if (tcp_conn_txqsz(al->cli_sock) > TCP_MAX_TXQSZ) { From 3419d7efc1c778a3994c289da20e767b5fb99ac5 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Thu, 24 Oct 2024 22:22:18 +0800 Subject: [PATCH 09/10] turn: used thrd_equal --- modules/turn/turn.c | 42 ++++++++++++++++++++---------------------- modules/turn/turn.h | 2 +- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/modules/turn/turn.c b/modules/turn/turn.c index e856caa..9d52292 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -32,23 +32,6 @@ struct turnd *turndp(void) return &turnd; } -static inline uintptr_t get_thread_id(void) -{ -#if defined(WIN32) - return (uintptr_t)GetCurrentThreadId(); -#elif defined(LINUX) - return (uintptr_t)syscall(SYS_gettid); -#elif defined(HAVE_PTHREAD) -#if defined(DARWIN) || defined(FREEBSD) || defined(OPENBSD) || \ - defined(NETBSD) || defined(DRAGONFLY) - return (uintptr_t)(void *)pthread_self(); -#else - return (uintptr_t)pthread_self(); -#endif -#else - return 0; -#endif -} static bool hash_cmp_handler(struct le *le, void *arg) { @@ -361,21 +344,34 @@ static void tmr_handler(void *arg) mtx_lock(&turndp()->mutex); if (!turndp()->run) + { re_cancel(); + goto out; + } + + struct list *re_map = &turndp()->re_map; + struct list *rm_map = &turndp()->rm_map; + if (!list_isempty(re_map) && (!list_isempty(rm_map))) + { + goto out; + } + + thrd_t thrd = thrd_current(); /* Reassign one allocation by time */ - LIST_FOREACH(&turndp()->re_map, le) + LIST_FOREACH(re_map, le) { struct allocation *al = list_ledata(le); mtx_lock(&al->mutex); udp_thread_attach(al->uks->rel_us); udp_thread_attach(al->uks->rsv_us); - al->uks->thrd_id = get_thread_id(); + al->uks->thrd = thrd; + mtx_unlock(&al->mutex); } - list_clear(&turndp()->re_map); + list_clear(re_map); - le = list_head(&turndp()->rm_map); + le = list_head(rm_map); while (le) { struct udp_socks *uks = list_ledata(le); @@ -394,11 +390,12 @@ static void tmr_handler(void *arg) mem_deref(uks->rel_us); mem_deref(uks->rsv_us); list_unlink(&uks->le); + mem_deref(uks); turndp()->ts = 0; } - if (uks->thrd_id == get_thread_id()) { + if (thrd_equal(uks->thrd, thrd)) { udp_thread_detach(uks->rel_us); udp_thread_detach(uks->rsv_us); mem_deref(uks->rel_us); @@ -410,6 +407,7 @@ static void tmr_handler(void *arg) } } +out: mtx_unlock(&turndp()->mutex); tmr_start(tmr, 10, tmr_handler, tmr); diff --git a/modules/turn/turn.h b/modules/turn/turn.h index a9a073b..02b6210 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -40,7 +40,7 @@ struct chanlist; struct udp_socks{ struct le le; - uintptr_t thrd_id; + thrd_t thrd; struct udp_sock *rel_us; struct udp_sock *rsv_us; }; From c4623ab6edb0becaf4db78eb5bfd050612b18336 Mon Sep 17 00:00:00 2001 From: jobo-zt <75535552@qq.com> Date: Tue, 29 Oct 2024 22:19:43 +0800 Subject: [PATCH 10/10] alloc: Clean up the map before destructor --- modules/turn/alloc.c | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index 19089a7..07794ff 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -63,6 +63,7 @@ static void destructor(void *arg) udp_handler_set(al->uks->rel_us, NULL, NULL); mtx_lock(&turndp()->mutex); + list_unlink(&al->uks->le); list_append(&turndp()->rm_map, &al->uks->le, al->uks); mtx_unlock(&turndp()->mutex);