Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

turn: check fd deref cleanup on turn worker thread #8

Open
wants to merge 11 commits into
base: turn_thread
Choose a base branch
from
49 changes: 30 additions & 19 deletions modules/turn/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ static void destructor(void *arg)
mem_deref(al->username);
mem_deref(al->cli_sock);

/* @TODO check fd deref cleanup on turn worker thread */
mem_deref(al->rel_us);
mem_deref(al->rsv_us);
mtx_lock(&turndp()->mutex);
list_append(&turndp()->rm_map, &al->uks->le, al->uks);
mtx_unlock(&turndp()->mutex);

turndp()->allocc_cur--;
}
Expand Down Expand Up @@ -162,13 +162,13 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,

for (i=0; i<PORT_TRY_MAX; i++) {

err = udp_listen(&al->rel_us, rel_addr, udp_recv, al);
err = udp_listen(&al->uks->rel_us, rel_addr, udp_recv, al);
if (err)
break;

err = udp_local_get(al->rel_us, &al->rel_addr);
err = udp_local_get(al->uks->rel_us, &al->rel_addr);
if (err) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
break;
}

Expand All @@ -178,7 +178,7 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
restund_debug("turn: try#%u: %J\n", i, &al->rel_addr);

if (sa_port(&al->rel_addr) & 0x1) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
continue;
}

Expand All @@ -188,20 +188,20 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
al->rsv_addr = al->rel_addr;
sa_set_port(&al->rsv_addr, sa_port(&al->rel_addr) + 1);

err = udp_listen(&al->rsv_us, &al->rsv_addr, NULL, NULL);
err = udp_listen(&al->uks->rsv_us, &al->rsv_addr, NULL, NULL);
if (err) {
al->rel_us = mem_deref(al->rel_us);
al->uks->rel_us = mem_deref(al->uks->rel_us);
continue;
}
break;
}

/* Release fd for new thread and re_map*/
udp_thread_detach(al->rel_us);
udp_thread_detach(al->rsv_us);
udp_thread_detach(al->uks->rel_us);
udp_thread_detach(al->uks->rsv_us);

mtx_lock(&turndp()->mutex);
list_append(&turndp()->re_map, &al->le_map, al);
list_append(&turndp()->re_map, &al->uks->le, al);
mtx_unlock(&turndp()->mutex);

return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
Expand Down Expand Up @@ -233,9 +233,9 @@ static int rsvt_listen(const struct hash *ht, struct allocation *al,
if (!alr)
return ENOENT;

al->rel_us = alr->rsv_us;
udp_handler_set(al->rel_us, udp_recv, al);
alr->rsv_us = NULL;
al->uks->rel_us = alr->uks->rsv_us;
udp_handler_set(al->uks->rel_us, udp_recv, al);
alr->uks->rsv_us = NULL;
al->rel_addr = alr->rsv_addr;
sa_init(&alr->rsv_addr, AF_UNSPEC);

Expand Down Expand Up @@ -360,6 +360,17 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
STUN_ATTR_SOFTWARE, restund_software);
goto out;
}

al->uks = mem_zalloc(sizeof(struct udp_socks), NULL);
if (!al->uks) {
restund_warning("turn: no memory for allocation udp socks\n");
++turnd->reply.scode_500;
rerr = stun_ereply(proto, sock, src, 0, msg,
500, "Server Error",
ctx->key, ctx->keylen, ctx->fp, 1,
STUN_ATTR_SOFTWARE, restund_software);
goto out;
}

hash_append(turnd->ht_alloc, sa_hash(src, SA_ALL), &al->he, al);
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
Expand Down Expand Up @@ -416,9 +427,9 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
goto out;
}

udp_rxbuf_presz_set(al->rel_us, 4);
udp_rxbuf_presz_set(al->uks->rel_us, 4);
if (turndp()->udp_sockbuf_size > 0)
(void)udp_sockbuf_set(al->rel_us, turndp()->udp_sockbuf_size);
(void)udp_sockbuf_set(al->uks->rel_us, turndp()->udp_sockbuf_size);

restund_debug("turn: allocation %p created %s/%J/%J - %J (%us)\n",
al, stun_transp_name(al->proto), &al->cli_addr,
Expand All @@ -427,7 +438,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
alx = al;

reply:
if (alx->rsv_us) {
if (alx->uks->rsv_us) {
rsv = (uint64_t)sa_hash(src, SA_ALL) << 32;
rsv |= (uint64_t)sa_stunaf(&alx->rsv_addr) << 24;
rsv += sa_port(&alx->rsv_addr);
Expand All @@ -449,7 +460,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
STUN_ATTR_XOR_RELAY_ADDR,
public ? &public_addr : &alx->rel_addr,
STUN_ATTR_LIFETIME, &lifetime,
STUN_ATTR_RSV_TOKEN, alx->rsv_us ? &rsv : NULL,
STUN_ATTR_RSV_TOKEN, alx->uks->rsv_us ? &rsv : NULL,
STUN_ATTR_XOR_MAPPED_ADDR, src,
STUN_ATTR_SOFTWARE, restund_software);
out:
Expand Down
29 changes: 24 additions & 5 deletions modules/turn/turn.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static bool indication_handler(struct restund_msgctx *ctx, int proto,
if (restund_addr_is_blocked(psa))
err = EPERM;
else
err = udp_send(al->rel_us, psa, &data->v.data);
err = udp_send(al->uks->rel_us, psa, &data->v.data);
if (err)
turnd.errc_tx++;
else {
Expand Down Expand Up @@ -243,7 +243,7 @@ static bool raw_handler(int proto, const struct sa *src,
if (restund_addr_is_blocked(psa))
err = EPERM;
else
err = udp_send(al->rel_us, psa, mb);
err = udp_send(al->uks->rel_us, psa, mb);
if (err)
turnd.errc_tx++;
else {
Expand Down Expand Up @@ -341,22 +341,41 @@ static void tmr_handler(void *arg)
{
struct tmr *tmr = arg;
struct le *le;
int thrd_id;

mtx_lock(&turndp()->mutex);
if (!turndp()->run)
re_cancel();

thrd_id = GetThreadId(thrd_current());
jobo-zt marked this conversation as resolved.
Show resolved Hide resolved

/* Reassign one allocation by time */
LIST_FOREACH(&turndp()->re_map, le)
{
struct allocation *al = le->data;
struct allocation *al = list_ledata(le);
mtx_lock(&al->mutex);
udp_thread_attach(al->rel_us);
udp_thread_attach(al->rsv_us);
udp_thread_attach(al->uks->rel_us);
udp_thread_attach(al->uks->rsv_us);
al->uks->thrd_id = thrd_id;

mtx_unlock(&al->mutex);
}
list_clear(&turndp()->re_map);

le = list_head(&turndp()->rm_map);
while (le)
{
struct udp_socks *uks = list_ledata(le);
le = le->next;
if (thrd_id == uks->thrd_id) {
udp_thread_detach(uks->rel_us);
udp_thread_detach(uks->rsv_us);
jobo-zt marked this conversation as resolved.
Show resolved Hide resolved

list_unlink(&uks->le);
mem_deref(uks);
}
}

mtx_unlock(&turndp()->mutex);

tmr_start(tmr, 10, tmr_handler, tmr);
Expand Down
12 changes: 9 additions & 3 deletions modules/turn/turn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct turnd {
mtx_t mutex;
bool run;
struct list re_map;
struct list rm_map;

struct {
uint64_t scode_400;
Expand All @@ -36,9 +37,15 @@ struct turnd {

struct chanlist;

struct udp_socks{
struct le le;
int thrd_id;
struct udp_sock *rel_us;
struct udp_sock *rsv_us;
};

struct allocation {
struct le he;
struct le le_map;
mtx_t mutex;
struct tmr tmr;
uint8_t tid[STUN_TID_SIZE];
Expand All @@ -47,8 +54,7 @@ struct allocation {
struct sa rel_addr;
struct sa rsv_addr;
void *cli_sock;
struct udp_sock *rel_us;
struct udp_sock *rsv_us;
struct udp_socks *uks;
char *username;
struct hash *perms;
struct chanlist *chans;
Expand Down