From 0c49a2e7214c2001a286e31b897005b4ecd5c18a Mon Sep 17 00:00:00 2001 From: hev Date: Sun, 5 Jan 2025 02:07:49 +0800 Subject: [PATCH] HevSocks5TProxy: Add support for multiple workers. --- README.md | 3 + conf/main.yml | 4 + src/hev-config.c | 47 ++- src/hev-config.h | 2 + src/hev-socks5-tproxy.c | 684 +++++----------------------------------- src/hev-socks5-worker.c | 596 ++++++++++++++++++++++++++++++++++ src/hev-socks5-worker.h | 22 ++ 7 files changed, 759 insertions(+), 599 deletions(-) create mode 100644 src/hev-socks5-worker.c create mode 100644 src/hev-socks5-worker.h diff --git a/README.md b/README.md index 6f3a340..90bb5cb 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,9 @@ ndk-build ### Config ```yaml +main: + workers: 1 + socks5: port: 1080 address: 127.0.0.1 diff --git a/conf/main.yml b/conf/main.yml index f3f06ec..4082679 100644 --- a/conf/main.yml +++ b/conf/main.yml @@ -1,5 +1,9 @@ # Main configuration for hev-socks5-tproxy +main: + # Worker threads + workers: 1 + socks5: # Socks5 server port port: 1080 diff --git a/src/hev-config.c b/src/hev-config.c index 379b433..1cc98b4 100644 --- a/src/hev-config.c +++ b/src/hev-config.c @@ -13,6 +13,7 @@ #include "hev-logger.h" #include "hev-config.h" +static unsigned int workers = 1; static HevConfigServer srv; static char tcp_address[256]; static char tcp_port[8]; @@ -31,6 +32,42 @@ static int read_write_timeout = 60000; static int limit_nofile = 65535; static int log_level = HEV_LOGGER_WARN; +static int +hev_config_parse_main (yaml_document_t *doc, yaml_node_t *base) +{ + yaml_node_pair_t *pair; + + if (!base || YAML_MAPPING_NODE != base->type) + return -1; + + for (pair = base->data.mapping.pairs.start; + pair < base->data.mapping.pairs.top; pair++) { + yaml_node_t *node; + const char *key, *value; + + if (!pair->key || !pair->value) + break; + + node = yaml_document_get_node (doc, pair->key); + if (!node || YAML_SCALAR_NODE != node->type) + break; + key = (const char *)node->data.scalar.value; + + node = yaml_document_get_node (doc, pair->value); + if (!node || YAML_SCALAR_NODE != node->type) + break; + value = (const char *)node->data.scalar.value; + + if (0 == strcmp (key, "workers")) + workers = strtoul (value, NULL, 10); + } + + if (!workers) + workers = 1; + + return 0; +} + static int hev_config_parse_server (yaml_document_t *doc, yaml_node_t *base, const char *sec, HevConfigServer *srv) @@ -315,7 +352,9 @@ hev_config_parse_doc (yaml_document_t *doc) key = (const char *)node->data.scalar.value; node = yaml_document_get_node (doc, pair->value); - if (0 == strcmp (key, "socks5")) + if (0 == strcmp (key, "main")) + res = hev_config_parse_main (doc, node); + else if (0 == strcmp (key, "socks5")) res = hev_config_parse_server (doc, node, key, &srv); else if (0 == strcmp (key, "tcp")) res = hev_config_parse_addr (doc, node, key, tcp_address, tcp_port); @@ -372,6 +411,12 @@ hev_config_fini (void) { } +unsigned int +hev_config_get_workers (void) +{ + return workers; +} + HevConfigServer * hev_config_get_socks5_server (void) { diff --git a/src/hev-config.h b/src/hev-config.h index b2ef101..f95716b 100644 --- a/src/hev-config.h +++ b/src/hev-config.h @@ -26,6 +26,8 @@ struct _HevConfigServer int hev_config_init (const char *path); void hev_config_fini (void); +unsigned int hev_config_get_workers (void); + HevConfigServer *hev_config_get_socks5_server (void); const char *hev_config_get_tcp_address (void); const char *hev_config_get_tcp_port (void); diff --git a/src/hev-socks5-tproxy.c b/src/hev-socks5-tproxy.c index fe95a5f..8732f1f 100644 --- a/src/hev-socks5-tproxy.c +++ b/src/hev-socks5-tproxy.c @@ -7,43 +7,23 @@ ============================================================================ */ -#include -#include -#include #include -#include +#include -#include -#include -#include #include #include -#include "hev-list.h" -#include "hev-utils.h" -#include "hev-rbtree.h" #include "hev-config.h" #include "hev-logger.h" -#include "hev-compiler.h" -#include "hev-config-const.h" #include "hev-tsocks-cache.h" -#include "hev-socks5-session-tcp.h" -#include "hev-socks5-session-udp.h" -#include "hev-tproxy-session-dns.h" +#include "hev-socks5-worker.h" #include "hev-socks5-tproxy.h" -static int quit; -static int fd_event; +static unsigned int workers; -static HevList tcp_set; -static HevList dns_set; -static HevRBTree udp_set; - -static HevTask *task_tcp; -static HevTask *task_udp; -static HevTask *task_dns; -static HevTask *task_event; +static pthread_t *work_threads; +static HevSocks5Worker **worker_list; static void sigint_handler (int signum) @@ -51,644 +31,152 @@ sigint_handler (int signum) hev_socks5_tproxy_stop (); } -static int -task_io_yielder (HevTaskYieldType type, void *data) -{ - hev_task_yield (type); - - return quit ? -1 : 0; -} - -static int -hev_socks5_tproxy_tcp_socket (void) +int +hev_socks5_tproxy_init (void) { - struct sockaddr_in6 saddr; - const char *addr; - const char *port; - int one = 1; int res; - int fd; - - LOG_D ("socks5 tproxy tcp socket"); - addr = hev_config_get_tcp_address (); - port = hev_config_get_tcp_port (); - if (!addr) - goto exit; + LOG_D ("socks5 tproxy init"); - res = resolve_to_sockaddr (addr, port, SOCK_STREAM, &saddr); + res = hev_task_system_init (); if (res < 0) { - LOG_E ("socks5 tproxy tcp addr"); - goto exit; - } - - fd = hev_task_io_socket_socket (AF_INET6, SOCK_STREAM, 0); - if (fd < 0) { - LOG_E ("socks5 tproxy tcp socket"); + LOG_E ("socks5 tproxy task system"); goto exit; } - res = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy tcp socket reuse"); - goto close; - } - - res = setsockopt (fd, SOL_IP, IP_TRANSPARENT, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy tcp ipv4 transparent"); - goto close; - } - - res = setsockopt (fd, SOL_IPV6, IPV6_TRANSPARENT, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy tcp ipv6 transparent"); - goto close; - } - - res = bind (fd, (struct sockaddr *)&saddr, sizeof (saddr)); - if (res < 0) { - LOG_E ("socks5 tproxy tcp socket bind"); - goto close; - } - - res = listen (fd, 100); - if (res < 0) { - LOG_E ("socks5 tproxy tcp socket listen"); - goto close; - } - - return fd; - -close: - close (fd); -exit: - return -1; -} - -static int -hev_socks5_tproxy_udp_socket (const char *addr, const char *port) -{ - struct sockaddr_in6 saddr; - int one = 1; - int res; - int fd; - - LOG_D ("socks5 tproxy udp socket"); - - res = resolve_to_sockaddr (addr, port, SOCK_DGRAM, &saddr); + res = hev_tsocks_cache_init (); if (res < 0) { - LOG_E ("socks5 tproxy udp addr"); + LOG_E ("socks5 tproxy tsocks cache"); goto exit; } - fd = hev_task_io_socket_socket (AF_INET6, SOCK_DGRAM, 0); - if (fd < 0) { - LOG_E ("socks5 tproxy udp socket"); + workers = hev_config_get_workers (); + work_threads = hev_malloc0 (sizeof (pthread_t) * workers); + if (!work_threads) { + LOG_E ("socks5 tproxy work threads"); goto exit; } - res = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy udp socket reuse"); - goto close; - } - - res = setsockopt (fd, SOL_IP, IP_TRANSPARENT, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy udp ipv4 transparent"); - goto close; - } - - res = setsockopt (fd, SOL_IPV6, IPV6_TRANSPARENT, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy udp ipv6 transparent"); - goto close; - } - - res = setsockopt (fd, SOL_IP, IP_RECVORIGDSTADDR, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy udp ipv4 orig dest"); - goto close; - } - - res = setsockopt (fd, SOL_IPV6, IPV6_RECVORIGDSTADDR, &one, sizeof (one)); - if (res < 0) { - LOG_E ("socks5 tproxy udp ipv6 orig dest"); - goto close; + worker_list = hev_malloc0 (sizeof (HevSocks5Worker *) * workers); + if (!worker_list) { + LOG_E ("socks5 tproxy worker list"); + goto exit; } - res = hev_config_get_misc_udp_recv_buffer_size (); - res = setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &res, sizeof (res)); - if (res < 0) - LOG_W ("socks5 tproxy udp socket rcvbuf"); - - res = bind (fd, (struct sockaddr *)&saddr, sizeof (saddr)); - if (res < 0) { - LOG_E ("socks5 tproxy udp socket bind"); - goto close; - } + signal (SIGPIPE, SIG_IGN); + signal (SIGINT, sigint_handler); - return fd; + return 0; -close: - close (fd); exit: + hev_socks5_tproxy_fini (); return -1; } -static void -hev_socks5_event_task_entry (void *data) -{ - eventfd_t val; - - LOG_D ("socks5 event task run"); - - fd_event = eventfd (0, EFD_NONBLOCK); - if (fd_event < 0) { - LOG_E ("socks5 eventfd"); - return; - } - - hev_task_add_fd (hev_task_self (), fd_event, POLLIN); - hev_task_io_read (fd_event, &val, sizeof (val), NULL, NULL); - - quit = 1; - - if (task_tcp) - hev_task_wakeup (task_tcp); - if (task_udp) - hev_task_wakeup (task_udp); - if (task_dns) - hev_task_wakeup (task_dns); - - close (fd_event); - fd_event = -1; - task_event = NULL; -} - -static void -hev_socks5_tcp_session_task_entry (void *data) +void +hev_socks5_tproxy_fini (void) { - HevSocks5SessionTCP *tcp = data; - - hev_tproxy_session_run (HEV_TPROXY_SESSION (tcp)); + LOG_D ("socks5 tproxy fini"); - hev_list_del (&tcp_set, &tcp->node); - hev_object_unref (HEV_OBJECT (tcp)); + if (work_threads) + hev_free (work_threads); + if (worker_list) + hev_free (worker_list); + hev_tsocks_cache_fini (); + hev_task_system_fini (); } -static void -hev_socks5_tcp_session_new (int fd) +static void * +work_thread_handler (void *data) { - HevSocks5SessionTCP *tcp; - struct sockaddr_in6 addr; - socklen_t addrlen; - int stack_size; - HevTask *task; + HevSocks5Worker **worker = data; int res; - LOG_D ("socks5 tcp session new"); - - addrlen = sizeof (addr); - res = getsockname (fd, (struct sockaddr *)&addr, &addrlen); + res = hev_task_system_init (); if (res < 0) { - LOG_E ("socks5 tcp orig dest"); - close (fd); - return; - } - - tcp = hev_socks5_session_tcp_new ((struct sockaddr *)&addr, fd); - if (!tcp) { - close (fd); - return; - } - - stack_size = hev_config_get_misc_task_stack_size (); - task = hev_task_new (stack_size); - if (!task) { - hev_object_unref (HEV_OBJECT (tcp)); - return; - } - - hev_tproxy_session_set_task (HEV_TPROXY_SESSION (tcp), task); - hev_list_add_tail (&tcp_set, &tcp->node); - hev_task_run (task, hev_socks5_tcp_session_task_entry, tcp); -} - -static void -hev_socks5_tcp_task_entry (void *data) -{ - HevListNode *node; - int fd; - - LOG_D ("socks5 tcp task run"); - - fd = hev_socks5_tproxy_tcp_socket (); - if (fd < 0) + LOG_E ("socks5 tproxy worker task system"); goto exit; - - hev_task_add_fd (hev_task_self (), fd, POLLIN); - - for (;;) { - int nfd; - - nfd = hev_task_io_socket_accept (fd, NULL, NULL, task_io_yielder, NULL); - if (nfd == -1) { - LOG_W ("socks5 tcp accept"); - continue; - } else if (nfd < 0) { - break; - } - - hev_socks5_tcp_session_new (nfd); } - node = hev_list_first (&tcp_set); - for (; node; node = hev_list_node_next (node)) { - HevSocks5SessionTCP *tcp; - - tcp = container_of (node, HevSocks5SessionTCP, node); - hev_tproxy_session_terminate (HEV_TPROXY_SESSION (tcp)); + res = hev_socks5_worker_init (*worker, 0); + if (res < 0) { + LOG_E ("socks5 tproxy worker init"); + goto free; } - close (fd); -exit: - task_tcp = NULL; -} - -static int -hev_socks5_udp_recvmsg (int fd, struct sockaddr *saddr, struct sockaddr *daddr, - void *buf, size_t len) -{ - union - { - char buf[CMSG_SPACE (sizeof (struct sockaddr_in6))]; - struct cmsghdr align; - } u; - struct msghdr mh = { 0 }; - struct iovec iov; - int res; - - iov.iov_base = buf; - iov.iov_len = len; - mh.msg_iov = &iov; - mh.msg_iovlen = 1; - mh.msg_name = saddr; - mh.msg_namelen = sizeof (struct sockaddr_in6); - mh.msg_control = u.buf; - mh.msg_controllen = sizeof (u.buf); - - res = hev_task_io_socket_recvmsg (fd, &mh, 0, task_io_yielder, NULL); - if (res < 0) - return res; - - msg_to_sock_addr (&mh, daddr); + hev_socks5_worker_start (*worker); - return res; -} - -static HevSocks5SessionUDP * -hev_socks5_udp_session_find (struct sockaddr *addr) -{ - HevRBTreeNode *node = udp_set.root; - - while (node) { - HevSocks5SessionUDP *this; - int res; - - this = container_of (node, HevSocks5SessionUDP, node); - res = memcmp (&this->addr, addr, sizeof (struct sockaddr_in6)); + hev_task_system_run (); - if (res < 0) - node = node->left; - else if (res > 0) - node = node->right; - else - return this; - } + hev_socks5_worker_destroy (*worker); + *worker = NULL; +free: + hev_task_system_fini (); +exit: return NULL; } -static void -hev_socks5_udp_session_add (HevSocks5SessionUDP *udp) -{ - HevRBTreeNode **new = &udp_set.root, *parent = NULL; - - while (*new) { - HevSocks5SessionUDP *this; - int res; - - this = container_of (*new, HevSocks5SessionUDP, node); - res = memcmp (&this->addr, &udp->addr, sizeof (struct sockaddr_in6)); - - parent = *new; - if (res < 0) - new = &((*new)->left); - else if (res > 0) - new = &((*new)->right); - } - - hev_rbtree_node_link (&udp->node, parent, new); - hev_rbtree_insert_color (&udp_set, &udp->node); -} - -static void -hev_socks5_udp_session_del (HevSocks5SessionUDP *udp) -{ - hev_rbtree_erase (&udp_set, &udp->node); -} - -static void -hev_socks5_udp_session_task_entry (void *data) -{ - HevSocks5SessionUDP *udp = data; - - hev_tproxy_session_run (HEV_TPROXY_SESSION (udp)); - - hev_socks5_udp_session_del (udp); - hev_object_unref (HEV_OBJECT (udp)); -} - -static HevSocks5SessionUDP * -hev_socks5_udp_session_new (struct sockaddr *addr) -{ - HevSocks5SessionUDP *udp; - int stack_size; - HevTask *task; - - LOG_D ("socks5 udp session new"); - - udp = hev_socks5_session_udp_new (addr); - if (!udp) - return NULL; - - stack_size = hev_config_get_misc_task_stack_size (); - task = hev_task_new (stack_size); - if (!task) { - hev_object_unref (HEV_OBJECT (udp)); - return NULL; - } - - hev_tproxy_session_set_task (HEV_TPROXY_SESSION (udp), task); - hev_socks5_udp_session_add (udp); - hev_task_run (task, hev_socks5_udp_session_task_entry, udp); - - return udp; -} - -static int -hev_socks5_udp_dispatch (struct sockaddr *saddr, struct sockaddr *daddr, - void *data, size_t len) +void +hev_socks5_tproxy_run (void) { - HevSocks5SessionUDP *udp; int res; + int i; - udp = hev_socks5_udp_session_find (saddr); - if (!udp) { - udp = hev_socks5_udp_session_new (saddr); - if (!udp) - return -1; - } - - res = hev_socks5_session_udp_send (udp, data, len, daddr); - - return res; -} - -static void -hev_socks5_udp_task_entry (void *data) -{ - HevRBTreeNode *node; - const char *addr; - const char *port; - int fd; - - LOG_D ("socks5 udp task run"); - - addr = hev_config_get_udp_address (); - port = hev_config_get_udp_port (); - if (!addr) - goto exit; - - fd = hev_socks5_tproxy_udp_socket (addr, port); - if (fd < 0) - goto exit; - - hev_task_add_fd (hev_task_self (), fd, POLLIN); - - for (;;) { - struct sockaddr_in6 saddr = { 0 }; - struct sockaddr_in6 daddr = { 0 }; - struct sockaddr *sap; - struct sockaddr *dap; - void *buf; - int res; - - buf = hev_malloc (UDP_BUF_SIZE); - sap = (struct sockaddr *)&saddr; - dap = (struct sockaddr *)&daddr; - - res = hev_socks5_udp_recvmsg (fd, sap, dap, buf, UDP_BUF_SIZE); - if (res == -1 || res == 0) { - LOG_W ("socks5 udp recvmsg"); - hev_free (buf); - continue; - } else if (res < 0) { - hev_free (buf); - break; - } + LOG_D ("socks5 tproxy run"); - res = hev_socks5_udp_dispatch (sap, dap, buf, res); - if (res < 0) - hev_free (buf); + worker_list[0] = hev_socks5_worker_new (); + if (!worker_list[0]) { + LOG_E ("socks5 tproxy worker"); + return; } - node = hev_rbtree_first (&udp_set); - for (; node; node = hev_rbtree_node_next (node)) { - HevSocks5SessionUDP *udp; - - udp = container_of (node, HevSocks5SessionUDP, node); - hev_tproxy_session_terminate (HEV_TPROXY_SESSION (udp)); + res = hev_socks5_worker_init (worker_list[0], 1); + if (res < 0) { + LOG_E ("socks5 tproxy worker init"); + return; } - close (fd); -exit: - task_udp = NULL; -} - -static void -hev_socks5_dns_session_task_entry (void *data) -{ - HevTProxySessionDNS *dns = data; - - hev_tproxy_session_run (HEV_TPROXY_SESSION (dns)); - - hev_list_del (&dns_set, &dns->node); - hev_object_unref (HEV_OBJECT (dns)); -} - -static void -hev_socks5_dns_task_entry (void *data) -{ - HevListNode *node; - const char *addr; - const char *port; - int stack_size; - int fd; - - LOG_D ("socks5 dns task run"); - - addr = hev_config_get_dns_address (); - port = hev_config_get_dns_port (); - if (!addr) - goto exit; - - fd = hev_socks5_tproxy_udp_socket (addr, port); - if (fd < 0) - goto exit; + hev_socks5_worker_start (worker_list[0]); - hev_task_add_fd (hev_task_self (), fd, POLLIN); - stack_size = hev_config_get_misc_task_stack_size (); - - for (;;) { - HevTProxySessionDNS *dns; - struct sockaddr *sap; - struct sockaddr *dap; - HevTask *task; - void *buffer; - int res; - - dns = hev_tproxy_session_dns_new (); - sap = hev_tproxy_session_dns_get_saddr (dns); - dap = hev_tproxy_session_dns_get_daddr (dns); - buffer = hev_tproxy_session_dns_get_buffer (dns); - - res = hev_socks5_udp_recvmsg (fd, sap, dap, buffer, UDP_BUF_SIZE); - if (res == -1 || res == 0) { - LOG_W ("socks5 dns recvmsg"); - hev_object_unref (HEV_OBJECT (dns)); - continue; - } else if (res <= 0) { - hev_object_unref (HEV_OBJECT (dns)); - break; + for (i = 1; i < workers; i++) { + worker_list[i] = hev_socks5_worker_new (); + if (!worker_list[i]) { + LOG_E ("socks5 tproxy worker"); + return; } - task = hev_task_new (stack_size); - hev_task_run (task, hev_socks5_dns_session_task_entry, dns); - hev_list_add_tail (&dns_set, &dns->node); - hev_tproxy_session_dns_set_size (dns, res); - hev_tproxy_session_set_task (HEV_TPROXY_SESSION (dns), task); + pthread_create (&work_threads[i], NULL, work_thread_handler, + &worker_list[i]); } - node = hev_list_first (&dns_set); - for (; node; node = hev_list_node_next (node)) { - HevTProxySessionDNS *dns; - - dns = container_of (node, HevTProxySessionDNS, node); - hev_tproxy_session_terminate (HEV_TPROXY_SESSION (dns)); - } - - close (fd); -exit: - task_dns = NULL; -} - -int -hev_socks5_tproxy_init (void) -{ - LOG_D ("socks5 tproxy init"); - - if (hev_task_system_init () < 0) { - LOG_E ("socks5 tproxy task system"); - goto exit; - } - - if (hev_tsocks_cache_init () < 0) { - LOG_E ("socks5 tproxy tsocks cache"); - goto exit; - } - - task_event = hev_task_new (-1); - if (!task_event) { - LOG_E ("socks5 tproxy task event"); - goto exit; - } + hev_task_system_run (); - task_tcp = hev_task_new (-1); - if (!task_tcp) { - LOG_E ("socks5 tproxy task tcp"); - goto exit; - } + if (worker_list[0]) { + int i; - task_udp = hev_task_new (-1); - if (!task_udp) { - LOG_E ("socks5 tproxy task udp"); - goto exit; - } + for (i = 1; i < workers; i++) + pthread_join (work_threads[i], NULL); - task_dns = hev_task_new (-1); - if (!task_dns) { - LOG_E ("socks5 tproxy task dns"); - goto exit; + hev_socks5_worker_destroy (worker_list[0]); + worker_list[0] = NULL; } - - signal (SIGPIPE, SIG_IGN); - signal (SIGINT, sigint_handler); - - return 0; - -exit: - hev_socks5_tproxy_fini (); - return -1; -} - -void -hev_socks5_tproxy_fini (void) -{ - LOG_D ("socks5 tproxy fini"); - - if (task_event) - hev_task_unref (task_event); - if (task_tcp) - hev_task_unref (task_tcp); - if (task_udp) - hev_task_unref (task_udp); - if (task_dns) - hev_task_unref (task_dns); - - hev_tsocks_cache_fini (); - hev_task_system_fini (); -} - -void -hev_socks5_tproxy_run (void) -{ - LOG_D ("socks5 tproxy run"); - - hev_task_run (task_event, hev_socks5_event_task_entry, NULL); - - if (task_tcp) - hev_task_run (task_tcp, hev_socks5_tcp_task_entry, NULL); - - if (task_udp) - hev_task_run (task_udp, hev_socks5_udp_task_entry, NULL); - - if (task_dns) - hev_task_run (task_dns, hev_socks5_dns_task_entry, NULL); - - hev_task_system_run (); } void hev_socks5_tproxy_stop (void) { + int i; + LOG_D ("socks5 tproxy stop"); - if (fd_event < 0) - return; + for (i = 0; i < workers; i++) { + HevSocks5Worker *worker; - if (eventfd_write (fd_event, 1) < 0) - LOG_E ("socks5 tproxy write event"); + worker = worker_list[i]; + if (!worker) + continue; + + hev_socks5_worker_stop (worker); + } } diff --git a/src/hev-socks5-worker.c b/src/hev-socks5-worker.c new file mode 100644 index 0000000..63fa4c3 --- /dev/null +++ b/src/hev-socks5-worker.c @@ -0,0 +1,596 @@ +/* + ============================================================================ + Name : hev-socks5-worker.c + Author : Heiher + Copyright : Copyright (c) 2025 hev + Description : Socks5 Worker + ============================================================================ + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "hev-utils.h" +#include "hev-config.h" +#include "hev-logger.h" +#include "hev-compiler.h" +#include "hev-config-const.h" +#include "hev-socket-factory.h" +#include "hev-socks5-session-tcp.h" +#include "hev-socks5-session-udp.h" +#include "hev-tproxy-session-dns.h" + +#include "hev-socks5-worker.h" + +static pthread_key_t key; +static pthread_once_t key_once = PTHREAD_ONCE_INIT; + +struct _HevSocks5Worker +{ + int quit; + int is_main; + int event_fds[2]; + + HevTask *task_tcp; + HevTask *task_udp; + HevTask *task_dns; + HevTask *task_event; + + HevList tcp_set; + HevList dns_set; + HevRBTree udp_set; +}; + +static void +pthread_key_creator (void) +{ + pthread_key_create (&key, NULL); +} + +static int +task_io_yielder (HevTaskYieldType type, void *data) +{ + HevSocks5Worker *self = data; + + hev_task_yield (type); + + return self->quit ? -1 : 0; +} + +static HevSocks5Worker * +hev_socks5_worker_self (void) +{ + return pthread_getspecific (key); +} + +static void +hev_socks5_tcp_session_task_entry (void *data) +{ + HevSocks5Worker *self = hev_socks5_worker_self (); + HevSocks5SessionTCP *tcp = data; + + hev_tproxy_session_run (HEV_TPROXY_SESSION (tcp)); + + hev_list_del (&self->tcp_set, &tcp->node); + hev_object_unref (HEV_OBJECT (tcp)); +} + +static void +hev_socks5_tcp_session_new (HevSocks5Worker *self, int fd) +{ + HevSocks5SessionTCP *tcp; + struct sockaddr_in6 addr; + socklen_t addrlen; + int stack_size; + HevTask *task; + int res; + + LOG_D ("socks5 tcp session new"); + + addrlen = sizeof (addr); + res = getsockname (fd, (struct sockaddr *)&addr, &addrlen); + if (res < 0) { + LOG_E ("socks5 tcp orig dest"); + close (fd); + return; + } + + tcp = hev_socks5_session_tcp_new ((struct sockaddr *)&addr, fd); + if (!tcp) { + close (fd); + return; + } + + stack_size = hev_config_get_misc_task_stack_size (); + task = hev_task_new (stack_size); + if (!task) { + hev_object_unref (HEV_OBJECT (tcp)); + return; + } + + hev_tproxy_session_set_task (HEV_TPROXY_SESSION (tcp), task); + hev_list_add_tail (&self->tcp_set, &tcp->node); + hev_task_run (task, hev_socks5_tcp_session_task_entry, tcp); +} + +static void +hev_socks5_tcp_task_entry (void *data) +{ + HevSocks5Worker *self = data; + HevListNode *node; + const char *addr; + const char *port; + int fd; + + LOG_D ("socks5 tcp task run"); + + addr = hev_config_get_tcp_address (); + port = hev_config_get_tcp_port (); + if (!addr) + goto exit; + + fd = hev_socket_factory_get (addr, port, SOCK_STREAM, !self->is_main); + if (fd < 0) + goto exit; + + hev_task_add_fd (hev_task_self (), fd, POLLIN); + + for (;;) { + int nfd; + + nfd = hev_task_io_socket_accept (fd, NULL, NULL, task_io_yielder, self); + if (nfd == -1) { + LOG_W ("socks5 tcp accept"); + continue; + } else if (nfd < 0) { + break; + } + + hev_socks5_tcp_session_new (self, nfd); + } + + node = hev_list_first (&self->tcp_set); + for (; node; node = hev_list_node_next (node)) { + HevSocks5SessionTCP *tcp; + + tcp = container_of (node, HevSocks5SessionTCP, node); + hev_tproxy_session_terminate (HEV_TPROXY_SESSION (tcp)); + } + + close (fd); +exit: + self->task_tcp = NULL; +} + +static int +hev_socks5_udp_recvmsg (HevSocks5Worker *self, int fd, struct sockaddr *saddr, + struct sockaddr *daddr, void *buf, size_t len) +{ + union + { + char buf[CMSG_SPACE (sizeof (struct sockaddr_in6))]; + struct cmsghdr align; + } u; + struct msghdr mh = { 0 }; + struct iovec iov; + int res; + + iov.iov_base = buf; + iov.iov_len = len; + mh.msg_iov = &iov; + mh.msg_iovlen = 1; + mh.msg_name = saddr; + mh.msg_namelen = sizeof (struct sockaddr_in6); + mh.msg_control = u.buf; + mh.msg_controllen = sizeof (u.buf); + + res = hev_task_io_socket_recvmsg (fd, &mh, 0, task_io_yielder, self); + if (res < 0) + return res; + + msg_to_sock_addr (&mh, daddr); + + return res; +} + +static HevSocks5SessionUDP * +hev_socks5_udp_session_find (HevSocks5Worker *self, struct sockaddr *addr) +{ + HevRBTreeNode *node = self->udp_set.root; + + while (node) { + HevSocks5SessionUDP *this; + int res; + + this = container_of (node, HevSocks5SessionUDP, node); + res = memcmp (&this->addr, addr, sizeof (struct sockaddr_in6)); + + if (res < 0) + node = node->left; + else if (res > 0) + node = node->right; + else + return this; + } + + return NULL; +} + +static void +hev_socks5_udp_session_add (HevSocks5Worker *self, HevSocks5SessionUDP *udp) +{ + HevRBTreeNode **new = &self->udp_set.root, *parent = NULL; + + while (*new) { + HevSocks5SessionUDP *this; + int res; + + this = container_of (*new, HevSocks5SessionUDP, node); + res = memcmp (&this->addr, &udp->addr, sizeof (struct sockaddr_in6)); + + parent = *new; + if (res < 0) + new = &((*new)->left); + else if (res > 0) + new = &((*new)->right); + } + + hev_rbtree_node_link (&udp->node, parent, new); + hev_rbtree_insert_color (&self->udp_set, &udp->node); +} + +static void +hev_socks5_udp_session_del (HevSocks5Worker *self, HevSocks5SessionUDP *udp) +{ + hev_rbtree_erase (&self->udp_set, &udp->node); +} + +static void +hev_socks5_udp_session_task_entry (void *data) +{ + HevSocks5Worker *self = hev_socks5_worker_self (); + HevSocks5SessionUDP *udp = data; + + hev_tproxy_session_run (HEV_TPROXY_SESSION (udp)); + + hev_socks5_udp_session_del (self, udp); + hev_object_unref (HEV_OBJECT (udp)); +} + +static HevSocks5SessionUDP * +hev_socks5_udp_session_new (HevSocks5Worker *self, struct sockaddr *addr) +{ + HevSocks5SessionUDP *udp; + int stack_size; + HevTask *task; + + LOG_D ("socks5 udp session new"); + + udp = hev_socks5_session_udp_new (addr); + if (!udp) + return NULL; + + stack_size = hev_config_get_misc_task_stack_size (); + task = hev_task_new (stack_size); + if (!task) { + hev_object_unref (HEV_OBJECT (udp)); + return NULL; + } + + hev_tproxy_session_set_task (HEV_TPROXY_SESSION (udp), task); + hev_socks5_udp_session_add (self, udp); + hev_task_run (task, hev_socks5_udp_session_task_entry, udp); + + return udp; +} + +static int +hev_socks5_udp_dispatch (HevSocks5Worker *self, struct sockaddr *saddr, + struct sockaddr *daddr, void *data, size_t len) +{ + HevSocks5SessionUDP *udp; + int res; + + udp = hev_socks5_udp_session_find (self, saddr); + if (!udp) { + udp = hev_socks5_udp_session_new (self, saddr); + if (!udp) + return -1; + } + + res = hev_socks5_session_udp_send (udp, data, len, daddr); + + return res; +} + +static void +hev_socks5_udp_task_entry (void *data) +{ + HevSocks5Worker *self = data; + HevRBTreeNode *node; + const char *addr; + const char *port; + int fd; + + LOG_D ("socks5 udp task run"); + + addr = hev_config_get_udp_address (); + port = hev_config_get_udp_port (); + if (!addr) + goto exit; + + fd = hev_socket_factory_get (addr, port, SOCK_DGRAM, !self->is_main); + if (fd < 0) + goto exit; + + hev_task_add_fd (hev_task_self (), fd, POLLIN); + + for (;;) { + struct sockaddr_in6 saddr = { 0 }; + struct sockaddr_in6 daddr = { 0 }; + struct sockaddr *sap; + struct sockaddr *dap; + void *buf; + int res; + + buf = hev_malloc (UDP_BUF_SIZE); + sap = (struct sockaddr *)&saddr; + dap = (struct sockaddr *)&daddr; + + res = hev_socks5_udp_recvmsg (self, fd, sap, dap, buf, UDP_BUF_SIZE); + if (res == -1 || res == 0) { + LOG_W ("socks5 udp recvmsg"); + hev_free (buf); + continue; + } else if (res < 0) { + hev_free (buf); + break; + } + + res = hev_socks5_udp_dispatch (self, sap, dap, buf, res); + if (res < 0) + hev_free (buf); + } + + node = hev_rbtree_first (&self->udp_set); + for (; node; node = hev_rbtree_node_next (node)) { + HevSocks5SessionUDP *udp; + + udp = container_of (node, HevSocks5SessionUDP, node); + hev_tproxy_session_terminate (HEV_TPROXY_SESSION (udp)); + } + + close (fd); +exit: + self->task_udp = NULL; +} + +static void +hev_socks5_dns_session_task_entry (void *data) +{ + HevSocks5Worker *self = hev_socks5_worker_self (); + HevTProxySessionDNS *dns = data; + + hev_tproxy_session_run (HEV_TPROXY_SESSION (dns)); + + hev_list_del (&self->dns_set, &dns->node); + hev_object_unref (HEV_OBJECT (dns)); +} + +static void +hev_socks5_dns_task_entry (void *data) +{ + HevSocks5Worker *self = data; + HevListNode *node; + const char *addr; + const char *port; + int stack_size; + int fd; + + LOG_D ("socks5 dns task run"); + + addr = hev_config_get_dns_address (); + port = hev_config_get_dns_port (); + if (!addr) + goto exit; + + fd = hev_socket_factory_get (addr, port, SOCK_DGRAM, !self->is_main); + if (fd < 0) + goto exit; + + hev_task_add_fd (hev_task_self (), fd, POLLIN); + stack_size = hev_config_get_misc_task_stack_size (); + + for (;;) { + HevTProxySessionDNS *dns; + struct sockaddr *sap; + struct sockaddr *dap; + HevTask *task; + void *buffer; + int res; + + dns = hev_tproxy_session_dns_new (); + sap = hev_tproxy_session_dns_get_saddr (dns); + dap = hev_tproxy_session_dns_get_daddr (dns); + buffer = hev_tproxy_session_dns_get_buffer (dns); + + res = hev_socks5_udp_recvmsg (self, fd, sap, dap, buffer, UDP_BUF_SIZE); + if (res == -1 || res == 0) { + LOG_W ("socks5 dns recvmsg"); + hev_object_unref (HEV_OBJECT (dns)); + continue; + } else if (res <= 0) { + hev_object_unref (HEV_OBJECT (dns)); + break; + } + + task = hev_task_new (stack_size); + hev_task_run (task, hev_socks5_dns_session_task_entry, dns); + hev_list_add_tail (&self->dns_set, &dns->node); + hev_tproxy_session_dns_set_size (dns, res); + hev_tproxy_session_set_task (HEV_TPROXY_SESSION (dns), task); + } + + node = hev_list_first (&self->dns_set); + for (; node; node = hev_list_node_next (node)) { + HevTProxySessionDNS *dns; + + dns = container_of (node, HevTProxySessionDNS, node); + hev_tproxy_session_terminate (HEV_TPROXY_SESSION (dns)); + } + + close (fd); +exit: + self->task_dns = NULL; +} + +static void +hev_socks5_event_task_entry (void *data) +{ + HevSocks5Worker *self = data; + int res; + + LOG_D ("socks5 event task run"); + + res = hev_task_io_pipe_pipe (self->event_fds); + if (res < 0) { + LOG_E ("socks5 proxy pipe"); + return; + } + + hev_task_add_fd (hev_task_self (), self->event_fds[0], POLLIN); + + for (;;) { + char val; + + res = hev_task_io_read (self->event_fds[0], &val, sizeof (val), NULL, + NULL); + if (res < sizeof (val)) + continue; + + break; + } + + self->quit = 1; + + if (self->task_tcp) + hev_task_wakeup (self->task_tcp); + if (self->task_udp) + hev_task_wakeup (self->task_udp); + if (self->task_dns) + hev_task_wakeup (self->task_dns); + + close (self->event_fds[0]); + close (self->event_fds[1]); +} + +HevSocks5Worker * +hev_socks5_worker_new (void) +{ + HevSocks5Worker *self; + + self = calloc (1, sizeof (HevSocks5Worker)); + if (!self) + return NULL; + + LOG_D ("%p socks5 worker new", self); + + self->event_fds[0] = -1; + self->event_fds[1] = -1; + + pthread_once (&key_once, pthread_key_creator); + + return self; +} + +void +hev_socks5_worker_destroy (HevSocks5Worker *self) +{ + LOG_D ("%p works worker destroy", self); + + free (self); +} + +int +hev_socks5_worker_init (HevSocks5Worker *self, int is_main) +{ + LOG_D ("%p works worker init", self); + + self->task_event = hev_task_new (-1); + if (!self->task_event) { + LOG_E ("socks5 worker task event"); + goto exit; + } + + self->task_tcp = hev_task_new (-1); + if (!self->task_tcp) { + LOG_E ("socks5 worker task tcp"); + goto free_event; + } + + self->task_udp = hev_task_new (-1); + if (!self->task_udp) { + LOG_E ("socks5 worker task udp"); + goto free_tcp; + } + + self->task_dns = hev_task_new (-1); + if (!self->task_dns) { + LOG_E ("socks5 worker task dns"); + goto free_udp; + } + + self->is_main = is_main; + pthread_setspecific (key, self); + + return 0; + +free_udp: + hev_task_unref (self->task_udp); +free_tcp: + hev_task_unref (self->task_tcp); +free_event: + hev_task_unref (self->task_event); +exit: + return -1; +} + +void +hev_socks5_worker_start (HevSocks5Worker *self) +{ + LOG_D ("%p works worker start", self); + + hev_task_run (self->task_event, hev_socks5_event_task_entry, self); + + if (self->task_tcp) + hev_task_run (self->task_tcp, hev_socks5_tcp_task_entry, self); + + if (self->task_udp) + hev_task_run (self->task_udp, hev_socks5_udp_task_entry, self); + + if (self->task_dns) + hev_task_run (self->task_dns, hev_socks5_dns_task_entry, self); +} + +void +hev_socks5_worker_stop (HevSocks5Worker *self) +{ + char val = 's'; + + LOG_D ("%p works worker stop", self); + + if (self->event_fds[1] < 0) + return; + + val = write (self->event_fds[1], &val, sizeof (val)); + if (val < 0) + LOG_E ("socks5 proxy write event"); +} diff --git a/src/hev-socks5-worker.h b/src/hev-socks5-worker.h new file mode 100644 index 0000000..51ce879 --- /dev/null +++ b/src/hev-socks5-worker.h @@ -0,0 +1,22 @@ +/* + ============================================================================ + Name : hev-socks5-worker.h + Author : Heiher + Copyright : Copyright (c) 2025 hev + Description : Socks5 Worker + ============================================================================ + */ + +#ifndef __HEV_SOCKS5_WORKER_H__ +#define __HEV_SOCKS5_WORKER_H__ + +typedef struct _HevSocks5Worker HevSocks5Worker; + +HevSocks5Worker *hev_socks5_worker_new (void); +void hev_socks5_worker_destroy (HevSocks5Worker *self); +int hev_socks5_worker_init (HevSocks5Worker *self, int is_main); + +void hev_socks5_worker_start (HevSocks5Worker *self); +void hev_socks5_worker_stop (HevSocks5Worker *self); + +#endif /* __HEV_SOCKS5_WORKER_H__ */