Nignx 并发模型是典形的非阻塞 IO 和事件驱动 (event-driven) 模式 (其它并发模型参见
The C10K Problem。它在各个平台上使用本平台支持
的同步 IO 事件监听机制 (select
, poll
, epoll
, kqueue
etc.) 捕获网络事件,
并调用这些事件对应的回调处理函数,完成请求的处理。
本篇就请求处理过程中,需要对 socket
监听的事件和事件的回调函数切换进行总结。
预备知识
-
事件驱动的并发模型,基本上可以认为是一个有限状态自动转换机, 一个任务 (即一个请求) 在预先定义的各个状态中切换。而状态的切换是由事件 (读事件、 写事件、超时事件等等) 触发并驱动。由于此并发模型要同时处理多个任务,那么也就要对 各个任务的状态进行记录。某个任务无事件发生时,要记下它的状态并即时切换到另外的有 事件发生的任务中。当被挂起的任务有事件发生时,根据上次保存下来的它的最后状态, 接着执行下去。
-
Nginx 中使用
ngx_event_t
表示内部发生的事件,并使用其handler
成员函数完成 事件处理逻辑。 (文件将handler
函数 和 事件结构体的回调函数混用,同指handler
函数)。 -
Nginx 中每个普通连接
ngx_connection_t
包含两个分别表示读事件和写事件的ngx_event_t
类型成员。 -
连接上发生读事件 - 指此连接上有数据可读,或者客户端关闭了此连接。
-
连接上发生写事件 - 指此连接
socket
发送缓冲区有空闲空间,Nginx 可以向此 连接写入数据。 -
将封装了
select
,epoll
等事件监听和通知函数的模块统称为 事件监听机制。 -
Nginx 每个监控
socket
对应结构体ngx_listening_t
都包含一个连接ngx_connection_t
,便于事件监听机制统一处理普通连接和监听socket
上的事件。 -
事件监听机制的阻塞函数,会根据事件的类型,调用不同事件结构体对应的回调函数。比 如 Nginx 对
epoll
的封装模块阻塞函数ngx_epoll_process_events
:------------event/modules/ngx_event_epoll.c:513--------- static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { ... events = epoll_wait(ep, event_list, (int) nevents, timer; ... for (i = 0; i < events; i++) { c = event_list[i].data.ptr; ... rev->c->read; ... if ((revents & EPOLLIN) && rev->active) { ... rev->handler(rev); } wev = c->write; if ((revents & EPOLLOUT) && wev->active) { wev->handler(rev); } ... } }
-
此文不考虑请求处理过程中发生的异常 (数据读写超时、接收或发送过程中客户端关闭 连接等情形) 和某些特殊处理 (比如 开启了
defer red accept
等情形)。
接入连接
新连接的到来,会触发监听 socket
上的读事件,读事件处理函数在 worker
进程初始
化时设定。
ngx_worker_process_cycle
ngx_worker_process_init
ngx_event_process_init
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
c = ngx_get_connection(ls[i].fd, cycle->log);
...
rev = c->read;
rev->accept = 1;
...
rev->handler = ngx_event_accept;
}
ngx_listen_t
结构体的 handler
函数用于接收到新的连接之后,根据业务对连接进行
进一步的初始化。
------------http/ngx_http.c:1670-----------
static ngx_listening_t *
ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
{
...
ls->handler = ngx_http_init_connection;
...
}
然后,worker
进程竞争到 accept_mutex
后,对监听 socket
进行读事件监听 (详
情参见 延迟操作)。
------------event/ngx_event_accept.c:309---
static ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
...
c = ls[i].connection;
...
ngx_add_event(c->read, NGX_READ_EVENT, 0);
...
}
当监听 socket
上有新连接到来时,事件监听机制调用监听 socket
的读事件结构体中
的 handler
成员函数,即 ngx_event_accept
函数,接入新的连接,分配必要的数据
结构,并且调用 ngx_listening_t
中跟业务 (HTTP, MAIL) 相关的 handler
对新连接
进行初始化 (比如初始化新连接的业务相关成员、设置连接的事件结构体及回调函数等等)。
--------------http/ngx_event_accept.c:17---------------
void
ngx_event_accept(ngx_event_t *ev)
{
...
s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
...
c = ngx_get_connection(s, ev->log);
...
c->listening = ls;
...
ls->handler(c);
}
上文已经看到,在 HTTP 模式下,ngx_listening_t
的 handler
函数为
ngx_http_init_connection
。此函数设置新接入的普通连接的读事件回调函数为
ngx_http_init_request
,设置写事件回调函数为 ngx_http_empty_handler
,而后将连
接读事件结构体注册到事件驱动机制中,并添加读事件超时处理。
-------------http/ngx_http_request.c:177--------------
void
ngx_http_init_connection(ngx_connection_t *c)
{
...
rev = c->read;
rev->handler = ngx_http_init_request;
c->write->handler = ngx_http_empty_handler;
...
ngx_add_timer(rev, c->listening->post_accept_timeout);
ngx_handle_read_event(rev, 0);
...
}
请求读取
ngx_http_empty_handler
函数没有任何实质性逻辑,因为根据 HTTP 的交互规则,接到
请求并处理完成后,才需要向此连接写入数据,在此之前,不关心此连接是否可写。
当普通连接上发生读事件时,事件驱动机制调用读事件回调函数 ngx_http_init_request
。
此函数构造 ngx_http_request_t
变量,对其进行初始化,并设置新的读事件回调函数。
----------------http/ngx_http_request.c:232----------
static void
ngx_http_init_request(ngx_event_t *rev)
{
...
c = rev->data;
...
r = ngx_pcalloc(c->pool, sizeof(ngx_http_request_t));
...
r->connection = c;
...
r->main_conf = cscf->ctx->main_conf;
r->srv_conf = cscf->ctx->srv_conf;
r->loc_conf = cscf->ctx->loc_conf;
rev->handler = ngx_http_process_request_line;
r->read_event_handler = ngx_http_block_reading;
...
rev->handler(rev);
}
ngx_http_init_request
函数直接调用 ngx_http_process_request_line
尝试从连接
中读取并处理 HTTP request
的 status line
。如果接收缓存区的数据包含有完整的
status line
,解析处理后,继续对缓冲区数据进行分析,尝试读取 HTTP request
的 headers
。如果任何一个环节的解析,缓冲区数据不足时,Nginx 会将此连接读事件结
构体再次加入事件监听机制中,直到有新的数据到达再继续进行处理。
--------------http/ngx_http_request.c:680---------------
static void
ngx_http_process_request_line(ngx_event_t *rev)
{
...
n = ngx_http_read_request_header(r);
...
rc = ngx_http_parse_request_line(r, r->header_in);
...
rev->handler = ngx_http_process_request_headers;
ngx_http_process_request_headers(rev);
...
}
--------------http/ngx_http_request.c:913---------------
static void
ngx_http_process_request_headers(ngx_event_t *rev)
{
...
c = rev->data;
r = c->data;
...
for ( ;; ) {
...
n = ngx_http_read_request_header(r)
...
rc = ngx_http_parse_header_line(r, r->header_in,
cscf->underscores_in_headers);
...
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
ngx_http_process_request(r);
return;
}
...
}
...
}
headers
也解析完成后,调用 ngx_http_process_request
对请求的各个参数进行检验
和根据参数对请求配置进行调整。同时,设置新的事件回调函数。
--------------http/ngx_http_request.c:1551---------------
static void
ngx_http_process_request(ngx_http_request_t *r)
{
...
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
ngx_http_handler(r);
=> r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
...
}
ngx_http_core_run_phase
函数使请求处理进入到了 phase
处理状态 (参见
模块分类)。这个状态调用各个模块对请求进行处理。直到处理完
毕,然后开始往客户端发送处理结果。
在 phase
处理过程中,读写事件回调函数被统一设置为 ngx_http_request_handler
。
这个函数随后成为事件监听机制对该请求的唯一调用,而具体事件的处理逻辑由
ngx_http_request_t
的 read_event_handler
和 write_event_handler
成员函数
负责。
结果返回
拿最常见的静态文件请求做例子。在 ngx_http_static_module
模块接到请求时,在本地
文件系统查找文件,并准备返回包头和包体数据。
略去一系列的 filter
模块处理暂且不表,数据发送逻辑在
ngx_http_write_filter_module
模块的 ngx_http_write_filter
中实现。
--------------http/ngx_http_write_filter_module.c:46-----
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
...
c = r->connection;
...
chain = c->send_chain(c, r->out, limit);
...
if (chain) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
...
}
如果,第一次调用 ngx_http_write_filter
没有将数据全部发送的话,它会返回
NGX_AGAIN
。这时执行流程回到 ngx_http_core_content_phase
中:
-------------http/ngx_http_core_module.c:1302-----------
ngx_int_t
ngx_http_core_content_phase(ngx_http_request_t *r,
ngx_http_phase_handler_t *ph)
{
...
rc = ph->handler(r);
if (rc != NGX_DECLINED) {
ngx_http_finalize_request(r, rc);
return NGX_OK;
}
...
}
------------http/ngx_http_request.c:1840---------------
void
ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
...
if (r->buffered || c->buffered || r->postponed || r->blocked) {
if (ngx_http_set_write_handler(r) != NGX_OK) {
ngx_http_terminate_request(r, 0);
}
return;
}
...
}
ngx_http_set_write_handler
调整 read_event_handler
和 write_event_handler
的值,并且开始监听该请求所在连接上的写事件:
-------------http/ngx_http_request.c:2133--------------
static ngx_int_t
ngx_http_set_write_handler(ngx_http_request_t *r)
{
...
r->read_event_handler = r->discard_body ?
ngx_http_discarded_request_body_handler:
ngx_http_test_reading;
r->write_event_handler = ngx_http_writer;
...
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
ngx_http_close_request(r, 0);
return NGX_ERROR;
}
...
}
当连接上有写事件发生时,事件监听机制调用写事件回调函数 ngx_http_request_handler
,
ngx_http_request_handler
函数又调用 write_event_handler
指向的回调函数
ngx_http_writer
。ngx_http_writer
函数再次尝试将数据发送出去:
---------------http/ngx_http_request.c:2166------------
static void
ngx_http_writer(ngx_http_request_t *r)
{
...
rc = ngx_http_output_filter(r, NULL);
...
if (r->buffered || r->postponed || (r == r->main && c->buffered)) {
if (!wev->ready && !wev->delayed) {
ngx_add_timer(wev, clcf->send_timeout);
}
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
ngx_http_close_request(r, ));
}
return;
}
...
r->write_event_handler = ngx_http_request_empty_handler;
ngx_http_finalize_request(r, rc);
}
如果数据依然未完全发送,ngx_http_writer
函数会再次将写事件加入监听机制中。如果
数据发送完成,尝试再次回收请求结构体资源。
Comments
不要轻轻地离开我,请留下点什么...