Nignx 并发模型是典形的非阻塞 IO 和事件驱动 (event-driven) 模式 (其它并发模型参见 The C10K Problem。它在各个平台上使用本平台支持 的同步 IO 事件监听机制 (select, poll, epoll, kqueue etc.) 捕获网络事件, 并调用这些事件对应的回调处理函数,完成请求的处理。

本篇就请求处理过程中,需要对 socket 监听的事件和事件的回调函数切换进行总结。

预备知识

  • 事件驱动的并发模型,基本上可以认为是一个有限状态自动转换机, 一个任务 (即一个请求) 在预先定义的各个状态中切换。而状态的切换是由事件 (读事件、 写事件、超时事件等等) 触发并驱动。由于此并发模型要同时处理多个任务,那么也就要对 各个任务的状态进行记录。某个任务无事件发生时,要记下它的状态并即时切换到另外的有 事件发生的任务中。当被挂起的任务有事件发生时,根据上次保存下来的它的最后状态, 接着执行下去。

  • Nginx 中使用 ngx_event_t 表示内部发生的事件,并使用其 handler 成员函数完成 事件处理逻辑。 (文件将 handler 函数 和 事件结构体的回调函数混用,同指 handler 函数)。

  • Nginx 中每个普通连接 ngx_connection_t 包含两个分别表示读事件和写事件的 ngx_event_t 类型成员。

  • 连接上发生读事件 - 指此连接上有数据可读,或者客户端关闭了此连接。

  • 连接上发生写事件 - 指此连接 socket 发送缓冲区有空闲空间,Nginx 可以向此 连接写入数据。

  • 将封装了 select, epoll 等事件监听和通知函数的模块统称为 事件监听机制

  • Nginx 每个监控 socket 对应结构体 ngx_listening_t 都包含一个连接 ngx_connection_t,便于事件监听机制统一处理普通连接和监听 socket 上的事件。

  • 事件监听机制的阻塞函数,会根据事件的类型,调用不同事件结构体对应的回调函数。比 如 Nginx 对 epoll 的封装模块阻塞函数 ngx_epoll_process_events

    ------------event/modules/ngx_event_epoll.c:513---------
    static ngx_int_t
    ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
    {
        ...
        events = epoll_wait(ep, event_list, (int) nevents, timer;
        ...
        for (i = 0; i < events; i++) {
            c = event_list[i].data.ptr;
            ...
            rev->c->read;
            ...
            if ((revents & EPOLLIN) && rev->active) {
                ...
                rev->handler(rev);
            }
    
            wev = c->write;
    
            if ((revents & EPOLLOUT) && wev->active) {
                wev->handler(rev);
            }
            ...
        }
    }
    
  • 此文不考虑请求处理过程中发生的异常 (数据读写超时、接收或发送过程中客户端关闭 连接等情形) 和某些特殊处理 (比如 开启了defer red accept 等情形)。

接入连接

新连接的到来,会触发监听 socket 上的读事件,读事件处理函数在 worker 进程初始 化时设定。

    ngx_worker_process_cycle
        ngx_worker_process_init
            ngx_event_process_init

                ls = cycle->listening.elts;
                for (i = 0; i < cycle->listening.nelts; i++) {
                    c = ngx_get_connection(ls[i].fd, cycle->log);
                    ...
                    rev = c->read;
                    rev->accept = 1;
                    ...
                    rev->handler = ngx_event_accept;
                }

ngx_listen_t 结构体的 handler 函数用于接收到新的连接之后,根据业务对连接进行 进一步的初始化。

    ------------http/ngx_http.c:1670-----------
    static ngx_listening_t *
    ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
    {
        ...
        ls->handler = ngx_http_init_connection;
        ...
    }

然后,worker 进程竞争到 accept_mutex 后,对监听 socket 进行读事件监听 (详 情参见 延迟操作)。

    ------------event/ngx_event_accept.c:309---
    static ngx_int_t
    ngx_enable_accept_events(ngx_cycle_t *cycle)
    {
        ...
        c = ls[i].connection;
        ...
        ngx_add_event(c->read, NGX_READ_EVENT, 0);
        ...
    }

当监听 socket 上有新连接到来时,事件监听机制调用监听 socket 的读事件结构体中 的 handler 成员函数,即 ngx_event_accept 函数,接入新的连接,分配必要的数据 结构,并且调用 ngx_listening_t 中跟业务 (HTTP, MAIL) 相关的 handler 对新连接 进行初始化 (比如初始化新连接的业务相关成员、设置连接的事件结构体及回调函数等等)。

    --------------http/ngx_event_accept.c:17---------------
    void
    ngx_event_accept(ngx_event_t *ev)
    {
        ...
        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
        ...
        c = ngx_get_connection(s, ev->log);
        ...
        c->listening = ls;
        ...
        ls->handler(c);
    }

上文已经看到,在 HTTP 模式下,ngx_listening_thandler 函数为 ngx_http_init_connection。此函数设置新接入的普通连接的读事件回调函数为 ngx_http_init_request,设置写事件回调函数为 ngx_http_empty_handler,而后将连 接读事件结构体注册到事件驱动机制中,并添加读事件超时处理。

    -------------http/ngx_http_request.c:177--------------
    void
    ngx_http_init_connection(ngx_connection_t *c)
    {
        ...
        rev = c->read;
        rev->handler = ngx_http_init_request;
        c->write->handler = ngx_http_empty_handler;
        ...
        ngx_add_timer(rev, c->listening->post_accept_timeout);

        ngx_handle_read_event(rev, 0);
        ...
    }

请求读取

ngx_http_empty_handler 函数没有任何实质性逻辑,因为根据 HTTP 的交互规则,接到 请求并处理完成后,才需要向此连接写入数据,在此之前,不关心此连接是否可写。

当普通连接上发生读事件时,事件驱动机制调用读事件回调函数 ngx_http_init_request。 此函数构造 ngx_http_request_t 变量,对其进行初始化,并设置新的读事件回调函数。

    ----------------http/ngx_http_request.c:232----------
    static void
    ngx_http_init_request(ngx_event_t *rev)
    {
        ...
        c = rev->data;
        ...
        r = ngx_pcalloc(c->pool, sizeof(ngx_http_request_t));
        ...
        r->connection = c;
        ...
        r->main_conf = cscf->ctx->main_conf;
        r->srv_conf = cscf->ctx->srv_conf;
        r->loc_conf = cscf->ctx->loc_conf;

        rev->handler = ngx_http_process_request_line;
        r->read_event_handler = ngx_http_block_reading;
        ...
        rev->handler(rev);
    }

ngx_http_init_request 函数直接调用 ngx_http_process_request_line 尝试从连接 中读取并处理 HTTP requeststatus line。如果接收缓存区的数据包含有完整的 status line,解析处理后,继续对缓冲区数据进行分析,尝试读取 HTTP requestheaders。如果任何一个环节的解析,缓冲区数据不足时,Nginx 会将此连接读事件结 构体再次加入事件监听机制中,直到有新的数据到达再继续进行处理。

    --------------http/ngx_http_request.c:680---------------
    static void
    ngx_http_process_request_line(ngx_event_t *rev)
    {
        ...
        n = ngx_http_read_request_header(r);
        ...
        rc = ngx_http_parse_request_line(r, r->header_in);
        ...
        rev->handler = ngx_http_process_request_headers;
        ngx_http_process_request_headers(rev);
        ...
    }

    --------------http/ngx_http_request.c:913---------------
    static void
    ngx_http_process_request_headers(ngx_event_t *rev)
    {
        ...
        c = rev->data;
        r = c->data;
        ...
        for ( ;; ) {
            ...
            n = ngx_http_read_request_header(r)
            ...
            rc = ngx_http_parse_header_line(r, r->header_in,
                                            cscf->underscores_in_headers);
            ...
            if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
                ngx_http_process_request(r);
                return;
            }
            ...
        }
        ...
    }

headers 也解析完成后,调用 ngx_http_process_request 对请求的各个参数进行检验 和根据参数对请求配置进行调整。同时,设置新的事件回调函数。

    --------------http/ngx_http_request.c:1551---------------
    static void
    ngx_http_process_request(ngx_http_request_t *r)
    {
        ...
        c->read->handler = ngx_http_request_handler;
        c->write->handler = ngx_http_request_handler;
        r->read_event_handler = ngx_http_block_reading;

        ngx_http_handler(r);
        =>  r->write_event_handler = ngx_http_core_run_phases;
            ngx_http_core_run_phases(r);
        ...
    }

ngx_http_core_run_phase 函数使请求处理进入到了 phase 处理状态 (参见 模块分类)。这个状态调用各个模块对请求进行处理。直到处理完 毕,然后开始往客户端发送处理结果。

phase 处理过程中,读写事件回调函数被统一设置为 ngx_http_request_handler。 这个函数随后成为事件监听机制对该请求的唯一调用,而具体事件的处理逻辑由 ngx_http_request_tread_event_handlerwrite_event_handler 成员函数 负责。

结果返回

拿最常见的静态文件请求做例子。在 ngx_http_static_module 模块接到请求时,在本地 文件系统查找文件,并准备返回包头和包体数据。

略去一系列的 filter 模块处理暂且不表,数据发送逻辑在 ngx_http_write_filter_module 模块的 ngx_http_write_filter 中实现。

    --------------http/ngx_http_write_filter_module.c:46-----
    ngx_int_t
    ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
    {
        ...
        c = r->connection;
        ...
        chain = c->send_chain(c, r->out, limit);
        ...
        if (chain) {
            c->buffered |= NGX_HTTP_WRITE_BUFFERED;
            return NGX_AGAIN;
        }
        ...
    }

如果,第一次调用 ngx_http_write_filter 没有将数据全部发送的话,它会返回 NGX_AGAIN。这时执行流程回到 ngx_http_core_content_phase 中:

    -------------http/ngx_http_core_module.c:1302-----------
    ngx_int_t
    ngx_http_core_content_phase(ngx_http_request_t *r,
        ngx_http_phase_handler_t *ph)
    {
        ...
        rc = ph->handler(r);

        if (rc != NGX_DECLINED) {
            ngx_http_finalize_request(r, rc);
            return NGX_OK;
        }
        ...
    }

    ------------http/ngx_http_request.c:1840---------------
    void
    ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
    {
        ...
        if (r->buffered || c->buffered || r->postponed || r->blocked) {
            if (ngx_http_set_write_handler(r) != NGX_OK) {
                ngx_http_terminate_request(r, 0);
            }

            return;
        }
        ...
    }

ngx_http_set_write_handler 调整 read_event_handlerwrite_event_handler 的值,并且开始监听该请求所在连接上的写事件:

    -------------http/ngx_http_request.c:2133--------------
    static ngx_int_t
    ngx_http_set_write_handler(ngx_http_request_t *r)
    {
        ...
        r->read_event_handler = r->discard_body ?
                                    ngx_http_discarded_request_body_handler:
                                    ngx_http_test_reading;
        r->write_event_handler = ngx_http_writer;
        ...
        if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
            ngx_http_close_request(r, 0);
            return NGX_ERROR;
        }
        ...
    }

当连接上有写事件发生时,事件监听机制调用写事件回调函数 ngx_http_request_handlerngx_http_request_handler 函数又调用 write_event_handler 指向的回调函数 ngx_http_writerngx_http_writer 函数再次尝试将数据发送出去:

    ---------------http/ngx_http_request.c:2166------------
    static void
    ngx_http_writer(ngx_http_request_t *r)
    {
        ...
        rc = ngx_http_output_filter(r, NULL);
        ...
        if (r->buffered || r->postponed || (r == r->main && c->buffered)) {
            if (!wev->ready && !wev->delayed) {
                ngx_add_timer(wev, clcf->send_timeout);
            }

            if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
                ngx_http_close_request(r, ));
            }

            return;
        }
        ...
        r->write_event_handler = ngx_http_request_empty_handler;

        ngx_http_finalize_request(r, rc);
    }

如果数据依然未完全发送,ngx_http_writer 函数会再次将写事件加入监听机制中。如果 数据发送完成,尝试再次回收请求结构体资源。

Comments

不要轻轻地离开我,请留下点什么...

comments powered by Disqus

Published

Category

Nginx

Tags

Contact