Linux系统编程之多进程、多线程并发服务器

本章节就服务器的开章,仅介绍多进程,多线程下高并发服务器模型的实现。这两种模型实现简单,逻辑清晰,但同时局限性也很大,受限于系统资源以及文件描述符上限等,后续会就多路IO转接服务器进行进一步分析。

高并发服务器

多进程并发服务器

多进程并发服务器,顾名思义,采用Linux下多进程机制,对于多个客户端链接请求,服务器端对应多个进程与其进行数据通信交互。可模型理解简单,代码实现也容易,但其缺点也显而易见,主要几种以下几个方面:

  1. 父进程能够创建的最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符)
  2. 系统内创建进程个数(内存大小相关)
  3. 进程创建过多是否降低整体服务性能(进程调度)

Server

#include "wrap.h"
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>

#define SERVER_PORT 8000
#define BUF_SIZE 1024

// 信号捕捉函数,处理子进程退出,回收系统资源
void do_sigchild(void* arg){
    // 0 -> 回收和当前调用waitpid一组的所有子进程
    // 即当前进程内的所有子进程
    waitpid(0,NULL,WNOHANG);
}

int main(int argc, char *argv[]){

    int sockfd,confd, readlen;
    struct sockaddr_in serveraddr,clientaddr;
    socklen_t clientaddr_len;
    char clientIP[128];
    char buf[BUF_SIZE];

    pid_t pid;

    // 设置信号捕追函数,捕追子进程退出,节约资源
    struct sigaction newact;
    newact.sa_flags = 0;
    newact.sa_handler = do_sigchild;
    sigemptyset(&newact);
    sigaction(SIGCHLD,&newact,NULL);

    sockfd = Socket(AF_INET, SOCK_STREAM, 0);

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(SERVER_PORT);

    Bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));

    Listen(sockfd,30);
    printf(" Accepting connections ...\n");

    while(1){

        clientaddr_len = sizeof(clientaddr);
        // 监听客户端链接,成功链接返回描述符,否则阻塞
        confd = Accept(sockfd, (struct sockaddr*)&clientaddr, &clientaddr_len);

        // 此时有客户端链接上服务器,创建子进程与其进行通信,父进程继续监听新的链接请求
        pid = fork();
        if(pid == 0){
            // 子进程内,关闭父子进程继承的文件描述符,节约系统资源
            Close(sockfd);
            // 数据交互
            while (1) {
                readlen = Read(confd, buf, BUF_SIZE);
                if (readlen == 0) {
                    printf("the other side has been closed.\n");
                    break;
                }
                printf("received from %s at PORT %d received: %s\n",
                       inet_ntop(AF_INET, &clientaddr.sin_addr, clientIP, sizeof(clientIP)),
                       ntohs(clientaddr.sin_port),
                       buf);
                for (int i = 0; i < readlen; i++)
                    buf[i] = toupper(buf[i]);
                Write(confd, buf, readlen);
            }
            // 数据交互结束,关闭socket链接,等待信号捕捉函数回收进程资源
            Close(confd);
            return 0;
        }else if (pid > 0) {
            // 父进程内关闭链接客户端的描述符,父进程只关心链接情况,而不参与数据请求(子进程参与数据请求)
            Close(confd);
        }else {
            // fork失败,退出
            perr_exit("fork");
        }
    }
}

以上代码需要注意几点:

  1. 子进程的回收处理需要在fork之前进程,因为信号捕捉函数会fork继承到子进程内,因为子进程的退出(发送SIGCHLD)可以得到处理,回收系统资源
  2. 进程的创建在链接请求操作完成之后进行,如果没有客户端链接,那么就没有子进程创建
  3. 父进程只负责监听客户端链接,分配描述符进行子进程的创建,继承下来的通信confd描述符需要关闭
  4. 子进程也需要关闭继承下来的sockfd,子进程只关心数据通信通信(confd),进一步节省系统资源

从模型中可以看出,多进程下高并发服务器下,对于进程的创建开销极其大,非常考验系统内存容量,并且受限于文件描述符的大小。为了进一步节省资源,需要对进程内不需要使用的文件描述的进行适当的关闭。同时,新的链接请求产生,就意味着新进程的创建,因此高并发下多个客户端同时链接,会造成服务器某一时刻负载极其大,容易出现未知的问题。

Clinet

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include "wrap.h"
#define MAXLINE 80
#define SERV_PORT 8000

int main(int argc, char *argv[])
{
    struct sockaddr_in servaddr;
    char buf[MAXLINE];
    int sockfd, n;
    char ServerIP[] ="127.0.0.1";

    sockfd = Socket(AF_INET, SOCK_STREAM, 0);
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;

    inet_pton(AF_INET, ServerIP, &servaddr.sin_addr);
    servaddr.sin_port = htons(SERV_PORT);
    Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

    while (fgets(buf, MAXLINE, stdin) != NULL) {
        Write(sockfd, buf, strlen(buf));
        n = Read(sockfd, buf, MAXLINE);
        if (n == 0)
            printf("the other side has been closed.\n");
        else
            Write(STDOUT_FILENO, buf, n);
    }
    Close(sockfd);
    return 0;
}

客户端代码就容易许多了,主要进行连接服务器,等待用户输入字符传入服务器处理,再将服务器回传数据显示打印到输出窗口中。

多线程并发服务器

从上面介绍的多进程服务模型可以推出,多线程服务模型也就是将进程概念转换为线程而已,但是考虑创建线程的开销肯定比进程来的小,所以多线程高并发服务器还是有一定的优势的。

多线程下高并发服务器的编程中需要注意以下几点:

  1. 调整进程内最大文件描述符上限
  2. 线程如有共享数据,考虑线程同步
  3. 服务于客户端线程退出时,退出处理。(退出值,分离态)
  4. 系统负载,随着链接客户端增加,导致其它线程不能及时得到CPU

Server

#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 8000
#define MAX_PTHREAD_SIZE 256

// 自定义结构体,存储客户端地址以及对应的服务器connfd
struct s_info {
    struct sockaddr_in cliaddr;
    int connfd;
};

// 线程工作载入实体
void *do_work(void *arg)
{
    int n,i;
    // 强转数据,得到s_info
    struct s_info *ts = (struct s_info*)arg;
    char buf[MAXLINE];
    char str[INET_ADDRSTRLEN];
    /* 可以在创建线程前设置线程创建属性,设为分离态,哪种效率高内? */
    pthread_detach(pthread_self());
    while (1) {
        n = Read(ts->connfd, buf, MAXLINE);
        if (n == 0) {
            // 客户端关闭,跳出循环
            printf("the other side has been closed.\n");
            break;
        }
        printf("received from %s at PORT %d\n",
               inet_ntop(AF_INET, &(*ts).cliaddr.sin_addr, str, sizeof(str)),
               ntohs((*ts).cliaddr.sin_port));
        for (i = 0; i < n; i++)
            buf[i] = toupper(buf[i]);
        Write(ts->connfd, buf, n);
    }
    // 客户端关闭,服务端线程关闭文件描述符
    // 由于线程已置位分离态,系统自动回收线程资源
    Close(ts->connfd);
}

int main(void)
{
    struct sockaddr_in servaddr, cliaddr;
    socklen_t cliaddr_len;
    int listenfd, connfd;
    int i = 0;
    pthread_t tid;
    // 配置同一时刻线程峰值数
    struct s_info ts[MAX_PTHREAD_SIZE];

    listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);
    Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    Listen(listenfd, 20);
    printf("Accepting connections ...\n");
    // 主函数内主负责监听客户端链接,创建子线程
    while (1) {
        cliaddr_len = sizeof(cliaddr);
        connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
        // 将必要的信息封装到结构体传入子线程中
        ts[i].cliaddr = cliaddr;
        ts[i].connfd = connfd;
        /* 达到线程最大数时,pthread_create出错处理, 增加服务器稳定性 */
        pthread_create(&tid, NULL, do_work, (void*)&ts[i]);
        i++;
        // 子线程数量过载,退出
        if(i>=MAX_PTHREAD_SIZE)
            break;
    }
    Close(listenfd);
    return 0;
}

以上代码仅仅是个模型,许多细节有待优化,集中以下几点:

  1. 线程峰值数量问题:上述采用的数组容器,其峰值预先定好,对于先前创建的线程因为客户端关闭而退出,后被系统回收了情况下,此时数组容器应当记录并将给予新创建的子线程使用
  2. 子线程创建时分离态的问题:子线程创建之前就通过 pthread_attr_setdetachstate 设置,相比创建时通过 pthread_detach 来说,更有效率
  3. 和多进程模型相似,新链接的产生,对应子线程的创建,同一时刻下过多客户端链接下多线程的创建造成服务器负载过高的问题
  4. 进程pcb大小问题,适当提高进程下线程数量,也可以提高多线程高并发下的优势

Client

客户端不涉及多进程和多线程概念,同上多进程下 Client 代码一致,此处不再贴源码了。

线程池并发服务器

通过以上两个高并发服务器,都有一个共同特点,那就是一对一的数据交互服务连接。一个客户端,服务器分配一个进程或者线程与其对接完成数据交互。然而应用到现实中去,如果同一时间多个客户端同时访问链接服务器,那么对于服务器端的负载过大是显而易见的,因此在此基础上,有一种更加高效的模型就诞生了,线程池高并发服务器。

同样采用线程来处理客户端的数据请求,事先分配一定数额的工作线程,然后由一个线程池管理。线程池内约束一定量的线程数量,优先保障这些线程的工作效率。先到来的客户端链接请求会优先得到线程的处理,如果同一时刻客户端访问过多,那么后面需要链接请求的,就需要等待工作线程处理结束,才能得到响应。

Server

#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX 10
int fds[MAX]={0};
sem_t sem;

// 静态初始化锁
pthread_mutex_t luck = PTHREAD_MUTEX_INITIALIZER;

void fds_init()
{
    pthread_mutex_lock(&luck);
    int i=0;
    for(;i<MAX;i++)
    {
        fds[i]=-1;
    }
    pthread_mutex_unlock(&luck);
}

int fds_add(int c)
{
    pthread_mutex_lock(&luck);
    int i=0;
    for(;i<MAX;i++)
    {
        if(fds[i]==-1)
        {
            fds[i]=c;
            return 1;
        }
    }
    pthread_mutex_unlock(&luck);
    return 0;
}

void fds_sub(int c)
{
    pthread_mutex_lock(&luck);
    int i=0;
    for(;i<MAX-1;i++)
    {
        fds[i]=fds[i+1];
        if(fds[i+1]==-1)
            break;
    }
    pthread_mutex_unlock(&luck);
    fds[MAX-1]=-1;
}

int fds_get()
{
    pthread_mutex_lock(&luck);
    int i=0;
    for(;i<MAX;i++)
    {
        if(fds[i]!=-1)
        {
            int c=fds[i];
            fds_sub(c);
            return c;
        }
    }
    pthread_mutex_unlock(&luck);
    return -1;
}

// 线程函数载入实体
void *pthread_fun(void *arg)
{
    while(1)
    {
        // 获取信号量,有剩余空余线程即进入,否则等待信号号产生
        sem_wait(&sem);
        // 通过锁机制获取待处理的客户端链接请求
        int c=fds_get();
        while(1)
        {
            char buff[128]={0};
            int n = Read(c,buff,127);
            if(n<=0)
            {
                Close(c);
                break;
            }
            Write(c,"I accept",sizeof("I accept"));
        }
    }
}

int main()
{
    // 初始化信号量
    sem_init(&sem,0,0);
    // 初始化
    fds_init();
    // 线程池工作队列 3 个
    for(int i=0;i<3;i++)
    {
        pthread_t id;
        int rt=pthread_create(&id,NULL,(void *)pthread_fun,NULL);
        assert(rt==0);
    }

    int sockfd=Socket(AF_INET,SOCK_STREAM,0);
    assert(sockfd!=-1);

    struct sockaddr_in ser,cli;
    memset(&ser,0,sizeof(ser));
    ser.sin_family=AF_INET;
    ser.sin_addr.s_addr=inet_addr("127.0.0.1");
    ser.sin_port=htons(8000);

    int res=Bind(sockfd,(struct sockaddr *)&ser,sizeof(ser));
    assert(res!=-1);

    Listen(sockfd,5);

    printf("Accepting connections ...\n");

    while(1)
    {
        int len=sizeof(cli);
        int c=Accept(sockfd,(struct sockaddr *)&cli,&len);
        if(c>=0)
        {
            if(!fds_add(c))
            {
                printf("Please wait a memmet");
                Close(c);
                continue;
            }
            // 产生一个信号量,其中一个工作线程开始处理
            sem_post(&sem);
        }
        else
        {
            printf("error\n");
            continue;
        }
    }
}

上述代码中:

  1. 线程池维护三个固定线程,其主要负责处理客户端的请求
  2. 客户端同时链接请求峰值为 10,同时得到响应的是工作线程峰值 3,其他的记录在数组中,工作线程处理完成后通过取得信号量,继续处理
  3. 工作线程处理请求时,使用锁机制保证各个线程处理请求时数据唯一
  4. 工作线程中有两个循环,内层循环处理和已连接上的客户端进行数据交互,外层循环监测信号量,处理文件描述符集合新的客户端请求

参考链接

小结

本章主要对Linux下高并发服务中常见的多进程,多线程模型进行实现,其模型搭建简单快速,但是局限性也很大。一来进程和线程的频繁创建太消耗资源(一个客户端对应一个进程或者线程),二来创建的进程、线程的数量大小也受限于系统资源大小。线程池模型是之前两种模型上的升级版,是一种相对理想的模型,其可以动态的响应客户端的请求,内部使用信号量、锁等同步机制保证访问资源的唯一性,同时内部使用线程代替进程,消耗资源也得到有效控制。当然最理想的还要在下章节中介绍的多路IO转接模型 select、poll、epoll 等模型。

邢文鹏Linux教学资料

坚持原创技术分享,您的支持将鼓励我继续创作!