事件驱动函数 -- ngx_process_events_and_timers

2015-03-16 08:45:16   最后更新: 2015-08-05 23:42:47   访问数量:1639




在 worker 的主循环中,主要工作是在不断的循环调用 ngx_process_events_and_timers 函数

这个函数就是事件驱动函数,主要完成以下工作:

  1. 调用事件驱动模块实现的 ngx_epoll_process_events 函数(ngx_process_events 宏)处理网络事件
  2. 调用 ngx_event_process_posted 函数处理 ngx_posted_accept_events 和 ngx_posted_events 两个队列中的事件
  3. 调用 ngx_event_expire_timers 函数处理定时器事件

 

// static ngx_int_t // ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, // ngx_uint_t flags) // epoll 事件处理函数 {{{ static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); // epoll_wait,获取发生的事件列表 event_list events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } // 出错处理 if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } // 超时 if (events == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } // 如果 timer 为 -1(永久阻塞),则说明调用出错 ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } // 事件处理 for (i = 0; i < events; i++) { c = event_list[i].data.ptr; // 无论是 32 位还是 64 位系统,地址的最低位一定是0 // 因此,nginx 使用连接地址的最后一位保存 instance 变量 // // 这里取出 instance 变量,并恢复地址 instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); } #endif if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; } if ((revents & EPOLLIN) && rev->active) { #if (NGX_HAVE_EPOLLRDHUP) if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } #endif rev->ready = 1; if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; // 如果进程正在占用锁,则先将事件缓存在队列中 ngx_post_event(rev, queue); } else { // 调用回调函数,accept 及事件处理 rev->handler(rev); } } wev = c->write; if ((revents & EPOLLOUT) && wev->active) { if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } wev->ready = 1; if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } return NGX_OK; } // }}}

 

 

上述代码进行了丰富的注释,这个函数主要的工作就是执行 epoll_wait 并处理所有事件

需要注意的就是,nginx 最核心的机制 -- 异步非阻塞的实现:

如果进程正在占用锁,则在获取锁之后为 flags 添加了 NGX_POST_EVENTS 标识,那么,在 ngx_epoll_process_events 函数中,仅仅将事件插入相应的队列,而不进行处理,只有等到进程空闲的时候,即释放锁以后或没有获得锁的某次调用才会去处理事件,这么做大大提高了运行效率

 

// void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) // 队列事件的处理 {{{ void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) { ngx_queue_t *q; ngx_event_t *ev; while (!ngx_queue_empty(posted)) { q = ngx_queue_head(posted); ev = ngx_queue_data(q, ngx_event_t, queue); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); ngx_delete_posted_event(ev); ev->handler(ev); } } // }}}

 

这段代码在释放锁之后执行,将队列中的事件取出处理,执行事件相应的 handler 回调函数

 

// void ngx_event_expire_timers(void) // 定时器事件处理 {{{ void ngx_event_expire_timers(void) { ngx_event_t *ev; ngx_rbtree_node_t *node, *root, *sentinel; sentinel = ngx_event_timer_rbtree.sentinel; for ( ;; ) { ngx_mutex_lock(ngx_event_timer_mutex); root = ngx_event_timer_rbtree.root; if (root == sentinel) { return; } node = ngx_rbtree_min(root, sentinel); /* node->key <= ngx_current_time */ if ((ngx_msec_int_t) (node->key - ngx_current_msec) <= 0) { ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer)); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "event timer del: %d: %M", ngx_event_ident(ev->data), ev->timer.key); ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer); ngx_mutex_unlock(ngx_event_timer_mutex); #if (NGX_DEBUG) ev->timer.left = NULL; ev->timer.right = NULL; ev->timer.parent = NULL; #endif ev->timer_set = 0; ev->timedout = 1; ev->handler(ev); continue; } break; } ngx_mutex_unlock(ngx_event_timer_mutex); } // }}}

 

这个函数中,将所有超时事件取出并循环处理

 

 

 






技术帖      linux      unix      网络      龙潭书斋      epoll      nginx      事件      events      epoll_wait      accept      回调函数      异步非阻塞     


京ICP备15018585号