1. 模块依赖
2. 基础数据结构
2.1 ngx_event_t
/*
 * Event object of the event framework. Each connection owns one read
 * and one write event (see ngx_connection_s.read / .write).
 */
struct ngx_event_s {
/* per the notes below the struct: points to the owning ngx_connection_t */
void *data;
/* 1 marks a write event; the default 0 marks a read event */
unsigned write:1;
/* 1 marks an accept event; set on the read event of a listening socket */
unsigned accept:1;
/* used to detect the stale events in kqueue and epoll */
unsigned instance:1;
/*
* the event was passed or would be passed to a kernel;
* in aio mode - operation was posted.
*/
unsigned active:1;
/* per the notes below the struct: used with kqueue */
unsigned disabled:1;
/* the ready event; in aio mode 0 means that no operation can be posted */
unsigned ready:1;
/* per the notes below the struct: used with kqueue and eventport */
unsigned oneshot:1;
/* aio operation is complete */
unsigned complete:1;
/* end of stream was seen; ngx_unix_recv sets it when recv() returns 0 */
unsigned eof:1;
/* an I/O error occurred; ngx_unix_recv sets it when it returns NGX_ERROR */
unsigned error:1;
unsigned timedout:1;
unsigned timer_set:1;
unsigned delayed:1;
/* copied from the listening socket's deferred_accept setting */
unsigned deferred_accept:1;
/* the pending eof reported by kqueue, epoll or in aio chain operation */
unsigned pending_eof:1;
unsigned posted:1;
/* set to 1 for every pooled event in ngx_event_process_init */
unsigned closed:1;
/* to test on worker exit */
unsigned channel:1;
unsigned resolver:1;
unsigned cancelable:1;
#if (NGX_HAVE_KQUEUE)
unsigned kq_vnode:1;
/* the pending errno reported by kqueue */
int kq_errno;
#endif
/*
* kqueue only:
* accept: number of sockets that wait to be accepted
* read: bytes to read when event is ready
* or lowat when event is set with NGX_LOWAT_EVENT flag
* write: available space in buffer when event is ready
* or lowat when event is set with NGX_LOWAT_EVENT flag
*
* iocp: TODO
*
* otherwise:
* accept: 1 if accept many, 0 otherwise
* read: bytes to read when event is ready, -1 if not known
*/
int available;
/* callback run for the event, either directly or from a posted queue */
ngx_event_handler_pt handler;
#if (NGX_HAVE_IOCP)
ngx_event_ovlp_t ovlp;
#endif
ngx_uint_t index;
ngx_log_t *log;
/* NOTE(review): presumably the node linking the event into the timer rbtree - confirm */
ngx_rbtree_node_t timer;
/* the posted queue */
ngx_queue_t queue;
/* everything below is compiled out by the "#if 0" guard */
#if 0
/* the threads support */
/*
* the event thread context, we store it here
* if $(CC) does not understand __thread declaration
* and pthread_getspecific() is too costly
*/
void *thr_ctx;
#if (NGX_EVENT_T_PADDING)
/* event should not cross cache line in SMP */
uint32_t padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
data:指向当前连接ngx_connection_t
write:用于标识事件,1表示写事件,缺省时表示读事件
accept:表示是accept事件还是posted事件,值为1表示是accept事件。
instance:在kqueue和epoll时会使用到,用于检测陈旧(过期)事件。初始时为1,在加入到事件监听后,如果检测到事件时,发现内核传递回的instance值与用户层事件对象中的值不一致,则认为该事件是陈旧事件
active:表示事件是否处于事件监听中。在调用添加事件或者添加连接时,会设置为1,删除事件或者删除连接时,会设置为0
ready:表示事件是否已经就绪。在处理监听事件时,调用具体的handler或者加入到队列之前,会设置为1
disabled:用于kqueue情况下
oneshot:用于kqueue和eventport情况
complete:用于异步aio事件的处理
2.2 ngx_connection_t
/*
 * Connection object. Instances live in the per-cycle connections array
 * allocated by ngx_event_process_init and are handed out from its
 * free list.
 */
struct ngx_connection_s {
/*
 * user data; while the connection sits on the free list this field
 * holds the pointer to the next free connection
 */
void *data;
/* read and write events; they point into the cycle's event arrays */
ngx_event_t *read;
ngx_event_t *write;
/* socket descriptor; initialized to -1 for pooled connections */
ngx_socket_t fd;
/* I/O callbacks; the accept path installs ngx_recv, ngx_send, etc. */
ngx_recv_pt recv;
ngx_send_pt send;
ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain;
/* listening socket this connection is attached to */
ngx_listening_t *listening;
off_t sent;
ngx_log_t *log;
ngx_pool_t *pool;
/* socket type; compared with SOCK_STREAM to choose the accept handler */
int type;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
ngx_proxy_protocol_t *proxy_protocol;
#if (NGX_SSL || NGX_COMPAT)
ngx_ssl_connection_t *ssl;
#endif
ngx_udp_connection_t *udp;
struct sockaddr *local_sockaddr;
socklen_t local_socklen;
ngx_buf_t *buffer;
ngx_queue_t queue;
ngx_atomic_uint_t number;
ngx_msec_t start_time;
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned pipeline:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned shared:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
unsigned need_last_buf:1;
unsigned need_flush_buf:1;
#if (NGX_HAVE_SENDFILE_NODISKIO || NGX_COMPAT)
unsigned busy_count:2;
#endif
#if (NGX_THREADS || NGX_COMPAT)
ngx_thread_task_t *sendfile_task;
#endif
};
3. 模块的初始化
事件模块的初始化流程为
配置的创建是通过类型为ngx_command_t的ngx_events_commands在处理 events配置块时创建的,即通过ngx_events_block。
在ngx_count_modules中会计算在类型为NGX_EVENT_MODULE的模块中的下标,来填充ctx_index.
/*
 * Handler of the "events" configuration block.
 *
 * Sets up the configuration table shared by all NGX_EVENT_MODULE modules:
 * every such module first creates its configuration, then the body of the
 * block is parsed in the event context, and finally every module gets a
 * chance to initialize its configuration.
 *
 * cf    parser state; switched to the event context while the block body
 *       is parsed and restored right after
 * cmd   not used
 * conf  slot that receives the pointer to the new configuration table
 *
 * Returns NGX_CONF_OK on success, "is duplicate" when the slot is already
 * filled, NGX_CONF_ERROR on allocation or create_conf failure, or the
 * error returned by ngx_conf_parse() or a module's init_conf.
 */
static char *
ngx_events_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                 *rc;
    void                 *mconf;
    void               ***evctx;
    ngx_uint_t            n, slot;
    ngx_conf_t            saved;
    ngx_event_module_t   *em;

    /* the block may appear only once */
    if (*(void **) conf != NULL) {
        return "is duplicate";
    }

    /* count the number of the event modules and set up their indices */
    ngx_event_max_module = ngx_count_modules(cf->cycle, NGX_EVENT_MODULE);

    evctx = ngx_pcalloc(cf->pool, sizeof(void *));
    if (evctx == NULL) {
        return NGX_CONF_ERROR;
    }

    *evctx = ngx_pcalloc(cf->pool, ngx_event_max_module * sizeof(void *));
    if (*evctx == NULL) {
        return NGX_CONF_ERROR;
    }

    *(void **) conf = evctx;

    /* first pass: let each event module create its configuration */
    for (n = 0; cf->cycle->modules[n] != NULL; n++) {

        if (cf->cycle->modules[n]->type != NGX_EVENT_MODULE) {
            continue;
        }

        em = cf->cycle->modules[n]->ctx;

        if (em->create_conf == NULL) {
            continue;
        }

        slot = cf->cycle->modules[n]->ctx_index;

        mconf = em->create_conf(cf->cycle);
        (*evctx)[slot] = mconf;

        if (mconf == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    /* parse the block body in the event context, then restore the state */
    saved = *cf;

    cf->ctx = evctx;
    cf->module_type = NGX_EVENT_MODULE;
    cf->cmd_type = NGX_EVENT_CONF;

    rc = ngx_conf_parse(cf, NULL);

    *cf = saved;

    if (rc != NGX_CONF_OK) {
        return rc;
    }

    /* second pass: let each event module initialize its configuration */
    for (n = 0; cf->cycle->modules[n] != NULL; n++) {

        if (cf->cycle->modules[n]->type != NGX_EVENT_MODULE) {
            continue;
        }

        em = cf->cycle->modules[n]->ctx;

        if (em->init_conf == NULL) {
            continue;
        }

        slot = cf->cycle->modules[n]->ctx_index;

        rc = em->init_conf(cf->cycle, (*evctx)[slot]);
        if (rc != NGX_CONF_OK) {
            return rc;
        }
    }

    return NGX_CONF_OK;
}
3.1 ngx_event_module_init
检查事件配置中的连接数connections是否超过了资源限制rlim_cur或者核心配置中的rlimit_nofile
如果核心配置已经开启了master,则分配共享内存用于多进程的accept
/*
 * Module init hook of the event core module.
 *
 * Logs the selected event method, copies timer_resolution from the core
 * configuration and warns when worker_connections exceeds the open file
 * limit. In master mode, and only if the shared area has not been set up
 * yet, it allocates one shared memory zone and places the accept mutex,
 * the connection counter, the temp number and (with NGX_STAT_STUB) the
 * statistics counters in it, each in its own 128-byte slot.
 *
 * Returns NGX_OK on success, NGX_ERROR if the shared memory or the mutex
 * cannot be created.
 */
static ngx_int_t
ngx_event_module_init(ngx_cycle_t *cycle)
{
    void              ***events_ctx;
    u_char              *base;
    size_t               line, total;
    ngx_shm_t            zone;
    ngx_time_t          *now;
    ngx_core_conf_t     *core;
    ngx_event_conf_t    *evcf;

    events_ctx = ngx_get_conf(cycle->conf_ctx, ngx_events_module);
    evcf = (*events_ctx)[ngx_event_core_module.ctx_index];

    if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) {
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                      "using the \"%s\" event method", evcf->name);
    }

    core = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

    ngx_timer_resolution = core->timer_resolution;

#if !(NGX_WIN32)
    {
    ngx_int_t      limit;
    struct rlimit  rlmt;

    if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "getrlimit(RLIMIT_NOFILE) failed, ignored");

    } else if (evcf->connections > (ngx_uint_t) rlmt.rlim_cur
               && (core->rlimit_nofile == NGX_CONF_UNSET
                   || evcf->connections > (ngx_uint_t) core->rlimit_nofile))
    {
        /* report the limit that is actually in effect */
        if (core->rlimit_nofile == NGX_CONF_UNSET) {
            limit = (ngx_int_t) rlmt.rlim_cur;

        } else {
            limit = core->rlimit_nofile;
        }

        ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
                      "%ui worker_connections exceed "
                      "open file resource limit: %i",
                      evcf->connections, limit);
    }
    }
#endif /* !(NGX_WIN32) */

    /* nothing is shared without master mode or once the zone exists */
    if (core->master == 0 || ngx_accept_mutex_ptr) {
        return NGX_OK;
    }

    /* the slot size should be equal to or greater than cache line size */
    line = 128;

    /* ngx_accept_mutex, ngx_connection_counter, ngx_temp_number */
    total = 3 * line;

#if (NGX_STAT_STUB)
    /* accepted, handled, requests, active, reading, writing, waiting */
    total += 7 * line;
#endif

    zone.size = total;
    ngx_str_set(&zone.name, "nginx_shared_zone");
    zone.log = cycle->log;

    if (ngx_shm_alloc(&zone) != NGX_OK) {
        return NGX_ERROR;
    }

    base = zone.addr;

    /* slot 0: the accept mutex */
    ngx_accept_mutex_ptr = (ngx_atomic_t *) base;
    ngx_accept_mutex.spin = (ngx_uint_t) -1;

    if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) base,
                         cycle->lock_file.data)
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    /* slot 1: the connection counter, moved from 0 to 1 */
    ngx_connection_counter = (ngx_atomic_t *) (base + 1 * line);

    (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "counter: %p, %uA",
                   ngx_connection_counter, *ngx_connection_counter);

    /* slot 2: the temp number */
    ngx_temp_number = (ngx_atomic_t *) (base + 2 * line);

    now = ngx_timeofday();

    ngx_random_number = (now->msec << 16) + ngx_pid;

#if (NGX_STAT_STUB)
    /* slots 3..9: the statistics counters */
    ngx_stat_accepted = (ngx_atomic_t *) (base + 3 * line);
    ngx_stat_handled = (ngx_atomic_t *) (base + 4 * line);
    ngx_stat_requests = (ngx_atomic_t *) (base + 5 * line);
    ngx_stat_active = (ngx_atomic_t *) (base + 6 * line);
    ngx_stat_reading = (ngx_atomic_t *) (base + 7 * line);
    ngx_stat_writing = (ngx_atomic_t *) (base + 8 * line);
    ngx_stat_waiting = (ngx_atomic_t *) (base + 9 * line);
#endif

    return NGX_OK;
}
3.2 ngx_event_process_init
检查核心配置中是否开启master,配置的worker_processes是否大于1,以及事件配置中是否开启accept_mutex
初始化ngx_posted_accept_events, ngx_posted_next_events和ngx_posted_events队列
初始化ngx_event_timer_rbtree定时器
根据事件配置中的use参数,调用对应的actions.init初始化事件驱动框架
分配连接池,读事件池,写事件池,初始化空闲连接链表
为监听套接字分配连接,设置事件的accept为1,初始化事件的handler,添加到事件驱动框架中
/*
 * Per-process initialization of the event framework.
 *
 * Steps, in order:
 *  - decide whether the accept mutex is used;
 *  - initialize the posted event queues and the event timer;
 *  - call actions.init of the event module selected by ecf->use;
 *  - on non-Windows builds, optionally arm a SIGALRM interval timer for
 *    timer_resolution and allocate cycle->files;
 *  - allocate the connection, read event and write event arrays and
 *    link all connections into the free list;
 *  - attach a connection to every listening socket, set its accept
 *    handler and register its read event.
 *
 * Returns NGX_OK on success and NGX_ERROR on failure; the process exits
 * with status 2 if the event module's actions.init fails.
 */
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
ngx_uint_t m, i;
ngx_event_t *rev, *wev;
ngx_listening_t *ls;
ngx_connection_t *c, *next, *old;
ngx_core_conf_t *ccf;
ngx_event_conf_t *ecf;
ngx_event_module_t *module;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);
/* the accept mutex needs master mode, several workers and accept_mutex on */
if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
ngx_use_accept_mutex = 1;
ngx_accept_mutex_held = 0;
ngx_accept_mutex_delay = ecf->accept_mutex_delay;
} else {
ngx_use_accept_mutex = 0;
}
#if (NGX_WIN32)
/*
* disable accept mutex on win32 as it may cause deadlock if
* grabbed by a process which can't accept connections
*/
ngx_use_accept_mutex = 0;
#endif
/* set to 1 below only if a listening event is added with NGX_EXCLUSIVE_EVENT */
ngx_use_exclusive_accept = 0;
ngx_queue_init(&ngx_posted_accept_events);
ngx_queue_init(&ngx_posted_next_events);
ngx_queue_init(&ngx_posted_events);
if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
return NGX_ERROR;
}
/* initialize the one event module whose ctx_index equals ecf->use */
for (m = 0; cycle->modules[m]; m++) {
if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
continue;
}
if (cycle->modules[m]->ctx_index != ecf->use) {
continue;
}
module = cycle->modules[m]->ctx;
if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
/* fatal */
exit(2);
}
break;
}
#if !(NGX_WIN32)
/*
 * timer_resolution is set and the event method has no timer event:
 * deliver SIGALRM periodically instead
 */
if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
struct sigaction sa;
struct itimerval itv;
ngx_memzero(&sa, sizeof(struct sigaction));
sa.sa_handler = ngx_timer_signal_handler;
sigemptyset(&sa.sa_mask);
if (sigaction(SIGALRM, &sa, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigaction(SIGALRM) failed");
return NGX_ERROR;
}
/* ngx_timer_resolution is in milliseconds: split into sec and usec */
itv.it_interval.tv_sec = ngx_timer_resolution / 1000;
itv.it_interval.tv_usec = (ngx_timer_resolution % 1000) * 1000;
itv.it_value.tv_sec = ngx_timer_resolution / 1000;
itv.it_value.tv_usec = (ngx_timer_resolution % 1000 ) * 1000;
/* a setitimer() failure is only logged; initialization continues */
if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setitimer() failed");
}
}
/* allocate cycle->files, one pointer per descriptor up to the soft RLIMIT_NOFILE */
if (ngx_event_flags & NGX_USE_FD_EVENT) {
struct rlimit rlmt;
if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"getrlimit(RLIMIT_NOFILE) failed");
return NGX_ERROR;
}
cycle->files_n = (ngx_uint_t) rlmt.rlim_cur;
cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n,
cycle->log);
if (cycle->files == NULL) {
return NGX_ERROR;
}
}
#else
if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
"the \"timer_resolution\" directive is not supported "
"with the configured event method, ignored");
ngx_timer_resolution = 0;
}
#endif
/* the connection pool: connection_n connections */
cycle->connections =
ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
if (cycle->connections == NULL) {
return NGX_ERROR;
}
c = cycle->connections;
/* the read event pool: one read event per connection */
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->read_events == NULL) {
return NGX_ERROR;
}
rev = cycle->read_events;
for (i = 0; i < cycle->connection_n; i++) {
rev[i].closed = 1;
rev[i].instance = 1;
}
/* the write event pool: one write event per connection */
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->write_events == NULL) {
return NGX_ERROR;
}
wev = cycle->write_events;
for (i = 0; i < cycle->connection_n; i++) {
wev[i].closed = 1;
}
/*
 * build the free list from the last connection to the first:
 * data links to the next free connection, NULL for the last one
 */
i = cycle->connection_n;
next = NULL;
do {
i--;
c[i].data = next;
c[i].read = &cycle->read_events[i];
c[i].write = &cycle->write_events[i];
c[i].fd = (ngx_socket_t) -1;
next = &c[i];
} while (i);
cycle->free_connections = next;
cycle->free_connection_n = cycle->connection_n;
/* for each listening socket */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
/* skip reuseport sockets assigned to another worker */
if (ls[i].reuseport && ls[i].worker != ngx_worker) {
continue;
}
#endif
c = ngx_get_connection(ls[i].fd, cycle->log);
if (c == NULL) {
return NGX_ERROR;
}
c->type = ls[i].type;
c->log = &ls[i].log;
c->listening = &ls[i];
ls[i].connection = c;
rev = c->read;
rev->log = c->log;
/* mark the read event of a listening socket as an accept event */
rev->accept = 1;
#if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls[i].deferred_accept;
#endif
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)
&& cycle->old_cycle)
{
if (ls[i].previous) {
/*
* delete the old accept events that were bound to
* the old cycle read events array
*/
old = ls[i].previous->connection;
if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
old->fd = (ngx_socket_t) -1;
}
}
#if (NGX_WIN32)
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
ngx_iocp_conf_t *iocpcf;
rev->handler = ngx_event_acceptex;
/* with the accept mutex the event is not registered here */
if (ngx_use_accept_mutex) {
continue;
}
if (ngx_add_event(rev, 0, NGX_IOCP_ACCEPT) == NGX_ERROR) {
return NGX_ERROR;
}
ls[i].log.handler = ngx_acceptex_log_error;
iocpcf = ngx_event_get_conf(cycle->conf_ctx, ngx_iocp_module);
if (ngx_event_post_acceptex(&ls[i], iocpcf->post_acceptex)
== NGX_ERROR)
{
return NGX_ERROR;
}
} else {
rev->handler = ngx_event_accept;
/* with the accept mutex the event is not registered here */
if (ngx_use_accept_mutex) {
continue;
}
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}
#else
/* SOCK_STREAM sockets accept connections, other types receive messages */
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
#if (NGX_HAVE_REUSEPORT)
/* reuseport sockets are registered right away, regardless of the accept mutex */
if (ls[i].reuseport) {
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
continue;
}
#endif
/*
 * with the accept mutex the event is not registered here
 * NOTE(review): presumably added later by the worker that takes the
 * mutex - confirm against the accept mutex code
 */
if (ngx_use_accept_mutex) {
continue;
}
#if (NGX_HAVE_EPOLLEXCLUSIVE)
/* epoll with several workers: register with NGX_EXCLUSIVE_EVENT */
if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
&& ccf->worker_processes > 1)
{
ngx_use_exclusive_accept = 1;
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
continue;
}
#endif
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
#endif
}
return NGX_OK;
}
4. 事件处理
逻辑主要在ngx_process_events_and_timers函数中
在master模式下需要使用accept锁时,获取到锁的worker进程会设置NGX_POST_EVENTS标识:如果读事件是accept(连接)事件,则将事件放入ngx_posted_accept_events队列中,否则放入ngx_posted_events队列
/* excerpt: a worker holding the accept mutex asks for events to be posted */
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
}
/*
 * excerpt from ngx_epoll_module: with NGX_POST_EVENTS set the event is
 * queued instead of handled at once; accept events get their own queue
 */
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
处理完ngx_posted_accept_events队列中的事件后,获取到accept_mutex锁的需要释放锁,然后处理ngx_posted_events队列中的事件
/* excerpt: the posted accept events are processed first */
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
/* the accept mutex is released before timers and ordinary events run */
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
ngx_event_expire_timers();
/* the ordinary posted events are processed last */
ngx_event_process_posted(cycle, &ngx_posted_events);
4.1 连接事件的处理
在处理连接到来时的事件时(ngx_event_accept),先将当前事件的ready标识设置为0。调用accept后,为新的套接字分配连接:从空闲连接池中获取连接对象时会将数据清0;当事件驱动类型为iocp,或者当前事件是延时accept(deferred accept)事件时,会将新分配连接的读事件的ready标识设置为1
/*
 * excerpt from the accept path: per the surrounding notes, ev is the
 * accept event and rev is the read event of the new connection
 */
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
rev->ready = 1;
}
/* a deferred accept also marks the new read event as ready */
if (ev->deferred_accept) {
rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
rev->available = 1;
#endif
}
对于没有将ready标识设置为1,后面是在哪里将ready标识设置为1的呢?
以ngx_epoll_module为例,是在ngx_epoll_process_events函数中:epoll_wait返回监听到的事件后,如果读事件处于激活状态(active),会将ready状态设置为1
/*
 * excerpt from ngx_epoll_process_events: handle a readable descriptor
 * whose read event is still active
 */
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
/* EPOLLRDHUP was reported: remember the pending eof */
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
#endif
/* mark the event ready; -1 means the byte count is not known */
rev->ready = 1;
rev->available = -1;
/* either queue the event or run its handler at once */
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
那是在什么时候将ready设置为0的呢?
在为新的连接分配连接对象时,会设置连接对象的recv回调,当recv回调执行时,读取结束后,会将ready状态设置为0
/* excerpt: I/O callbacks installed on the connection object of a new connection */
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
ngx_unix_recv在从套接字上读取数据时
当对端关闭时,会将ready状态设置为0
当recv()返回EAGAIN(暂时没有数据可读)或发生其他错误时,会将ready状态设置为0;遇到EINTR(被信号中断)时会重试recv()
/*
 * Reads up to size bytes from the socket of connection c into buf and
 * keeps the flags of the read event (ready, eof, error, available) in
 * step with the result.
 *
 * Returns the number of bytes read, 0 on end of stream, NGX_AGAIN when
 * no data is available, or the result of ngx_connection_error() on
 * failure (rev->error is set when that result is NGX_ERROR).
 */
ssize_t
ngx_unix_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
ssize_t n;
ngx_err_t err;
ngx_event_t *rev;
rev = c->read;
#if (NGX_HAVE_KQUEUE)
/* kqueue: use the reported byte count to avoid a useless recv() */
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"recv: eof:%d, avail:%d, err:%d",
rev->pending_eof, rev->available, rev->kq_errno);
if (rev->available == 0) {
/* nothing left to read and eof is pending: report eof or the stored error */
if (rev->pending_eof) {
rev->ready = 0;
rev->eof = 1;
if (rev->kq_errno) {
rev->error = 1;
ngx_set_socket_errno(rev->kq_errno);
return ngx_connection_error(c, rev->kq_errno,
"kevent() reported about an closed connection");
}
return 0;
} else {
/* nothing left to read and no eof: wait for the next event */
rev->ready = 0;
return NGX_AGAIN;
}
}
}
#endif
#if (NGX_HAVE_EPOLLRDHUP)
/* epoll with rdhup support: no bytes available and no pending eof */
if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
&& ngx_use_epoll_rdhup)
{
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"recv: eof:%d, avail:%d",
rev->pending_eof, rev->available);
if (rev->available == 0 && !rev->pending_eof) {
rev->ready = 0;
return NGX_AGAIN;
}
}
#endif
/* recv() is repeated only while it fails with NGX_EINTR */
do {
n = recv(c->fd, buf, size, 0);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"recv: fd:%d %z of %uz", c->fd, n, size);
/* recv() returned 0: end of stream */
if (n == 0) {
rev->ready = 0;
rev->eof = 1;
#if (NGX_HAVE_KQUEUE)
/*
* on FreeBSD recv() may return 0 on closed socket
* even if kqueue reported about available data
*/
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
rev->available = 0;
}
#endif
return 0;
}
/* some bytes were read */
if (n > 0) {
#if (NGX_HAVE_KQUEUE)
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
rev->available -= n;
/*
* rev->available may be negative here because some additional
* bytes may be received between kevent() and recv()
*/
if (rev->available <= 0) {
/* keep the event ready when an eof is still pending */
if (!rev->pending_eof) {
rev->ready = 0;
}
rev->available = 0;
}
return n;
}
#endif
#if (NGX_HAVE_FIONREAD)
/* the number of available bytes is known: account for what was read */
if (rev->available >= 0) {
rev->available -= n;
/*
* negative rev->available means some additional bytes
* were received between kernel notification and recv(),
* and therefore ev->ready can be safely reset even for
* edge-triggered event methods
*/
if (rev->available < 0) {
rev->available = 0;
rev->ready = 0;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"recv: avail:%d", rev->available);
} else if ((size_t) n == size) {
/* the buffer was filled completely: query how many bytes remain */
if (ngx_socket_nread(c->fd, &rev->available) == -1) {
n = ngx_connection_error(c, ngx_socket_errno,
ngx_socket_nread_n " failed");
break;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"recv: avail:%d", rev->available);
}
#endif
#if (NGX_HAVE_EPOLLRDHUP)
if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
&& ngx_use_epoll_rdhup)
{
/* a short read: clear ready unless an eof is pending */
if ((size_t) n < size) {
if (!rev->pending_eof) {
rev->ready = 0;
}
rev->available = 0;
}
return n;
}
#endif
/* generic case: a short read clears ready unless the method is greedy */
if ((size_t) n < size
&& !(ngx_event_flags & NGX_USE_GREEDY_EVENT))
{
rev->ready = 0;
}
return n;
}
/* recv() failed: take the errno before anything can overwrite it */
err = ngx_socket_errno;
if (err == NGX_EAGAIN || err == NGX_EINTR) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
"recv() not ready");
n = NGX_AGAIN;
} else {
n = ngx_connection_error(c, err, "recv() failed");
break;
}
} while (err == NGX_EINTR);
/* reached on NGX_EAGAIN or on an error: the event is no longer ready */
rev->ready = 0;
if (n == NGX_ERROR) {
rev->error = 1;
}
return n;
}