diff --git a/modules/turn/alloc.c b/modules/turn/alloc.c index daa59fc..07794ff 100644 --- a/modules/turn/alloc.c +++ b/modules/turn/alloc.c @@ -52,10 +52,6 @@ static void destructor(void *arg) { struct allocation *al = arg; - mtx_lock(&turndp()->mutex); - list_unlink(&al->le_map); - mtx_unlock(&turndp()->mutex); - hash_flush(al->perms); mem_deref(al->perms); mem_deref(al->chans); @@ -65,9 +61,11 @@ static void destructor(void *arg) mem_deref(al->username); mem_deref(al->cli_sock); - /* @TODO check fd deref cleanup on turn worker thread */ - mem_deref(al->rel_us); - mem_deref(al->rsv_us); + udp_handler_set(al->uks->rel_us, NULL, NULL); + mtx_lock(&turndp()->mutex); + list_unlink(&al->uks->le); + list_append(&turndp()->rm_map, &al->uks->le, al->uks); + mtx_unlock(&turndp()->mutex); turndp()->allocc_cur--; } @@ -89,6 +87,9 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg) struct chan *chan; int err; + if (!al) + return; + if (al->proto == IPPROTO_TCP) { if (tcp_conn_txqsz(al->cli_sock) > TCP_MAX_TXQSZ) { @@ -162,13 +163,13 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, for (i=0; irel_us, rel_addr, udp_recv, al); + err = udp_listen(&al->uks->rel_us, rel_addr, udp_recv, al); if (err) break; - err = udp_local_get(al->rel_us, &al->rel_addr); + err = udp_local_get(al->uks->rel_us, &al->rel_addr); if (err) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); break; } @@ -178,7 +179,7 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, restund_debug("turn: try#%u: %J\n", i, &al->rel_addr); if (sa_port(&al->rel_addr) & 0x1) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); continue; } @@ -188,20 +189,20 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al, al->rsv_addr = al->rel_addr; sa_set_port(&al->rsv_addr, sa_port(&al->rel_addr) + 1); - err = udp_listen(&al->rsv_us, &al->rsv_addr, NULL, NULL); + err = udp_listen(&al->uks->rsv_us, &al->rsv_addr, NULL, NULL); if (err) { - al->rel_us = mem_deref(al->rel_us); + al->uks->rel_us = mem_deref(al->uks->rel_us); continue; } break; } /* Release fd for new thread and re_map*/ - udp_thread_detach(al->rel_us); - udp_thread_detach(al->rsv_us); + udp_thread_detach(al->uks->rel_us); + udp_thread_detach(al->uks->rsv_us); mtx_lock(&turndp()->mutex); - list_append(&turndp()->re_map, &al->le_map, al); + list_append(&turndp()->re_map, &al->uks->le, al); mtx_unlock(&turndp()->mutex); return (i == PORT_TRY_MAX) ? EADDRINUSE : err; @@ -233,9 +234,9 @@ static int rsvt_listen(const struct hash *ht, struct allocation *al, if (!alr) return ENOENT; - al->rel_us = alr->rsv_us; - udp_handler_set(al->rel_us, udp_recv, al); - alr->rsv_us = NULL; + al->uks->rel_us = alr->uks->rsv_us; + udp_handler_set(al->uks->rel_us, udp_recv, al); + alr->uks->rsv_us = NULL; al->rel_addr = alr->rsv_addr; sa_init(&alr->rsv_addr, AF_UNSPEC); @@ -360,6 +361,17 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, STUN_ATTR_SOFTWARE, restund_software); goto out; } + + al->uks = mem_zalloc(sizeof(struct udp_socks), NULL); + if (!al->uks) { + restund_warning("turn: no memory for allocation udp socks\n"); + ++turnd->reply.scode_500; + rerr = stun_ereply(proto, sock, src, 0, msg, + 500, "Server Error", + ctx->key, ctx->keylen, ctx->fp, 1, + STUN_ATTR_SOFTWARE, restund_software); + goto out; + } hash_append(turnd->ht_alloc, sa_hash(src, SA_ALL), &al->he, al); tmr_start(&al->tmr, lifetime * 1000, timeout, al); @@ -416,9 +428,9 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, goto out; } - udp_rxbuf_presz_set(al->rel_us, 4); + udp_rxbuf_presz_set(al->uks->rel_us, 4); if (turndp()->udp_sockbuf_size > 0) - (void)udp_sockbuf_set(al->rel_us, turndp()->udp_sockbuf_size); + (void)udp_sockbuf_set(al->uks->rel_us, turndp()->udp_sockbuf_size); restund_debug("turn: allocation %p created %s/%J/%J - %J (%us)\n", al, stun_transp_name(al->proto), &al->cli_addr, @@ -427,7 +439,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, alx = al; reply: - if (alx->rsv_us) { + if (alx->uks->rsv_us) { rsv = (uint64_t)sa_hash(src, SA_ALL) << 32; rsv |= (uint64_t)sa_stunaf(&alx->rsv_addr) << 24; rsv += sa_port(&alx->rsv_addr); @@ -449,7 +461,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx, STUN_ATTR_XOR_RELAY_ADDR, public ? &public_addr : &alx->rel_addr, STUN_ATTR_LIFETIME, &lifetime, - STUN_ATTR_RSV_TOKEN, alx->rsv_us ? &rsv : NULL, + STUN_ATTR_RSV_TOKEN, alx->uks->rsv_us ? &rsv : NULL, STUN_ATTR_XOR_MAPPED_ADDR, src, STUN_ATTR_SOFTWARE, restund_software); out: diff --git a/modules/turn/turn.c b/modules/turn/turn.c index 0e0d9b1..9d52292 100644 --- a/modules/turn/turn.c +++ b/modules/turn/turn.c @@ -190,7 +190,7 @@ static bool indication_handler(struct restund_msgctx *ctx, int proto, if (restund_addr_is_blocked(psa)) err = EPERM; else - err = udp_send(al->rel_us, psa, &data->v.data); + err = udp_send(al->uks->rel_us, psa, &data->v.data); if (err) turnd.errc_tx++; else { @@ -243,7 +243,7 @@ static bool raw_handler(int proto, const struct sa *src, if (restund_addr_is_blocked(psa)) err = EPERM; else - err = udp_send(al->rel_us, psa, mb); + err = udp_send(al->uks->rel_us, psa, mb); if (err) turnd.errc_tx++; else { @@ -344,19 +344,70 @@ static void tmr_handler(void *arg) mtx_lock(&turndp()->mutex); if (!turndp()->run) + { re_cancel(); + goto out; + } + + struct list *re_map = &turndp()->re_map; + struct list *rm_map = &turndp()->rm_map; + if (!list_isempty(re_map) && (!list_isempty(rm_map))) + { + goto out; + } + + thrd_t thrd = thrd_current(); /* Reassign one allocation by time */ - LIST_FOREACH(&turndp()->re_map, le) + LIST_FOREACH(re_map, le) { - struct allocation *al = le->data; + struct allocation *al = list_ledata(le); mtx_lock(&al->mutex); - udp_thread_attach(al->rel_us); - udp_thread_attach(al->rsv_us); + udp_thread_attach(al->uks->rel_us); + udp_thread_attach(al->uks->rsv_us); + al->uks->thrd = thrd; + mtx_unlock(&al->mutex); } - list_clear(&turndp()->re_map); + list_clear(re_map); + + le = list_head(rm_map); + while (le) + { + struct udp_socks *uks = list_ledata(le); + le = le->next; + + uint64_t jif = tmr_jiffies_usec(); + if (0 == turndp()->ts) + { + turndp()->ts = jif; + } + + if ((jif - turndp()->ts) > 1000000) // 1s + { + restund_error("no processing for a long time, check thread has exited."); + + mem_deref(uks->rel_us); + mem_deref(uks->rsv_us); + list_unlink(&uks->le); + + mem_deref(uks); + turndp()->ts = 0; + } + + if (thrd_equal(uks->thrd, thrd)) { + udp_thread_detach(uks->rel_us); + udp_thread_detach(uks->rsv_us); + mem_deref(uks->rel_us); + mem_deref(uks->rsv_us); + + list_unlink(&uks->le); + mem_deref(uks); + turndp()->ts = 0; + } + } +out: mtx_unlock(&turndp()->mutex); tmr_start(tmr, 10, tmr_handler, tmr); @@ -463,7 +514,9 @@ static int module_init(void) } list_init(&turnd.re_map); + list_init(&turnd.rm_map); + turnd.ts = 0; turnd.run = true; err = mtx_init(&turnd.mutex, mtx_plain); if (err) { @@ -472,8 +525,7 @@ static int module_init(void) } for (int i = 0; i < TURN_THREADS; i++) { - err = thrd_create(&tid[i], thread_handler, - &timers[i]); + err = thrd_create(&tid[i], thread_handler, &timers[i]); if (err) { restund_error("turn: thrd_create err: %m\n", err); goto out; diff --git a/modules/turn/turn.h b/modules/turn/turn.h index 2fc9fc5..02b6210 100644 --- a/modules/turn/turn.h +++ b/modules/turn/turn.h @@ -20,6 +20,8 @@ struct turnd { mtx_t mutex; bool run; struct list re_map; + struct list rm_map; + uint64_t ts; struct { uint64_t scode_400; @@ -36,9 +38,15 @@ struct turnd { struct chanlist; +struct udp_socks{ + struct le le; + thrd_t thrd; + struct udp_sock *rel_us; + struct udp_sock *rsv_us; +}; + struct allocation { struct le he; - struct le le_map; mtx_t mutex; struct tmr tmr; uint8_t tid[STUN_TID_SIZE]; @@ -47,8 +55,7 @@ struct allocation { struct sa rel_addr; struct sa rsv_addr; void *cli_sock; - struct udp_sock *rel_us; - struct udp_sock *rsv_us; + struct udp_socks *uks; char *username; struct hash *perms; struct chanlist *chans;