Skip to content

Latest commit

 

History

History
483 lines (378 loc) · 16.1 KB

16.md

File metadata and controls

483 lines (378 loc) · 16.1 KB

第16节 并发实现N:1多个客户端连接一个服务器

❤️💕💕计算机网络--TCP/IP 学习。Myblog:http://nsddd.top


[TOC]

阻塞

在上一节的代码中,我们发现服务端有三个地方可能造成阻塞

  1. accept:如果没有新的客户端连接,就会阻塞在
  2. read:如果客户端不发送数据,就会阻塞
  3. write:如果写的缓冲区满了,也会阻塞

如何解决上面程序阻塞问题,第十节

  1. 使用多线程实现
  2. 使用多进程实现
  3. 使用IO多路转接(复用)实现(单线程可用,但是效率低)
  4. 使用IO多路转接+多线程实现

image-20220721101327615

分析和解决

单线程 / 进程在 TCP 通信过程中,服务器端启动之后可以同时和多个客户端建立连接,并进行网络通信,但是在介绍 TCP 通信流程的时候,提供的服务器代码却不能完成这样的需求,先简单的看一下之前的服务器代码的......

在 TCP 通信过程中,服务器端启动之后可以同时和多个客户端建立连接,并进行网络通信,但是在介绍 TCP 通信流程的时候,提供的服务器代码却不能完成这样的需求,先简单的看一下之前的服务器代码的处理思路,再来分析代码中的弊端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
    
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(10000);   
    
    
    addr.sin_addr.s_addr = INADDR_ANY;  
    int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
    
    ret = listen(lfd, 128);
    
    struct sockaddr_in cliaddr;
    int clilen = sizeof(cliaddr);
    int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
    
    while(1)
    {
        
        char buf[1024];
        memset(buf, 0, sizeof(buf));
        int len = read(cfd, buf, sizeof(buf));
        if(len > 0)
        {
            printf("客户端say: %s\n", buf);
            write(cfd, buf, len);
        }
        else if(len  == 0)
        {
            printf("客户端断开了连接...\n");
            break;
        }
        else
        {
            perror("read");
            break;
        }
    }
    close(cfd);
    close(lfd);
    return 0;
}

在上面的代码中用到了三个会引起程序阻塞的函数,分别是:

  • accept():如果服务器端没有新客户端连接,阻塞当前进程 / 线程,如果检测到新连接解除阻塞,建立连接
  • read():如果通信的套接字对应的读缓冲区没有数据,阻塞当前进程 / 线程,检测到数据解除阻塞,接收数据
  • write():如果通信的套接字写缓冲区被写满了,阻塞当前进程 / 线程(这种情况比较少见)

如果需要和发起新的连接请求的客户端建立连接,那么就必须在服务器端通过一个循环调用 accept() 函数,另外已经和服务器建立连接的客户端需要和服务器通信,发送数据时的阻塞可以忽略,当接收不到数据时程序也会被阻塞,这时候就会非常矛盾,被 accept() 阻塞就无法通信,被 read() 阻塞就无法和客户端建立新连接。因此得出一个结论,基于上述处理方式,在单线程 / 单进程场景下,服务器是无法处理多连接的,解决方案也有很多,常用的有三种:

  1. 使用多线程实现
  2. 使用多进程实现
  3. 使用 IO 多路转接(复用)实现
  4. 使用 IO 多路转接 + 多线程实现

如果要编写多进程版的并发服务器程序,首先要考虑,创建出的多个进程都是什么角色,这样就可以在程序中对号入座了。在 Tcp 服务器端一共有两个角色,分别是:监听和通信,监听是一个持续的动作,如果有新连接就建立连接,如果没有新连接就阻塞。关于通信是需要和多个客户端同时进行的,因此需要多个进程,这样才能达到互不影响的效果。进程也有两大类:父进程和子进程,通过分析我们可以这样分配进程:

  • 父进程:
    • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用 accept() 函数
    • 创建子进程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
    • 回收子进程资源:子进程退出回收其内核 PCB 资源,防止出现僵尸进程
  • 子进程:负责通信,基于父进程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。
    • 发送数据:send() / write()
    • 接收数据:recv() / read()

在多进程版的服务器端程序中,多个进程是有血缘关系,对应有血缘关系的进程来说,还需要想明白他们有哪些资源是可以被继承的,哪些资源是独占的,以及一些其他细节:

  • 子进程是父进程的拷贝,在子进程的内核区 PCB 中,文件描述符也是可以被拷贝的,因此在父进程可以使用的文件描述符在子进程中也有一份,并且可以使用它们做和父进程一样的事情。
  • 父子进程有用各自的独立的虚拟地址空间,因此所有的资源都是独占的
  • 为了节省系统资源,对于只有在父进程才能用到的资源,可以在子进程中将其释放掉,父进程亦如此。
  • 由于需要在父进程中做 accept() 操作,并且要释放子进程资源,如果想要更高效一下可以使用信号的方式处理

img

多进程版并发 TCP 服务器示例代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <errno.h>

// 信号处理函数
void callback(int num)
{
    while(1)
    {
        pid_t pid = waitpid(-1, NULL, WNOHANG);
        if(pid <= 0)
        {
            printf("子进程正在运行, 或者子进程被回收完毕了\n");
            break;
        }
        printf("child die, pid = %d\n", pid);
    }
}

int childWork(int cfd);
int main()
{
    // 1. 创建监听的套接字
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    if(lfd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 将socket()返回值和本地的IP端口绑定到一起
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(10000);   // 大端端口
    // INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址
    // 这个宏可以代表任意一个IP地址
    // 这个宏一般用于本地的绑定操作
    addr.sin_addr.s_addr = INADDR_ANY;  // 这个宏的值为0 == 0.0.0.0
    //    inet_pton(AF_INET, "192.168.237.131", &addr.sin_addr.s_addr);
    int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
        perror("bind");
        exit(0);
    }

    // 3. 设置监听
    ret = listen(lfd, 128);
    if(ret == -1)
    {
        perror("listen");
        exit(0);
    }

    // 注册信号的捕捉
    struct sigaction act;
    act.sa_flags = 0;
    act.sa_handler = callback;
    sigemptyset(&act.sa_mask);
    sigaction(SIGCHLD, &act, NULL);

    // 接受多个客户端连接, 对需要循环调用 accept
    while(1)
    {
        // 4. 阻塞等待并接受客户端连接
        struct sockaddr_in cliaddr;
        int clilen = sizeof(cliaddr);
        int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
        if(cfd == -1)
        {
            if(errno == EINTR)
            {
                // accept调用被信号中断了, 解除阻塞, 返回了-1
                // 重新调用一次accept
                continue;
            }
            perror("accept");
            exit(0);
 
        }
        // 打印客户端的地址信息
        char ip[24] = {0};
        printf("客户端的IP地址: %s, 端口: %d\n",
               inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof(ip)),
               ntohs(cliaddr.sin_port));
        // 新的连接已经建立了, 创建子进程, 让子进程和这个客户端通信
        pid_t pid = fork();
        if(pid == 0)
        {
            // 子进程 -> 和客户端通信
            // 通信的文件描述符cfd被拷贝到子进程中
            // 子进程不负责监听
            close(lfd);
            while(1)
            {
                int ret = childWork(cfd);
                if(ret <=0)
                {
                    break;
                }
            }
            // 退出子进程
            close(cfd);
            exit(0);
        }
        else if(pid > 0)
        {
            // 父进程不和客户端通信
            close(cfd);
        }
    }
    return 0;
}


// 5. 和客户端通信
int childWork(int cfd)
{

    // 接收数据
    char buf[1024];
    memset(buf, 0, sizeof(buf));
    int len = read(cfd, buf, sizeof(buf));
    if(len > 0)
    {
        printf("客户端say: %s\n", buf);
        write(cfd, buf, len);
    }
    else if(len  == 0)
    {
        printf("客户端断开了连接...\n");
    }
    else
    {
        perror("read");
    }

    return len;
}

在上面的示例代码中,父子进程中分别关掉了用不到的文件描述符(父进程不需要通信,子进程也不需要监听)。如果客户端主动断开连接,那么服务器端负责和客户端通信的子进程也就退出了,子进程退出之后会给父进程发送一个叫做 SIGCHLD 的信号,在父进程中通过 sigaction() 函数捕捉了该信号,通过回调函数 callback() 中的 waitpid() 对退出的子进程进行了资源回收。

另外还有一个细节要说明一下,这是父进程的处理代码:

int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
while(1)
{
        int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
        if(cfd == -1)
        {
            if(errno == EINTR)
            {
                
                
                continue;
            }
            perror("accept");
            exit(0);
 
        }
 }

如果父进程调用 accept() 函数没有检测到新的客户端连接,父进程就阻塞在这儿了,这时候有子进程退出了,发送信号给父进程,父进程就捕捉到了这个信号 SIGCHLD, 由于信号的优先级很高,会打断代码正常的执行流程,因此父进程的阻塞被中断,转而去处理这个信号对应的函数 callback(),处理完毕,再次回到 accept() 位置,但是这是已经无法阻塞了,函数直接返回 - 1,此时函数调用失败,错误描述为 accept: Interrupted system call,对应的错误号为 EINTR,由于代码是被信号中断导致的错误,所以可以在程序中对这个错误号进行判断,让父进程重新调用 accept(),继续阻塞或者接受客户端的新连接。

编写多线程版的并发服务器程序和多进程思路差不多,考虑明白了对号入座即可。多线程中的线程有两大类:主线程(父线程)和子线程,他们分别要在服务器端处理监听和通信流程。根据多进程的处理思路,就可以这样设计了:

  • 主线程:
    • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用 accept() 函数
    • 创建子线程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
    • 回收子线程资源:由于回收需要调用阻塞函数,这样就会影响 accept(),直接做线程分离即可。
  • 子线程:负责通信,基于主线程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。
    • 发送数据:send() / write()
    • 接收数据:recv() / read()

在多线程版的服务器端程序中,多个线程共用同一个地址空间,有些数据是共享的,有些数据的独占的,下面来分析一些其中的一些细节:

  • 同一地址空间中的多个线程的栈空间是独占的
  • 多个线程共享全局数据区,堆区,以及内核区的文件描述符等资源,因此需要注意数据覆盖问题,并且在多个线程访问共享资源的时候,还需要进行线程同步。

img

多线程版 Tcp 服务器示例代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>

struct SockInfo
{
    int fd;                      // 通信
    pthread_t tid;               // 线程ID
    struct sockaddr_in addr;     // 地址信息
};

// 初始化结构体数组
struct SockInfo infos[128];

void* working(void* arg)
{
    while(1)
    {
        struct SockInfo* info = (struct SockInfo*)arg;
        // 接收数据
        char buf[1024];
        int ret = read(info->fd, buf, sizeof(buf));
        if(ret == 0)
        {
            printf("客户端已经关闭连接...\n");
            info->fd = -1;
            break;
        }
        else if(ret == -1)
        {
            printf("接收数据失败...\n");
            info->fd = -1;
            break;
        }
        else
        {
            write(info->fd, buf, strlen(buf)+1);
        }
    }
    return NULL;
}

int main()
{
    // 1. 创建用于监听的套接字
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 绑定
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;          // ipv4
    addr.sin_port = htons(8989);        // 字节序应该是网络字节序
    addr.sin_addr.s_addr =  INADDR_ANY; // == 0, 获取IP的操作交给了内核
    int ret = bind(fd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
        perror("bind");
        exit(0);
    }

    // 3.设置监听
    ret = listen(fd, 100);
    if(ret == -1)
    {
        perror("listen");
        exit(0);
    }

    // 4. 等待, 接受连接请求
    int len = sizeof(struct sockaddr);

    // 数据初始化
    int max = sizeof(infos) / sizeof(infos[0]);
    for(int i=0; i<max; ++i)
    {
        bzero(&infos[i], sizeof(infos[i]));
        infos[i].fd = -1;
        infos[i].tid = -1;
    }

    // 父进程监听, 子进程通信
    while(1)
    {
        // 创建子线程
        struct SockInfo* pinfo;
        for(int i=0; i<max; ++i)
        {
            if(infos[i].fd == -1)
            {
                pinfo = &infos[i];
                break;
            }
            if(i == max-1)
            {
                sleep(1);
                i--;
            }
        }

        int connfd = accept(fd, (struct sockaddr*)&pinfo->addr, &len);
        printf("parent thread, connfd: %d\n", connfd);
        if(connfd == -1)
        {
            perror("accept");
            exit(0);
        }
        pinfo->fd = connfd;
        pthread_create(&pinfo->tid, NULL, working, pinfo);
        pthread_detach(pinfo->tid);
    }

    // 释放资源
    close(fd);  // 监听

    return 0;
}

在编写多线程版并发服务器代码的时候,需要注意父子线程共用同一个地址空间中的文件描述符,因此每当在主线程中建立一个新的连接,都需要将得到文件描述符值保存起来,不能在同一变量上进行覆盖,这样做丢失了之前的文件描述符值也就不知道怎么和客户端通信了。

在上面示例代码中是将成功建立连接之后得到的用于通信的文件描述符值保存到了一个全局数组中,每个子线程需要和不同的客户端通信,需要的文件描述符值也就不一样,只要保证存储每个有效文件描述符值的变量对应不同的内存地址,在使用的时候就不会发生数据覆盖的现象,造成通信数据的混乱了。

END 链接