牛骨文教育服务平台(让学习变的简单)
博文笔记

Nginx filter分析

创建时间:2013-10-11 投稿人: 浏览次数:9863

注:这篇文章整合了我们小组成员(卫越,雕梁,吉兆)的一些博客内容组成。


在CONTENT阶段产生的数据被发往客户端(系统发送缓存区)之前,会先经过过滤。Nginx的filter的工作方式和做鱼有些类似。比如一条鱼,可以把它切成鱼片(也可以切块,切泥),然后通过不同的烹饪方法就得到水煮鱼或者日式生鱼片或者废了等等。同样是一条鱼,加工得到的结果却截然不同,就是因为中间不同的工序赋予了这条鱼各种属性。Nginx的filter也是一个道理,前面的Handler好比这条鱼,filter负责加工,最后得到的HTTP响应就会各种各样,格式可以是JSON或者YAML,内容可能多一些或者少一些,HTTP属性可各异,可以选择压缩,甚至内容可以被丢弃。


对应HTTP请求的响应头和响应体,Nginx分别设置了header filter和body filter。两种机制都是采用链表的方式,不同过滤模块对应链表的一个节点,一般而言一个模块会同时注册header filter和body filter。一个典型的filter模块,比如gzip模块使用类似如下的代码来注册:

static ngx_http_output_header_filter_pt  ngx_http_next_header_filter;
static ngx_http_output_body_filter_pt    ngx_http_next_body_filter;

...
   
static ngx_int_t
ngx_http_gzip_filter_init(ngx_conf_t *cf)
{
    ngx_http_next_header_filter = ngx_http_top_header_filter;
    ngx_http_top_header_filter = ngx_http_gzip_header_filter;

    ngx_http_next_body_filter = ngx_http_top_body_filter;
    ngx_http_top_body_filter = ngx_http_gzip_body_filter;

    return NGX_OK;
}

上面的代码中,gzip模块首先在模块的开头声明了两个static类型的全局变量ngx_http_next_header_filter和ngx_http_next_body_filter,在ngx_http_gzip_filter_init函数中,这二个变量分别被赋值为ngx_http_top_header_filter及ngx_http_top_body_filter。而后二者定义在ngx_http.c,并在ngx_http.h头文件中被导出。ngx_http_top_header_filter和ngx_http_top_body_filter实际上是filter链表的头结点,每次注册一个新的filter模块时,它们的值先被保存在新模块的内部全局变量ngx_http_next_header_filter及ngx_http_next_body_filter,然后被赋值为新模块注册的filter函数,而且Nginx filter是先从头节点开始执行,所以越晚注册的模块越早执行。


采用默认编译选项,Nginx默认编译的模块如下:

ngx_module_t *ngx_modules[] = {
    &ngx_core_module,
    &ngx_errlog_module,
    &ngx_conf_module,
    &ngx_events_module,
    &ngx_event_core_module,
    &ngx_epoll_module,
    &ngx_regex_module,
    &ngx_http_module,
    &ngx_http_core_module,
    &ngx_http_log_module,
    &ngx_http_upstream_module,
    &ngx_http_static_module,
    &ngx_http_autoindex_module,
    &ngx_http_index_module,
    &ngx_http_auth_basic_module,
    &ngx_http_access_module,
    &ngx_http_limit_conn_module,
    &ngx_http_limit_req_module,
    &ngx_http_geo_module,
    &ngx_http_map_module,
    &ngx_http_split_clients_module,
    &ngx_http_referer_module,
    &ngx_http_rewrite_module,
    &ngx_http_proxy_module,
    &ngx_http_fastcgi_module,
    &ngx_http_uwsgi_module,
    &ngx_http_scgi_module,
    &ngx_http_memcached_module,
    &ngx_http_empty_gif_module,
    &ngx_http_browser_module,
    &ngx_http_upstream_ip_hash_module,
    &ngx_http_upstream_keepalive_module,
    &ngx_http_write_filter_module,          /* 最后一个body filter,负责往外发送数据 */
    &ngx_http_header_filter_module,         /* 最后一个header filter,负责在内存中拼接出完整的http响应头,并调用ngx_http_write_filter发送 */
    &ngx_http_chunked_filter_module,        /* 对响应头中没有content_length头的请求,强制短连接(低于http 1.1)或采用chunked编码(http 1.1) */
    &ngx_http_range_header_filter_module,   /* header filter,负责处理range头 */
    &ngx_http_gzip_filter_module,           /* 支持流式的数据压缩 */
    &ngx_http_postpone_filter_module,       /* body filter,负责处理子请求和主请求数据的输出顺序 */
    &ngx_http_ssi_filter_module,            /* 支持过滤SSI请求,采用发起子请求的方式,去获取include进来的文件 */
    &ngx_http_charset_filter_module,        /* 支持添加charset,也支持将内容从一种字符集转换到另外一种字符集 */
    &ngx_http_userid_filter_module,         /* 支持添加统计用的识别用户的cookie */
    &ngx_http_headers_filter_module,        /* 支持设置expire和Cache-control头,支持添加任意名称的头 */
    &ngx_http_copy_filter_module,           /* 根据需求重新复制输出链表中的某些节点(比如将in_file的节点从文件读出并复制到新的节点),
                                               并交给后续filter进行处理 */
    &ngx_http_range_body_filter_module,     /* body filter,支持range功能,如果请求包含range请求,那就只发送range请求的一段内容 */
    &ngx_http_not_modified_filter_module,   /* 如果请求的if-modified-since等于回复的last-modified值,说明回复没有变化,清空所有回复的内容,返回304 */ 
    NULL
};

从模块的命名可以很容易看出哪些模块是filter模块,一般而言Nginx的filter模块名以filter_module结尾,普通的模块名以module结尾。上面的列表从下往上看,ngx_http_not_modified_filter_module实际上filter链的第一个节点,而ngx_http_write_filter_module是最后一个节点。filter模块的执行顺序特别重要,比如数据经过gzip模块后就变成了压缩之后的数据,如果在gzip模块后面运行的filter模块需要再查看数据的原始内容就不可能了(除非再做解压),第三方模块会被Nginx注册在ngx_http_copy_filter_module之后,ngx_http_headers_filter_module之前。这样设定的原因是为了确保一些模块比如gzip filter,chunked filter,copy filter运行在filter链的开头或尾部。


Nginx header filter

通常Nginx调用ngx_http_send_header函数来发送响应头,看下它的实现:

ngx_int_t
ngx_http_send_header(ngx_http_request_t *r)
{
    if (r->err_status) {
        r->headers_out.status = r->err_status;
        r->headers_out.status_line.len = 0;
    }

    return ngx_http_top_header_filter(r);
}

上面的代码中调用了ngx_http_top_header_filter,也就是header  filter的头节点,按照上一节介绍的顺序,ngx_http_not_modified_filter_module是最后一个注册的filter模块,而最后定义的会最先执行,初始化之后,它实际上是ngx_http_not_modified_header_filter函数:

static ngx_int_t
ngx_http_not_modified_header_filter(ngx_http_request_t *r)
{
    if (r->headers_out.status != NGX_HTTP_OK
        || r != r->main
        || r->headers_out.last_modified_time == -1)
    {
        return ngx_http_next_header_filter(r);
    }

    if (r->headers_in.if_unmodified_since) {
        return ngx_http_test_precondition(r);
    }

    if (r->headers_in.if_modified_since) {
        return ngx_http_test_not_modified(r);
    }

    return ngx_http_next_header_filter(r);
}

而在ngx_http_not_modified_header_filter函数中,它会调用模块内部定义的函数指针变量ngx_http_next_header_filter,而该变量保存的是上一模块注册的header filter函数,同样的下一个header filter函数内部也会调用其模块内部的ngx_http_next_header_filter,直到调用到最后一个header filter - ngx_http_header_filter。

ngx_http_header_filter,这个filter负责计算响应头的总大小,并分配内存,组装响应头,并调用ngx_http_write_filter发送。Nginx中,header filter只会被调用一次,ngx_http_header_filter函数中首先会检查r->header_sent标识是否已经被设置,如果是的话,则直接返回;否则设置该标识,并发送响应头。另外如果是子请求的话,也会直接退出函数。


 Nginx body filter

Nginx中通常调用ngx_http_output_filter函数来发送响应体,它的实现如下:

ngx_int_t
ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = r->connection;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http output filter "%V?%V"", &r->uri, &r->args);

    rc = ngx_http_top_body_filter(r, in);

    if (rc == NGX_ERROR) {
        /* NGX_ERROR may be returned by any filter */
        c->error = 1;
    }

    return rc;
}

body filter链调用的原理和header filter一样,和ngx_http_send_header函数不同的是,上面的函数多了一个类型为ngx_chain_t *的参数,因为Nginx实现的是流式的输出,并不用等到整个响应体都生成了才往客户端发送数据,而是产生一部分内容之后将其组织成链表,调用ngx_http_output_filter发送,并且待发送的内容可以在文件中,也可以是在内存中,Nginx会负责将数据流式的,高效的传输出去。而且当发送缓存区满了时,Nginx还会负责保存未发送完的数据,调用者只需要对新数据调用一次ngx_http_output_filter即可。

 

ngx_http_copy_filter_module

ngx_http_copy_filter_module是响应体过滤链(body filter)中非常重要的一个模块,这个filter模块主要是来将一些需要复制的buf(可能在文件中,也可能在内存中)重新复制一份交给后面的filter模块处理。先来看它的初始化函数: 

static ngx_int_t
ngx_http_copy_filter_init(ngx_conf_t *cf)
{
    ngx_http_next_body_filter = ngx_http_top_body_filter;
    ngx_http_top_body_filter = ngx_http_copy_filter;

    return NGX_OK;
}

可以看到,它只注册了body filter,而没有注册header filter,也就是说只有body filter链中才有这个模块。

该模块有一个命令,命令名为output_buffers,用来配置可用的buffer数和buffer大小,它的值保存在copy filter的loc conf的bufs字段,默认数量为1,大小为32768字节。这个参数具体的作用后面会做介绍。

Nginx中,一般filter模块可以header filter函数中根据请求响应头设置一个模块上下文(context),用来保存相关的信息,在body filter函数中使用这个上下文。而copy filter没有header filter,因此它的context的初始化也是放在body filter中的,而它的ctx就是ngx_output_chain_ctx_t,为什么名字是output_chain呢,这是因为copy filter的主要逻辑的处理都放在ngx_output_chain模块中,另外这个模块在core目录下,而不是属于http目录。

接下来看一下上面说到的context结构:

struct ngx_output_chain_ctx_s {
    ngx_buf_t                   *buf;              /* 保存临时的buf */
    ngx_chain_t                 *in;               /* 保存了将要发送的chain */
    ngx_chain_t                 *free;             /* 保存了已经发送完毕的chain,以便于重复利用 */
    ngx_chain_t                 *busy;             /* 保存了还未发送的chain */

    unsigned                     sendfile:1;       /* sendfile标记 */
    unsigned                     directio:1;       /* directio标记 */
#if (NGX_HAVE_ALIGNED_DIRECTIO)
    unsigned                     unaligned:1;
#endif
    unsigned                     need_in_memory:1; /* 是否需要在内存中保存一份(使用sendfile的话,
                                                      内存中没有文件的拷贝的,而我们有时需要处理文件,
                                                      此时就需要设置这个标记) */
    unsigned                     need_in_temp:1;   /* 是否需要在内存中重新复制一份,不管buf是在内存还是文件,
                                                      这样的话,后续模块可以直接修改这块内存 */
#if (NGX_HAVE_FILE_AIO)
    unsigned                     aio:1;

    ngx_output_chain_aio_pt      aio_handler;
#endif

    off_t                        alignment;

    ngx_pool_t                  *pool;
    ngx_int_t                    allocated;        /* 已经分别的buf个数 */
    ngx_bufs_t                   bufs;             /* 对应loc conf中设置的bufs */
    ngx_buf_tag_t                tag;              /* 模块标记,主要用于buf回收 */

    ngx_output_chain_filter_pt   output_filter;    /* 一般是ngx_http_next_filter,也就是继续调用filter链 */
    void                        *filter_ctx;       /* 当前filter的上下文,
                                                      这里是由于upstream也会调用output_chain */
};

为了更好的理解context结构每个域的具体含义,接下来分析filter的具体实现:

static ngx_int_t
ngx_http_copy_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t                     rc;
    ngx_connection_t             *c;
    ngx_output_chain_ctx_t       *ctx;
    ngx_http_core_loc_conf_t     *clcf;
    ngx_http_copy_filter_conf_t  *conf;

    c = r->connection;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http copy filter: "%V?%V"", &r->uri, &r->args);
    
    /* 获取ctx */
    ctx = ngx_http_get_module_ctx(r, ngx_http_copy_filter_module);
    
    /* 如果为空,则说明需要初始化ctx */
    if (ctx == NULL) {
        ctx = ngx_pcalloc(r->pool, sizeof(ngx_output_chain_ctx_t));
        if (ctx == NULL) {
            return NGX_ERROR;
        }

        ngx_http_set_ctx(r, ctx, ngx_http_copy_filter_module);

        conf = ngx_http_get_module_loc_conf(r, ngx_http_copy_filter_module);
        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

        /* 设置sendfile */
        ctx->sendfile = c->sendfile;
        /* 如果request设置了filter_need_in_memory的话,ctx的这个域就会被设置 */
        ctx->need_in_memory = r->main_filter_need_in_memory
                              || r->filter_need_in_memory;
        /* 和上面类似 */
        ctx->need_in_temp = r->filter_need_temporary;

        ctx->alignment = clcf->directio_alignment;

        ctx->pool = r->pool;
        ctx->bufs = conf->bufs;
        ctx->tag = (ngx_buf_tag_t) &ngx_http_copy_filter_module;
        /* 可以看到output_filter就是下一个body filter节点 */
        ctx->output_filter = (ngx_output_chain_filter_pt)
                                  ngx_http_next_body_filter;
        /* 此时filter ctx为当前的请求 */
        ctx->filter_ctx = r;

    ...

        if (in && in->buf && ngx_buf_size(in->buf)) {
            r->request_output = 1;
        }
    }

    ...

    for ( ;; ) {
        /* 最关键的函数,下面会详细分析 */
        rc = ngx_output_chain(ctx, in);

        if (ctx->in == NULL) {
            r->buffered &= ~NGX_HTTP_COPY_BUFFERED;

        } else {
            r->buffered |= NGX_HTTP_COPY_BUFFERED;
        }

        ...

        return rc;
    }
}

上面的代码去掉了AIO相关的部分,函数首先设置并初始化context,接着调用ngx_output_chain函数,这个函数实际上包含了copy filter模块的主要逻辑,它的原型为:

ngx_int_t
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)

分段来看它的代码,下面这段代码是一个快捷路径(short path),也就是说当能直接确定所有的in chain都不需要复制的时,可以直接调用output_filter来交给剩下的filter去处理:

    if (ctx->in == NULL && ctx->busy == NULL) {

        /*
         * the short path for the case when the ctx->in and ctx->busy chains
         * are empty, the incoming chain is empty too or has the single buf
         * that does not require the copy
         */

        if (in == NULL) {
            return ctx->output_filter(ctx->filter_ctx, in);
        }

        if (in->next == NULL
#if (NGX_SENDFILE_LIMIT)
            && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT)
#endif
            && ngx_output_chain_as_is(ctx, in->buf))
        {
            return ctx->output_filter(ctx->filter_ctx, in);
        }
    }

上面可以看到了一个函数ngx_output_chain_as_is,这个函数很关键,下面还会再次被调用,这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝:

static ngx_inline ngx_int_t
ngx_output_chain_as_is(ngx_output_chain_ctx_t *ctx, ngx_buf_t *buf)
{
    ngx_uint_t  sendfile;

    /* 是否为特殊buf(special buf),是的话返回1,也就是不用拷贝 */
    if (ngx_buf_special(buf)) {
        return 1;
    }

    /* 如果buf在文件中,并且使用了directio的话,需要拷贝buf */
    if (buf->in_file && buf->file->directio) {
        return 0;
    }

    /* sendfile标记 */
    sendfile = ctx->sendfile;

#if (NGX_SENDFILE_LIMIT)
    /* 如果pos大于sendfile的限制,设置标记为0 */
    if (buf->in_file && buf->file_pos >= NGX_SENDFILE_LIMIT) {
        sendfile = 0;
    }

#endif

    if (!sendfile) {
        /* 如果不走sendfile,而且buf不在内存中,则我们就需要复制到内存一份 */
        if (!ngx_buf_in_memory(buf)) {
            return 0;
        }

        buf->in_file = 0;
    }

    /* 如果需要内存中有一份拷贝,而并不在内存中,此时返回0,表示需要拷贝 */
    if (ctx->need_in_memory && !ngx_buf_in_memory(buf)) {
        return 0;
    }

    /* 如果需要内存中有可修改的拷贝,并且buf存在于只读的内存中或者mmap中,则返回0 */ 
    if (ctx->need_in_temp && (buf->memory || buf->mmap)) {
        return 0;
    }

    return 1;
}

上面有两个标记要注意,一个是need_in_memory ,这个主要是用于当使用sendfile的时候,Nginx并不会将请求文件拷贝到内存中,而有时需要操作文件的内容,此时就需要设置这个标记。然后后面的body filter就能操作内容了。 


第二个是need_in_temp,这个主要是用于把本来就存在于内存中的buf复制一份可修改的拷贝出来,这里有用到的模块有charset,也就是编解码 filter。

然后接下来这段是复制in chain到ctx->in的结尾,它是通过调用ngx_output_chain_add_copy来进行add copy的,这个函数比较简单,这里就不分析了,不过只有一个要注意的地方,那就是如果buf是存在于文件中,并且file_pos超过了sendfile limit,此时就会切割buf为两个buf,然后保存在两个chain中,最终连接起来:

    /* add the incoming buf to the chain ctx->in */

    if (in) {
        if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

然后就是主要的逻辑处理阶段。这里nginx做的非常巧妙也非常复杂,首先是chain的重用,然后是buf的重用。 

先来看chain的重用。关键的几个结构以及域:ctx的free,busy以及ctx->pool的chain域。 

其中每次发送没有发完的chain就放到busy中,而已经发送完毕的就放到free中,而最后会调用  ngx_free_chain来将free的chain放入到pool->chain中,而在ngx_alloc_chain_link中,如果pool->chain中存在chain的话,就不用malloc了,而是直接返回pool->chain,相关的代码如下:

/* 链接cl到pool->chain中 */  
#define ngx_free_chain(pool, cl)                                               
    cl->next = pool->chain;                                                    
    pool->chain = cl  

/* 从pool中分配chain */
ngx_chain_t *  
ngx_alloc_chain_link(ngx_pool_t *pool)  
{  
    ngx_chain_t  *cl;  
  
    cl = pool->chain;  
    /* 如果cl存在,则直接返回cl */
    if (cl) {  
        pool->chain = cl->next;  
        return cl;  
    }  
    /* 否则才会malloc chain */  
    cl = ngx_palloc(pool, sizeof(ngx_chain_t));  
    if (cl == NULL) {  
        return NULL;  
    }  
  
    return cl;  
}  

然后是buf的重用,严格意义上来说buf的重用是从free中的chain中取得的,当free中的buf被重用,则这个buf对应的chain就会被链接到ctx->pool中,从而这个chain就会被重用。也就是说首先考虑的是buf的重用,只有当这个chain的buf确定不需要被重用(或者说已经被重用)的时候,chain才会被链接到ctx->pool中被重用。 

还有一个就是ctx的allocated域,这个域表示了当前的上下文中已经分配了多少个buf,output_buffer命令用来设置output的buf大小以及buf的个数。而allocated如果比output_buffer大的话,则需要先发送完已经存在的buf,然后才能再次重新分配buf。 

来看代码,上面所说的重用以及buf的控制,代码里面都可以看的比较清晰。下面这段主要是拷贝buf前所做的一些工作,比如判断是否拷贝,以及给buf分贝内存等:

    /* out为最终需要传输的chain,也就是交给剩下的filter处理的chain */
    out = NULL;  
    /* last_out为out的最后一个chain */  
    last_out = &out;  
    last = NGX_NONE;  
  
    for ( ;; ) {  
  
        /* 开始遍历chain */  
        while (ctx->in) {  
  
            /* 取得当前chain的buf大小 */  
            bsize = ngx_buf_size(ctx->in->buf);  
  
            /* 跳过bsize为0的buf */  
            if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {  
                ngx_debug_point();  
  
                ctx->in = ctx->in->next;  
  
                continue;  
            }  
  
            /* 判断是否需要复制buf */  
            if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {  
  
                /* move the chain link to the output chain */  
                /* 如果不需要复制,则直接链接chain到out,然后继续循环 */  
                cl = ctx->in;  
                ctx->in = cl->next;  
  
                *last_out = cl;  
                last_out = &cl->next;  
                cl->next = NULL;  
  
                continue;  
            }  
  
            /* 到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中,
               因此这里先判断ctx->buf是否为空 */  
            if (ctx->buf == NULL) {  
  
                /* 如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话,
                   这个函数都会返回NGX_DECLINED */  
                rc = ngx_output_chain_align_file_buf(ctx, bsize);  
  
                if (rc == NGX_ERROR) {  
                    return NGX_ERROR;  
                }  
  
                /* 大部分情况下,都会落入这个分支 */  
                if (rc != NGX_OK) {  
  
                    /* 准备分配buf,首先在free中寻找可以重用的buf */
                    if (ctx->free) {  
  
                        /* get the free buf */  
                        /* 得到free buf */  
                        cl = ctx->free;  
                        ctx->buf = cl->buf;  
                        ctx->free = cl->next;  
                        /* 将要重用的chain链接到ctx->poll中,以便于chain的重用 */  
                        ngx_free_chain(ctx->pool, cl);  
  
                    } else if (out || ctx->allocated == ctx->bufs.num) {  
                        /* 如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf。
                           这里可以看到如果out存在的话,nginx会跳出循环,然后发送out,
                           等发送完会再次处理,这里很好的体现了nginx的流式处理 */  
                        break;  
  
                    } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {  
                        /* 上面这个函数也比较关键,它用来取得buf。接下来会详细看这个函数 */  
                        return NGX_ERROR;  
                    }  
                }  
            }  
            /* 从原来的buf中拷贝内容或者从文件中读取内容 */
            rc = ngx_output_chain_copy_buf(ctx);

            if (rc == NGX_ERROR) {
                return rc;
            }

            if (rc == NGX_AGAIN) {
                if (out) {
                    break;
                }

                return rc;
            }

            /* delete the completed buf from the ctx->in chain */

            if (ngx_buf_size(ctx->in->buf) == 0) {
                ctx->in = ctx->in->next;
            }
            /* 分配新的chain节点 */
            cl = ngx_alloc_chain_link(ctx->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }

            cl->buf = ctx->buf;
            cl->next = NULL;
            *last_out = cl;
            last_out = &cl->next;
            ctx->buf = NULL; 
        } 
        ...
    }

上面的代码分析的时候有个很关键的函数,那就是ngx_output_chain_get_buf,这个函数当没有可重用的buf时用来分配buf。 
如果当前的buf位于最后一个chain,则需要特殊处理,一是buf的recycled域,另外是将要分配的buf的大小。 

先来说recycled域,这个域表示当前的buf需要被回收。而一般情况下Nginx(比如在非last buf)会缓存一部分buf(默认是1460字节),然后再发送,而设置了recycled的话,就不会让它缓存buf,也就是尽量发送出去,然后以供回收使用。 因此如果是最后一个buf,则不需要设置recycled域的,否则的话,需要设置recycled域。

然后就是buf的大小。这里会有两个大小,一个是需要复制的buf的大小,一个是配置文件中设置的大小。如果不是最后一个buf,则只需要分配配置中设置的buf的大小就行了。如果是最后一个buf,则就处理不太一样,下面的代码会看到:

static ngx_int_t  
ngx_output_chain_get_buf(ngx_output_chain_ctx_t *ctx, off_t bsize)  
{  
    size_t       size;  
    ngx_buf_t   *b, *in;  
    ngx_uint_t   recycled;  
  
    in = ctx->in->buf;  
    /* 可以看到这里分配的buf,每个buf的大小是配置文件中设置的size */  
    size = ctx->bufs.size;  
    /* 默认有设置recycled域 */  
    recycled = 1;  
    /* 如果当前的buf是属于最后一个chain的时候,需要特殊处理 */  
    if (in->last_in_chain) {  
        /* 如果buf大小小于配置指定的大小,则直接按实际大小分配,不设置回收标记 */
        if (bsize < (off_t) size) {  
  
            /* 
             * allocate a small temp buf for a small last buf 
             * or its small last part 
             */  
            size = (size_t) bsize;  
            recycled = 0;  
  
        } else if (!ctx->directio  
                   && ctx->bufs.num == 1  
                   && (bsize < (off_t) (size + size / 4)))  
        {  
            /* 
             * allocate a temp buf that equals to a last buf, 
             * if there is no directio, the last buf size is lesser 
             * than 1.25 of bufs.size and the temp buf is single 
             */  
  
            size = (size_t) bsize;  
            recycled = 0;  
        }  
    }  
    /* 开始分配buf内存 */  
    b = ngx_calloc_buf(ctx->pool);  
    if (b == NULL) {  
        return NGX_ERROR;  
    }  
  
    if (ctx->directio) {  
        /* directio需要对齐 */  
  
        b->start = ngx_pmemalign(ctx->pool, size, (size_t) ctx->alignment);  
        if (b->start == NULL) {  
            return NGX_ERROR;  
        }  
  
    } else {  
        /* 大部分情况会走到这里 */  
        b->start = ngx_palloc(ctx->pool, size);  
        if (b->start == NULL) {  
            return NGX_ERROR;  
        }  
    }  
  
    b->pos = b->start;  
    b->last = b->start;  
    b->end = b->last + size;  
    /* 设置temporary */  
    b->temporary = 1;  
    b->tag = ctx->tag;  
    b->recycled = recycled;  
  
    ctx->buf = b;  
    /* 更新allocated,可以看到每分配一个就加1 */  
    ctx->allocated++;  
  
    return NGX_OK;  
}  

分配新的buf和chain,并调用ngx_output_chain_copy_buf拷贝完数据之后,Nginx就将新的chain链表交给下一个body filter继续处理:

        if (out == NULL && last != NGX_NONE) {

            if (ctx->in) {
                return NGX_AGAIN;
            }

            return last;
        }

        last = ctx->output_filter(ctx->filter_ctx, out);

        if (last == NGX_ERROR || last == NGX_DONE) {
            return last;
        }

        ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
                                ctx->tag);
        last_out = &out;

在其他body filter处理完之后,ngx_output_chain函数还需要更新chain链表,以便回收利用,ngx_chain_update_chains函数主要是将处理完毕的chain节点放入到free链表,没有处理完毕的放到busy链表中,另外这个函数用到了tag,它只回收copy filter产生的chain节点。

 

ngx_http_write_filter_module

ngx_http_write_filter_module是最后一个body filter,可以看到它的注册函数的特殊性:

static ngx_int_t
ngx_http_write_filter_init(ngx_conf_t *cf)
{
    ngx_http_top_body_filter = ngx_http_write_filter;

    return NGX_OK;
}

ngx_http_write_filter_module是第一个注册body filter的模块,于是它也是最后一个执行的body filter模块。

直接来看ngx_http_write_filter,下面的代码中去掉了一些调试代码:

ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    off_t                      size, sent, nsent, limit;
    ngx_uint_t                 last, flush;
    ngx_msec_t                 delay;
    ngx_chain_t               *cl, *ln, **ll, *chain;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    c = r->connection;

    if (c->error) {
        return NGX_ERROR;
    }

    size = 0;
    flush = 0;
    last = 0;
    ll = &r->out;

    /* find the size, the flush point and the last link of the saved chain */
 
    for (cl = r->out; cl; cl = cl->next) {
        ll = &cl->next;

#if 1
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
            return NGX_ERROR;
        }
#endif

        size += ngx_buf_size(cl->buf);

        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }

        if (cl->buf->last_buf) {
            last = 1;
        }
    }

    /* add the new chain to the existent one */

    for (ln = in; ln; ln = ln->next) {
        cl = ngx_alloc_chain_link(r->pool);
        if (cl == NULL) {
            return NGX_ERROR;
        }

        cl->buf = ln->buf;
        *ll = cl;
        ll = &cl->next;

#if 1
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
            return NGX_ERROR;
        }
#endif

        size += ngx_buf_size(cl->buf);

        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }

        if (cl->buf->last_buf) {
            last = 1;
        }
    }

    *ll = NULL;

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /*
     * avoid the output if there are no last buf, no flush point,
     * there are the incoming bufs and the size of all bufs
     * is smaller than "postpone_output" directive
     */

    if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
        return NGX_OK;
    }
    /* 如果请求由于被限速而必须延迟发送时,设置一个标识后退出 */
    if (c->write->delayed) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }
    /* 如果buffer总大小为0,而且当前连接之前没有由于底层发送接口的原因延迟,则检查是否有特殊标记 */
    if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
        /* last_buf标记,表示请求体已经发送结束 */
        if (last) {
            r->out = NULL;
            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }
        /* flush生效,而且又没有实际数据,则清空当前的未发送队列 */
        if (flush) {
            do {
                r->out = r->out->next;
            } while (r->out);

            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }

        return NGX_ERROR;
    }
    /* 请求有速率限制,则计算当前可以发送的大小 */
    if (r->limit_rate) {
        limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
                - (c->sent - clcf->limit_rate_after);

        if (limit <= 0) {
            c->write->delayed = 1;
            ngx_add_timer(c->write,
                          (ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));

            c->buffered |= NGX_HTTP_WRITE_BUFFERED;

            return NGX_AGAIN;
        }

        if (clcf->sendfile_max_chunk
            && (off_t) clcf->sendfile_max_chunk < limit)
        {
            limit = clcf->sendfile_max_chunk;
        }

    } else {
        limit = clcf->sendfile_max_chunk;
    }

    sent = c->sent;
    /* 发送数据 */
    chain = c->send_chain(c, r->out, limit);

    if (chain == NGX_CHAIN_ERROR) {
        c->error = 1;
        return NGX_ERROR;
    }
    /* 更新限速相关的信息 */
    if (r->limit_rate) {

        nsent = c->sent;

        if (clcf->limit_rate_after) {

            sent -= clcf->limit_rate_after;
            if (sent < 0) {
                sent = 0;
            }

            nsent -= clcf->limit_rate_after;
            if (nsent < 0) {
                nsent = 0;
            }
        }

        delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate);

        if (delay > 0) {
            limit = 0;
            c->write->delayed = 1;
            ngx_add_timer(c->write, delay);
        }
    }

    if (limit
        && c->write->ready
        && c->sent - sent >= limit - (off_t) (2 * ngx_pagesize))
    {
        c->write->delayed = 1;
        ngx_add_timer(c->write, 1);
    }
    /* 更新输出链,释放已经发送的节点 */
    for (cl = r->out; cl && cl != chain; /* void */) {
        ln = cl;
        cl = cl->next;
        ngx_free_chain(r->pool, ln);
    }

    r->out = chain;
    /* 如果数据未发送完毕,则设置一个标记 */
    if (chain) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }

    c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
    /* 如果由于底层发送接口导致数据未发送完全,且当前请求没有其他数据需要发送,此时要返回NGX_AGAIN,表示还有数据未发送 */
    if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
        return NGX_AGAIN;
    }

    return NGX_OK;
}

Nginx将待发送的chain链表保存在r->out,上面的函数先检查之前未发送完的链表中是否有flush,recycled以及last_buf标识,并计算所有buffer的大小,接着对新输入的chain链表做同样的事情,并将新链表加到r->out的队尾。

如果没有输出链表中没有被标识为最后一块buffer的节点,而且没有需要flush或者急着回收的buffer,并且当前队列中buffer总大小不够postpone_output指令设置的大小(默认为1460字节)时,函数会直接返回。

ngx_http_write_filter会调用c->send_chain往客户端发送数据,c->send_chain的取值在不同操作系统,编译选项以及协议下(https下用的是ngx_ssl_send_chain)会取不同的函数,典型的linux操作系统下,它的取值为ngx_linux_sendfile_chain,也就是最终会调用这个函数来发送数据。它的函数原型为:

ngx_chain_t *
ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)

第一个参数是当前的连接,第二个参数是所需要发送的chain,第三个参数是所能发送的最大值。 

首先看一下这个函数定义的一些重要局部变量:

send 表示将要发送的buf已经已经发送的大小;  
sent表示已经发送的buf的大小; 
prev_send 表示上一次发送的大小,也就是已经发送的buf的大小;  
fprev 和prev-send类似,只不过是file类型的; 
complete表示是否buf被完全发送了,也就是sent是否等于send - prev_send;
header表示需要是用writev来发送的buf,也就是only in memory的buf;  
struct iovec  *iov, headers[NGX_HEADERS] 这个主要是用于sendfile和writev的参数,这里注意上面header数组保存的就是iovec。


下面看函数开头的一些初始化代码:

    wev = c->write;  
  
    if (!wev->ready) {  
        return in;  
    }  

    /* the maximum limit size is 2G-1 - the page size */

    if (limit == 0 || limit > (off_t) (NGX_SENDFILE_LIMIT - ngx_pagesize)) {  
        limit = NGX_SENDFILE_LIMIT - ngx_pagesize;  
    }  
  
  
    send = 0;  

    /* 设置header,也就是in memory的数组 */  
    header.elts = headers;  
    header.size = sizeof(struct iovec);  
    header.nalloc = NGX_HEADERS;  
    header.pool = c->pool;

下面这段代码就是处理in memory的部分,然后将buf放入对应的iovec数组,处理核心思想就是合并内存连续并相邻的buf(不管是in memory还是in file):

        for (cl = in; cl && send < limit;  cl = cl->next) {  

            if (ngx_buf_special(cl->buf)) {  
                continue;  
            }  

            /* 如果既不在内存中,又不在文件中,则返回错误 */
            if (!ngx_buf_in_memory(cl->buf) && !cl->buf->in_file) {
                return NGX_CHAIN_ERROR;
            }  

            /* 如果不只是在buf中,这是因为有时in file的buf可能需要内存中也有拷贝,
               如果一个buf同时in memoey和in file的话,Nginx会把它当做in file来处理 */  
            if (!ngx_buf_in_memory_only(cl->buf)) {  
                break;  
            } 
  
            /* 得到buf的大小 */
            size = cl->buf->last - cl->buf->pos;  
  
            /* 大于limit的话修改为size */ 
            if (send + size > limit) {  
                size = limit - send;  
            }
 
            /* 如果prev等于pos,则说明当前的buf的数据和前一个buf的数据是连续的 */ 
            if (prev == cl->buf->pos) {  
                iov->iov_len += (size_t) size;  
  
            } else {  
                if (header.nelts >= IOV_MAX) {
                    break;
                }
                /* 否则说明是不同的buf,因此增加一个iovc */  
                iov = ngx_array_push(&header);  
                if (iov == NULL) {  
                    return NGX_CHAIN_ERROR;  
                }  
  
                iov->iov_base = (void *) cl->buf->pos;  
                iov->iov_len = (size_t) size;  
            }  
  
            /* 这里可以看到prev保存了当前buf的结尾 */  
            prev = cl->buf->pos + (size_t) size;  
            /* 更新发送的大小 */ 
            send += size;  
        }    

然后是in file的处理,这里比较核心的一个判断就是fprev == cl->buf->file_pos,和上面的in memory类似,fprev保存的就是上一次处理的buf的尾部。这里如果这两个相等,那就说明当前的两个buf是连续的(文件连续):

        /* 如果header的大小不为0则说明前面有需要发送的buf,
           并且数据大小已经超过限制则跳过in file处理 */
        if (header.nelts == 0 && cl && cl->buf->in_file && send < limit) {  
            /* 得到file  
            file = cl->buf;  
  
            /* 开始合并 */  
            do {  
                /* 得到大小 */  
                size = cl->buf->file_last - cl->buf->file_pos;  
  
                /* 如果太大则进行对齐处理 */  
                if (send + size > limit) {  
                    size = limit - send;  
  
                    aligned = (cl->buf->file_pos + size + ngx_pagesize - 1)  
                               & ~((off_t) ngx_pagesize - 1);  
  
                    if (aligned <= cl->buf->file_last) {  
                        size = aligned - cl->buf->file_pos;  
                    }  
                }  
  
                /* 设置file_size */  
                file_size += (size_t) size;  
                /* 设置需要发送的大小 */  
                send += size;  
                /* 和上面的in memory处理一样就是保存这次的last */  
                fprev = cl->buf->file_pos + size;  
                cl = cl->next;  
  
            } while (cl  
                     && cl->buf->in_file  
                     && send < limit  
                     && file->file->fd == cl->buf->file->fd  
                     && fprev == cl->buf->file_pos);  
        } 

然后就是发送部分,这里in file使用sendfile,in memory使用writev。处理逻辑比较简单,就是发送后判断发送成功的大小 

        if (file) {  
#if 1  
            if (file_size == 0) {  
                ngx_debug_point();  
                return NGX_CHAIN_ERROR;  
            }  
#endif  
#if (NGX_HAVE_SENDFILE64)  
            offset = file->file_pos;  
#else  
            offset = (int32_t) file->file_pos;  
#endif  
  
            /* 数据在文件中则调用sendfile发送数据 */
            rc = sendfile(c->fd, file->file->fd, &offset, file_size);  

            ...

            /* 得到发送成功的字节数 */  
            sent = rc > 0 ? rc : 0;  
  
        } else {
            /* 数据在内存中则调用writev发送数据 */  
            rc = writev(c->fd, header.elts, header.nelts);  
           
            ...
            /* 得到发送成功的字节数 */
            sent = rc > 0 ? rc : 0;  
        }

接下来就是需要根据发送成功的字节数来更新chain:

        /* 如果send - prev_send == sent则说明该发送的都发完了 */  
        if (send - prev_send == sent) {  
            complete = 1;  
        }  
        /* 更新congnect的sent域 */  
        c->sent += sent;  
  
        /* 开始重新遍历chain,这里是为了防止没有发送完全的情况,此时我们就需要切割buf了 */  
        for (cl = in; cl; cl = cl->next) {  
  
            if (ngx_buf_special(cl->buf)) {  
                continue;  
            }  
  
            if (sent == 0) {  
                break;  
            }  
            /* 得到buf size */ 
            size = ngx_buf_size(cl->buf);  
  
            /* 如果大于当前的size,则说明这个buf的数据已经被完全发送完毕了,因此更新它的域 */  
            if (sent >= size){  
                /* 更新sent域 */  
                sent -= size;  
                /* 如果在内存则更新pos */  
                if (ngx_buf_in_memory(cl->buf)) {  
                    cl->buf->pos = cl->buf->last;  
                }  
                /* 如果在file中则更显file_pos */  
                if (cl->buf->in_file) {  
                    cl->buf->file_pos = cl->buf->file_last;  
                }  
  
                continue;  
            }  
  
            /* 到这里说明当前的buf只有一部分被发送出去了,因此只需要修改指针。以便于下次发送 */  
            if (ngx_buf_in_memory(cl->buf)) {  
                cl->buf->pos += (size_t) sent;  
            }  
            /* 同上 */
            if (cl->buf->in_file) {  
                cl->buf->file_pos += sent;  
            }  
  
            break;  
        }  

最后一部分是一些是否退出循环的判断。这里要注意,Nginx中如果发送未完全的话,将会直接返回,返回的就是没有发送完毕的chain,它的buf也已经被更新。然后Nginx返回去处理其他的事情,等待可写之后再次发送未发送完的数据:

        if (eintr) {  
            continue;  
        }  
        /* 如果未完成,则设置wev->ready为0后返回 */  
        if (!complete) {  
            wev->ready = 0;  
            return cl;  
        }  
        /* 发送数据超过限制,或没有数据了 */
        if (send >= limit || cl == NULL) {  
            return cl;  
        }  
        /* 更新in,也就是开始处理下一个chain */
        in = cl;  


un → zh
声明:该文观点仅代表作者本人,牛骨文系教育信息发布平台,牛骨文仅提供信息存储空间服务。