fdevent系统-连接socket及超时处理

上一篇我们探讨了lighttpd对监听socket的处理,这次我们看看连接socket的处理,以及相关超时的处理。

lighttpd和客户端建立连接的过程:

1.lighttpd检测监听socket的IO事件,如果有可读事件发生,那么表示有新的连接请求,于是调用network.c/network_server_handle_fdevent()来处理连接请求。

2.network_server_handle_fdevent()函数调用connections.c/connection_accept() 接受客户端的请求,建立连接,得到连接socket的fd,也就是accept函数的返回值。

3.建立连接后,这个连接对应的状态机状态被设置为CON_STATE_REQUEST_START,即开始读取客户端发过来的request。

4.从connection_accept函数返回到network_server_handle_fdevent()函数的for循环中后,程序调用connection_state_machine()函数,这个函数是根据当前连接的状态机状态来设置状态机的下一个状态,CON_STATE_REQUEST_START的下一个状态是CON_STATE_READ,这个状态表示连接正在读取客户端发送的数据。

5.当连接的状态机被设置成CON_STATE_READ后,在connection_state_machine()函数中有这样一个switch语句:

switch (con->state)
    {
    case CON_STATE_READ_POST:
    case CON_STATE_READ:
    case CON_STATE_CLOSE:
        fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_IN);
        break;
    case CON_STATE_WRITE:
        /*
         * request write-fdevent only if we really need it
         * - if we have data to write
         * - if the socket is not writable yet
         */
        if (!chunkqueue_is_empty(con->write_queue) && (con->is_writable == 0) &&(con->traffic_limit_reached == 0))
        {
            fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_OUT);
        }
        else
        {
            fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd);
        }
        break;
    default:
        fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd);
        break;
    }

它将状态处在CON_STATE_READ_POST,CON_STATE_READ和CON_STATE_CLOSE的连接对应的连接socket fd加入到fdevent系统中,并监听【可读】事件。将处CON_STATE_WRITE状态且有数据要写的连接对应的socket fd加入到fdevent系统中,并监听【可写】事件。其他状态的连接则把对应的fd从fdevent系统中删除,因为这些连接不会有IO事件发生。

这样,连接socket fd就被加入到了fdevent系统中,之后等待IO事件的发生,这一部分在上一篇已经说明过了:

//启动事件轮询。底层使用的是IO多路转接。
if ((n = fdevent_poll(srv->ev, 1000)) > 0)
{
            /* n是事件的数量 */
            int revents;
            int fd_ndx = -1;
            /* 逐个处理已经准备好的请求,直到所有的请求处理结束 */
            do
            {
                fdevent_handler handler;
                void *context;
                handler_t r;

                fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引
                revents = fdevent_event_get_revent(srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型
                fd = fdevent_event_get_fd(srv->ev, fd_ndx); //获得该文件描述符
                handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数
                context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境
                /*
                 * connection_handle_fdevent needs a joblist_append
                 */
                /**
                 * 调用回调函数进行I/O事件处理,并传入相关参数
                 */
                switch (r = (*handler) (srv, context, revents))
                {
                case HANDLER_FINISHED:
                case HANDLER_GO_ON:
                case HANDLER_WAIT_FOR_EVENT:
                case HANDLER_WAIT_FOR_FD:
                    break;
                case HANDLER_ERROR:
                    SEGFAULT();
                    break;
                default:
                    log_error_write(srv, __FILE__, __LINE__, "d", r);
                    break;
                }
            }while (--n > 0);
        }
        else if (n < 0 && errno != EINTR)
        {
            log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno));
        }

连接fd对应的处理函数是connections.c/connection_handle_fdevent()函数:

handler_t connection_handle_fdevent(void *s, void *context,int revents)
{
    server *srv = (server *) s;
    connection *con = context;
    //把这个连接加到作业队列中。
     joblist_append(srv, con);
    if (revents & FDEVENT_IN)
    {
        con->is_readable = 1;
    }
    if (revents & FDEVENT_OUT)
    {
        con->is_writable = 1;
        /*
         * we don"t need the event twice
         */
    }
    if (revents & ~(FDEVENT_IN | FDEVENT_OUT))
    {
        /*
         * looks like an error 即可读又可写,可能是一个错误。
         */
        /*
         * FIXME: revents = 0x19 still means that we should read from the queue
         */
        if (revents & FDEVENT_HUP)
        {
            if (con->state == CON_STATE_CLOSE)
            {
                con->close_timeout_ts = 0;
            }
            else
            {
                /*
                 * sigio reports the wrong event here there was no HUP at all
                 */
                connection_set_state(srv, con, CON_STATE_ERROR);
            }
        }
        else if (revents & FDEVENT_ERR)
        {
            connection_set_state(srv, con, CON_STATE_ERROR);
        }
        else
        {
            log_error_write(srv, __FILE__, __LINE__, "sd","connection closed: poll() -> ???", revents);
        }
    }
    if (con->state == CON_STATE_READ|| con->state == CON_STATE_READ_POST)
    {
        connection_handle_read_state(srv, con);
 //继续读取数据,直到数据读取完毕
     }
 // 数据的写回并没有放给状态机去处理。
     if (con->state == CON_STATE_WRITE&& !chunkqueue_is_empty(con->write_queue) && con->is_writable)
    {
        if (-1 == connection_handle_write(srv, con))
        {
            connection_set_state(srv, con, CON_STATE_ERROR);
            log_error_write(srv, __FILE__, __LINE__, "ds", con->fd,"handle write failed.");
        }
        else if (con->state == CON_STATE_WRITE)
        {
            //写数据出错,记录当前时间,用来判断连接超时。
             con->write_request_ts = srv->cur_ts;
        }
    }
    if (con->state == CON_STATE_CLOSE)
    {
        /*
         * flush the read buffers 清空缓冲区中的数据。
         */
        int b;
        //获取缓冲区中数据的字节数
         if (ioctl(con->fd, FIONREAD, &b))
        {
            log_error_write(srv, __FILE__, __LINE__, "ss","ioctl() failed", strerror(errno));
        }
        if (b > 0)
        {
            char buf[1024];
            log_error_write(srv, __FILE__, __LINE__, "sdd","CLOSE-read()", con->fd, b);
            //将缓冲区中的数据读取后并丢弃,此时连接已经关闭,数据是无用数据。
             read(con->fd, buf, sizeof(buf));
        }
        else
        {
            /*
             * nothing to read 缓冲区中没有数据。复位连接关闭超时计时。
             */
            con->close_timeout_ts = 0;
        }
    }
    return HANDLER_FINISHED;
}

connection_handle_fdevent()函数根据当前连接fd所发生的IO事件,对connection结构体中的标记变量赋值,如is_writable,is_readable等,并做一些时间的记录。这些事件所对应的【真正的IO处理则交给状态机处理】。状态机根据这些标记变量进行相应的动作处理。

下面的图简要的描述了fdevent系统对连接fd和监听fd的处理:

这里写图片描述

接下来简单地看下连接超时的处理。

连接超时有三种:读数据超时,写数据超时和关闭超时。

处理超时的代码在server.c中的main函数woker进程开始部分:

    /* main-loop */
    while (!srv_shutdown) { //只要srv_shutdown不为1,工作进程持续执行
        int n;
        size_t ndx;
        time_t min_ts;

        /* 处理HUP信号,代码省略 */

        /* 处理ALARM信号 */
        if (handle_sig_alarm) { 
            /* a new second */

#ifdef USE_ALARM
            /* reset notification */
            handle_sig_alarm = 0;
#endif

            /* get current time */
            min_ts = time(NULL);
            /* 比较服务器记录的时间和当前时间
             * 如果值不一样,说明已经过了1s
             */
            if (min_ts != srv->cur_ts) {
                int cs = 0;
                connections *conns = srv->conns;
                handler_t r;

                switch(r = plugins_call_handle_trigger(srv)) { //调用plugins_call_handle_trigger来处理各个模块的ALARM信号处理函数
                case HANDLER_GO_ON:
                    break;
                case HANDLER_ERROR:
                    log_error_write(srv, __FILE__, __LINE__, "s", "one of the triggers failed");
                    break;
                default:
                    log_error_write(srv, __FILE__, __LINE__, "d", r);
                    break;
                }

                /* trigger waitpid */
                srv->cur_ts = min_ts; //更新服务器记录时间

                /* cleanup stat-cache */
                stat_cache_trigger_cleanup(srv); //清除缓存,删除一些比较旧的节点
                /**
                 * check all connections for timeouts
                 *
                 */
                for (ndx = 0; ndx < conns->used; ndx++) { //处理超时连接
                    int changed = 0;
                    connection *con;
                    int t_diff;

                    con = conns->ptr[ndx];

                    if (con->state == CON_STATE_READ ||
                        con->state == CON_STATE_READ_POST) { //连接状态是读
                        if (con->request_count == 1) { //处理一个请求
                            if (srv->cur_ts - con->read_idle_ts > con->conf.max_read_idle) {
                                /* time - out */
#if 0
                                log_error_write(srv, __FILE__, __LINE__, "sd",
                                        "connection closed - read-timeout:", con->fd);
#endif
                                connection_set_state(srv, con, CON_STATE_ERROR); //调用connection_set_state进行状态机的状态转换
                                changed = 1;
                            }
                        } else { //连接同时处理多个请求
                            if (srv->cur_ts - con->read_idle_ts > con->conf.max_keep_alive_idle) {
                                /* time - out */
#if 0
                                log_error_write(srv, __FILE__, __LINE__, "sd",
                                        "connection closed - read-timeout:", con->fd);
#endif
                                connection_set_state(srv, con, CON_STATE_ERROR);
                                changed = 1;
                            }
                        }
                    }

                    if ((con->state == CON_STATE_WRITE) &&
                        (con->write_request_ts != 0)) { //连接状态是写
#if 0
                        if (srv->cur_ts - con->write_request_ts > 60) {
                            log_error_write(srv, __FILE__, __LINE__, "sdd",
                                    "connection closed - pre-write-request-timeout:", con->fd, srv->cur_ts - con->write_request_ts);
                        }
#endif

                        if (srv->cur_ts - con->write_request_ts > con->conf.max_write_idle) {
                            /* time - out */
#if 1
                            log_error_write(srv, __FILE__, __LINE__, "sbsosds",
                                    "NOTE: a request for",
                                    con->request.uri,
                                    "timed out after writing",
                                    con->bytes_written,
                                    "bytes. We waited",
                                    (int)con->conf.max_write_idle,
                                    "seconds. If this a problem increase server.max-write-idle");
#endif
                            connection_set_state(srv, con, CON_STATE_ERROR);
                            changed = 1;
                        }
                    }
                    /* we don"t like div by zero */
                    if (0 == (t_diff = srv->cur_ts - con->connection_start)) t_diff = 1;
                    /* 处理传输速度限制
                     * 如果某一时刻平均传输速度达到了用户设置的最大值,则停止发送数据(con->traffic_limit_reached将被设为1,
                     * 进入下面if中处理)。只要检测到平均传输速度小于用户设置的最大值就继续发送数据,
                     * 则满足if的条件,con->traffic_limit_reached设为 0,同时调用状态机切换函数。
                     * 代码省略
                     * /                    
            }       
        }

        /* 根据当前的资源利用情况禁用或启用 server sockets 服务,代码省略 */

        /* something else,代码省略*/

        /* 轮询 I/O 事件的发生,
         * 其中等待 I/O 事件发生的超时值为1秒。
         * 代码省略
         */

        /* 进行其他处理,之后while一次循环完成 */
    }

为清晰地看超时处理部分的代码,我把一些无关的代码略去了。

总结如下:

while (未收到终止信号)
{
    if (收到HUP)
        处理HUP信号
    if (handle_sig_alarm标识为1)
    {
        获取当前时间;
        if (当前时间 != 服务器时间)
        {
            调用各模块的超时处理函数;
            更新服务器记录时间;
            for (每个连接)
                处理超时;
        }
    }
    根据当前的资源利用情况禁用或启用 server sockets 服务;

    轮询 I/O 事件的发生;
    对每一个发生的 I/O 事件进行处理;
}

可以看到,作者通过当前时间和服务器记录的当前时间来判断时间是否过了一秒。如果两个时间不一样,那么时间就过了一秒,子进程每循环一次都要比较服务器记录的时间和当前时间,直到两个时间不一样为止。

也就是说,作者好像并没有使用SIGALRM信号来判断超时,从代码中我们可以看到,关于SIGALRM的使用,只有在定义了USE_ALARM之后才生效:

#ifdef USE_ALARM
            /* reset notification */
            handle_sig_alarm = 0;
#endif

然而我们查找USE_ALARM,发现事实上该标识没有被定义(而是被注释掉了):

#ifndef __sgi
/* IRIX doesn"t like the alarm based time() optimization */
/* #define USE_ALARM */
#endif

所以handle_sig_alarm一直为1,即作者这里并不使用SIGALRM信号,这样可以减少很多信号处理,降低程序的复杂度。但是每次循环程序都要轮询一次,可能会影响效率(实际上效果如何,需要大家试一下哈~)

在处理程序中,lighttpd通过比较read_idle_ts,write_request_ts和当前时间的差值来判断连接是否读超时或写超时。如果这两个差值分别大于max_read_idle和max_write_idle则表示超时。如果一个连接正在处理多个请求时,读超时是和max_keep_alive_idle比较。这些上限值在配置中设置。

对于read_idle_ts,在连接进入CON_STATE_REQUEST_START状态时,记录了当前时间。如果连接长时间没有去读取request请求,则也表示连接超时。当连接开始读数据时,read_idle_ts记录开始读数据的时间。

对于write_request_ts,在处理CON_STATE_WRITE状态时,有对其赋值的语句。在connection_handle_fdevent函数中也有。事实上,只有在调用connection_handle_write函数出错并且连接处在CON_STATE_WRITE状态时,记录当前时间。

可见,lighttpd对读和写的超时处理是不一样的。对于读,设定了最长时间,不管读多少数据,一旦时间超了就算超时。而对于写,只有在写出错的时候才开始计算超时。如果没有出错,那么写数据花再多的时间也不算超时。(可能出现上传到一半就超时的问题,但是在绝大多数情况下,上传数据都是很小的,而下载的数据往往很多,因此,这样处理可以提高效率,如果需要上传大量数据,可以修改配置中的超时限制)。

fdevent就分析到这里!~

学习内容参考自:

http://www.cnblogs.com/kernel_hcy/archive/2010/03/22/1691951.html

文章导航