网络服务主模型

一.概述

Lighttpd采用多进程网络服务模型。

进程分两种:监控进程watcher 和 工作进程 workers。

监控进程:fork工作进程并监视工作进程的数目,一旦有工作进程退出,监控进程立即fork新的工作进程。

工作进程:接收客户端请求并做出服务响应。

一般情况下,存在一个监控进程和多个工作进程。

max-worker值默认为0时,没有监控进程,只有一个工作进程。

关于初始化:Lighttpd很多地方内存申请都是采用calloc,malloc()和calloc()的主要区别是前者不能初始化所分配的内存空间,而后者能。

主流程入口文件:server.c

二.Lighttpd进程守护化

main函数中的函数daemonize调用使得Lighttpd进程转换为守护进程,从而脱离控制终端,在后台提供服务,避免了执行过程中的信息在终端上显示,也避免了服务被终端信息打断。
(启动时假如选项D可不守护化)

下面我们来分析下:

#ifdef HAVE_FORK
static void daemonize(void) {
#ifdef SIGTTOU
/* 下面用于屏蔽一些有关控制终端操作的信号,防止守护进程没有正常运作之前控制终端受到干扰退出或挂起 */
    signal(SIGTTOU, SIG_IGN); //忽略后台进程写控制终端信号
#endif
#ifdef SIGTTIN
    signal(SIGTTIN, SIG_IGN); //忽略后台进程读控制终端信号
#endif
#ifdef SIGTSTP
    signal(SIGTSTP, SIG_IGN); //忽略终端挂起
#endif
/* 下面开始从普通进程转换为守护进程
 * 目标1:后台运行。
 * 做法:脱离控制终端->调用fork之后终止父进程,子进程被init收养,此步达到后台运行的目标。
 */
    if (0 != fork()) exit(0);
/*,目标2:脱离控制终端,登陆会话和进程组。
 * 做法:使用setsid创建新会话,成为新会话的首进程,则与原来的
 * 登陆会话和进程组自动脱离,从而脱离控制终端。
 * (上一步的fork保证了子进程不可能是一个会话的首进程,这是调用setsid的必要条件)
 */
    if (-1 == setsid()) exit(0);
/* 上面已经完成了大部分工作,但是有的系统上,当会话首进程打开
 * 一个尚未与任何会话相关联的终端设备时,该设备自动作为控制
 * 终端分配给该会话。
 * 为避免该情况,我们再次fork进程,于是新进程不再是会话首进程。
 * 会话首进程退出时可能会给所有会话内的进程发送SIGHUP,而该
 * 信号默认是结束进程,故需要忽略该信号来防止孙子进程意外结束。
 */
    signal(SIGHUP, SIG_IGN);

    if (0 != fork()) exit(0);
/* 最后目标:改变工作目录到根目录。
 * 原因:进程活动时,其工作目录所在的文件系统不能卸下。
 */
    if (0 != chdir("/")) exit(0);
}
#endif

进程属于一个进程组(一个或多个进程的集合),登陆会话是包含一个或多个进程组的集合,这些进程组共享一个控制终端。

三.多进程网络服务模型

Lighttpd一开始是单进程的,在完成一组公共操作后开始转换为多进程。

代码框架如下:

这里写图片描述

具体代码分析注释:

#ifdef USE_ALARM
    signal(SIGALRM, signal_handler);

    /* setup periodic timer (1 second) */
    if (setitimer(ITIMER_REAL, &interval, NULL)) { //每隔一秒产生一个ALARM
        log_error_write(srv, __FILE__, __LINE__, "s", "setting timer failed");
        return -1;
    }

    getitimer(ITIMER_REAL, &interval);
#endif

#ifdef HAVE_FORK
    /* start watcher and workers */
    num_childs = srv->srvconf.max_worker; //存放最大的子进程的数目
    if (num_childs > 0) {
        int child = 0; //child变量用于标记是否为子进程,0代表父进程,1代表子进程
        while (!child && !srv_shutdown && !graceful_shutdown) { //子进程不可进入,srv_shutdown=1 或 graceful_shutdown=1时父进程跳出
            if (num_childs > 0) {
                switch (fork()) { //创建子进程
                case -1:
                    return -1;
                case 0:
                    child = 1; //子进程标记
                    break;
                default:
                    num_childs--; //父进程
                    break;
                }
            } else { //子进程产生完毕
                int status; //保存子进程退出状态

                if (-1 != wait(&status)) { //阻塞等待子进程退出,收尸
                    /** 
                     * one of our workers went away 
                     */
                    num_childs++; //表示可以再产生新的子进程
                } else {
                    switch (errno) { //发生中断
                    case EINTR:
                        /**
                         * if we receive a SIGHUP we have to close our logs ourself as we don"t 
                         * have the mainloop who can help us here
                         */
                        if (handle_sig_hup) {
                            handle_sig_hup = 0;

                            log_error_cycle(srv); //重新打开日志文件

                            /**
                             * forward to all procs in the process-group
                             * 
                             * we also send it ourself
                             */
                            if (!forwarded_sig_hup) { //通知组内所有进程
                                forwarded_sig_hup = 1; //只通知一次,使得后面的kill只调用一次
                                kill(0, SIGHUP);
                            }
                        }
                        break;
                    default:
                        break;
                    }
                }
            }
        }

        /**
         * for the parent this is the exit-point 
         */
        if (!child) { //父进程的退出点,关闭所以工作进程,做一些清理工作(关闭日志,连接的网络资源,插件,内存等)。
            /** 
             * kill all children too 
             */
            if (graceful_shutdown) {
                kill(0, SIGINT);
            } else if (srv_shutdown) {
                kill(0, SIGTERM);
            }

            log_error_close(srv); 
            network_close(srv);
            connections_free(srv);
            plugins_free(srv);
            server_free(srv);
            return 0;
        }
    }
#endif

下面的是子进程部分的关键代码分析:

一开始子进程进行各种初始化工作,包括fd时间处理器的初始化(fdevent_init(srv->max_fds + 1, srv->event_handler)),stat cache初始化(stat_cache_init())等。

子进程工作在一个大while循环中。

while的工作流程如下:

1、判断连接是否断开。如果断开,则调用处理程序进行处理并重新开始新一轮的日志记录。

2、判断是否接到了alarm函数发出的信号。接受到信号后,判断服务器记录的时间是否和当前时间相同。如果相同,说明时间还没有过一秒,继续处理连接请求。如果不相同,则时间已经过了一秒。那么,服务器则触发插件,清理超时连接,清理stat-cache缓存。这理里面最重要的是处理超时连接。程序中通过一个for循环查询所有的连接,比较其idle的时间和允许的最大idle时间来判断连接是否超时。如果连接超时,则让连接进入出错的状态(connection_set_state(srv, con, CON_STATE_ERROR);)。

3、判断服务器socket连接是否失效。如果失效了,则在不是服务器过载的情况下将所有连接重新加入到fdevent中。

4、如果socket没有失效,判断服务器是否过载。如果过载了,则关闭所有连接,清理服务器并退出服务器。

5、分配文件描述符。

6、启动事件轮询。等待各种IO时间的发生。包括文件读写,socket请求等。

7、一旦有事件发生,调用相应的处理函数进行处理。

8、最后,检查joblist中是否有未处理的job并处理之。

至此,一次循环结束了。然后,继续循环直到服务器关闭。

    /* main-loop */
    while (!srv_shutdown) { //只要srv_shutdown不为1,工作进程持续执行
        int n;
        size_t ndx;
        time_t min_ts;

        if (handle_sig_hup) { //如果收到HUP信号
            handler_t r;

            /* reset notification */
            handle_sig_hup = 0; //重置标识

            /* cycle logfiles */

            switch(r = plugins_call_handle_sighup(srv)) { //通过plugins_call_handle_sighup来调用各个模块的HUP处理函数
            case HANDLER_GO_ON:
                break;
            default:
                log_error_write(srv, __FILE__, __LINE__, "sd", "sighup-handler return with an error", r);
                break;
            }

            if (-1 == log_error_cycle(srv)) { //重新打开日志文件,并写入收到HUP信号到日志。此处并没有重新读取配置文件
                log_error_write(srv, __FILE__, __LINE__, "s", "cycling errorlog failed, dying");

                return -1;
            } else {
#ifdef HAVE_SIGACTION
                log_error_write(srv, __FILE__, __LINE__, "sdsd", 
                    "logfiles cycled UID =",
                    last_sighup_info.si_uid,
                    "PID =",
                    last_sighup_info.si_pid);
#else
                log_error_write(srv, __FILE__, __LINE__, "s", 
                    "logfiles cycled");
#endif
            }
        }

        if (handle_sig_alarm) { //收到ALARM信号
            /* a new second */

#ifdef USE_ALARM
            /* reset notification */
            handle_sig_alarm = 0;
#endif

            /* get current time */
            min_ts = time(NULL);

            if (min_ts != srv->cur_ts) {
                int cs = 0;
                connections *conns = srv->conns;
                handler_t r;

                switch(r = plugins_call_handle_trigger(srv)) { //调用plugins_call_handle_trigger来处理各个模块的ALARM信号处理函数
                case HANDLER_GO_ON:
                    break;
                case HANDLER_ERROR:
                    log_error_write(srv, __FILE__, __LINE__, "s", "one of the triggers failed");
                    break;
                default:
                    log_error_write(srv, __FILE__, __LINE__, "d", r);
                    break;
                }

                /* trigger waitpid */
                srv->cur_ts = min_ts; //更新服务器记录时间

                /* cleanup stat-cache */
                stat_cache_trigger_cleanup(srv); //清除缓存,删除一些比较旧的节点
                /**
                 * check all connections for timeouts
                 *
                 */
                for (ndx = 0; ndx < conns->used; ndx++) { //处理超时连接
                    int changed = 0;
                    connection *con;
                    int t_diff;

                    con = conns->ptr[ndx];

                    if (con->state == CON_STATE_READ ||
                        con->state == CON_STATE_READ_POST) {
                        if (con->request_count == 1) {
                            if (srv->cur_ts - con->read_idle_ts > con->conf.max_read_idle) {
                                /* time - out */
#if 0
                                log_error_write(srv, __FILE__, __LINE__, "sd",
                                        "connection closed - read-timeout:", con->fd);
#endif
                                /* lighttpd 中采用了状态机(state-engine)去处理每一个连接,
                                 * 状态机中的每一种节点表示连接当时所处的状态,包括 connect 、
                                 * reqstart 、 read 、 reqend 、 readpost 、handlereq、
                                 * respstart、write、respend、error、close 这 11 个状态
                                 */
                                connection_set_state(srv, con, CON_STATE_ERROR); //调用connection_set_state进行状态机的状态转换
                                changed = 1;
                            }
                        } else {
                            if (srv->cur_ts - con->read_idle_ts > con->conf.max_keep_alive_idle) {
                                /* time - out */
#if 0
                                log_error_write(srv, __FILE__, __LINE__, "sd",
                                        "connection closed - read-timeout:", con->fd);
#endif
                                connection_set_state(srv, con, CON_STATE_ERROR);
                                changed = 1;
                            }
                        }
                    }
        ………………
                    /* we don"t like div by zero */
                    if (0 == (t_diff = srv->cur_ts - con->connection_start)) t_diff = 1;
                    /* 处理传输速度限制
                     * 如果某一时刻平均传输速度达到了用户设置的最大值,则停止发送数据(con->traffic_limit_reached将被设为1,
                     * 进入下面if中处理)。只要检测到平均传输速度小于用户设置的最大值就继续发送数据,
                     * 则满足if的条件,con->traffic_limit_reached设为 0,同时调用状态机切换函数。
                     */                 
                    if (con->traffic_limit_reached &&
                        (con->conf.kbytes_per_second == 0 ||
                         ((con->bytes_written / t_diff) < con->conf.kbytes_per_second * 1024))) {
                        /* enable connection again */
                        con->traffic_limit_reached = 0;

                        changed = 1;
                    }

                    if (changed) {
                        connection_state_machine(srv, con);
                    }
                    con->bytes_written_cur_second = 0;
                    *(con->conf.global_bytes_per_second_cnt_ptr) = 0;

#if 0
                    if (cs == 0) {
                        fprintf(stderr, "connection-state: ");
                        cs = 1;
                    }

                    fprintf(stderr, "c[%d,%d]: %s ",
                        con->fd,
                        con->fcgi.fd,
                        connection_get_state(con->state));
#endif
                }

                if (cs == 1) fprintf(stderr, "
");
            }
        }

        /* 根据当前的资源利用情况禁用或启用 server sockets 服务
         *
         * 禁用:当文件描述符(当前文件描述符和等待文件描述符之和)大于 0.9 倍服务器最大
         * (系统允许或用户设置)文件描述符数目或当前连接大于最大(系统允许或用户设置)连接
         * 数目或收到终止服务器指令时。
         *
         * 启用:当文件描述符(当前文件描述符和等待文件描述符之和)小于 0.8 倍服务器最大
         * (系统允许或用户设置)文件描述符数目并且当前连接小于 0.9 倍最大(系统允许或用户设
         * 置)连接数目并且终止服务器标志为 0 时。
         */
        if (srv->sockets_disabled) {
            /* our server sockets are disabled, why ? */

            if ((srv->cur_fds + srv->want_fds < srv->max_fds * 0.8) && /* we have enough unused fds */
                (srv->conns->used < srv->max_conns * 0.9) &&
                (0 == graceful_shutdown)) {
                for (i = 0; i < srv->srv_sockets.used; i++) {
                    server_socket *srv_socket = srv->srv_sockets.ptr[i];
                    fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);
                }

                log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets enabled again");

                srv->sockets_disabled = 0;
            }
        } else {
            if ((srv->cur_fds + srv->want_fds > srv->max_fds * 0.9) || /* out of fds */
                (srv->conns->used > srv->max_conns) || /* out of connections */
                (graceful_shutdown)) { /* graceful_shutdown */

                /* disable server-fds */

                for (i = 0; i < srv->srv_sockets.used; i++) { //逐个删除该工作进程上的所有在 socket 描述符上的事件监听器(通过 fdevent_event_del 函数)。
                    server_socket *srv_socket = srv->srv_sockets.ptr[i];
                    fdevent_event_del(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd);

                    if (graceful_shutdown) { //如果是关闭服务器则注销事件监听器结构(主要是释放内存空间,防止内存泄露)并且关闭文件描述符、删除附属文件。
                        /* we don"t want this socket anymore,
                         *
                         * closing it right away will make it possible for
                         * the next lighttpd to take over (graceful restart)
                         *  */

                        fdevent_unregister(srv->ev, srv_socket->fd);
                        close(srv_socket->fd);
                        srv_socket->fd = -1;

                        /* network_close() will cleanup after us */

                        if (srv->srvconf.pid_file->used &&
                            srv->srvconf.changeroot->used == 0) {
                            if (0 != unlink(srv->srvconf.pid_file->ptr)) {
                                if (errno != EACCES && errno != EPERM) {
                                    log_error_write(srv, __FILE__, __LINE__, "sbds",
                                            "unlink failed for:",
                                            srv->srvconf.pid_file,
                                            errno,
                                            strerror(errno));
                                }
                            }
                        }
                    }
                }

                if (graceful_shutdown) {
                    log_error_write(srv, __FILE__, __LINE__, "s", "[note] graceful shutdown started");
                } else if (srv->conns->used > srv->max_conns) {
                    log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets disabled, connection limit reached");
                } else {
                    log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets disabled, out-of-fds");
                }

                srv->sockets_disabled = 1;
            }
        }
if (graceful_shutdown && srv->conns->used == 0) {
            /* we are in graceful shutdown phase and all connections are closed
             * we are ready to terminate without harming anyone */
            srv_shutdown = 1;
        }

        /* we still have some fds to share */
        if (srv->want_fds) { //如果有待处理的文件描述符,则通过状态机切换函数进行处理,为了合理利用资源,程序会保证至少有 16 个空闲文件描述符
            /* check the fdwaitqueue for waiting fds */
            int free_fds = srv->max_fds - srv->cur_fds - 16;
            connection *con;

            for (; free_fds > 0 && NULL != (con = fdwaitqueue_unshift(srv, srv->fdwaitqueue)); free_fds--) {
                connection_state_machine(srv, con);

                srv->want_fds--;
            }
        }
        /* 通过 fdevent_poll -> epoll_wait(以 USE_LINUX_EPOLL 为例)来轮询 I/O 事件的发生,
         * 其中等待 I/O 事件发生的超时值 timeout_ms=1000milliseconds,即 1 秒。
         * 如果在等待的这 1 秒内有 I/O 事件发生,则返回的 n 值记录事件数目,随后用一个 do-while
         * 循环对每一个发生的 I/O 事件进行处理。
         */
        if ((n = fdevent_poll(srv->ev, 1000)) > 0) {
            /* n is the number of events */
            int revents;
            int fd_ndx;
#if 0
            if (n > 0) {
                log_error_write(srv, __FILE__, __LINE__, "sd",
                        "polls:", n);
            }
#endif
            fd_ndx = -1;
            do {
                fdevent_handler handler;
                void *context;
                handler_t r;

                fd_ndx  = fdevent_event_next_fdndx (srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引
                revents = fdevent_event_get_revent (srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型
                fd      = fdevent_event_get_fd     (srv->ev, fd_ndx); //获得该文件描述符
                handler = fdevent_get_handler(srv->ev, fd);           //获得 I/O 事件处理的回调函数
                context = fdevent_get_context(srv->ev, fd);           //获得 I/O 事件处理的上下文环境

                /* connection_handle_fdevent needs a joblist_append */
#if 0
                log_error_write(srv, __FILE__, __LINE__, "sdd",
                        "event for", fd, revents);
#endif
                switch (r = (*handler)(srv, context, revents)) {      //调用回调函数进行 I/O 事件处理,并传入相关参数。
                case HANDLER_FINISHED:
                case HANDLER_GO_ON:
                case HANDLER_WAIT_FOR_EVENT:
                case HANDLER_WAIT_FOR_FD:
                    break;
                case HANDLER_ERROR:
                    /* should never happen */
                    SEGFAULT();
                    break;
                default:
                    log_error_write(srv, __FILE__, __LINE__, "d", r);
                    break;
                }
            } while (--n > 0);
        } else if (n < 0 && errno != EINTR) {
            log_error_write(srv, __FILE__, __LINE__, "ss",
                    "fdevent_poll failed:",
                    strerror(errno));
        }

参考资料:《Lighttpd源码分析》 高群凯 编著

文章导航