nginx worker initialization

2015-02-02 00:06:59   Last updated: 2015-05-10 17:13:00   Views: 1872




After a worker process starts, the first thing it does is initialize itself.

 

 

 

// static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) // 初始化 worker {{{ static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) { sigset_t set; uint64_t cpu_affinity; ngx_int_t n; ngx_uint_t i; struct rlimit rlmt; ngx_core_conf_t *ccf; ngx_listening_t *ls; // 设定环境变量 if (ngx_set_environment(cycle, NULL) == NULL) { /* fatal */ exit(2); } ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 设置进程优先级 if (worker >= 0 && ccf->priority != 0) { if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setpriority(%d) failed", ccf->priority); } } // 设置进程的各种资源限制 if (ccf->rlimit_nofile != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile; rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile; if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_NOFILE, %i) failed", ccf->rlimit_nofile); } } if (ccf->rlimit_core != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_core; rlmt.rlim_max = (rlim_t) ccf->rlimit_core; if (setrlimit(RLIMIT_CORE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_CORE, %O) failed", ccf->rlimit_core); } } #ifdef RLIMIT_SIGPENDING if (ccf->rlimit_sigpending != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_sigpending; rlmt.rlim_max = (rlim_t) ccf->rlimit_sigpending; if (setrlimit(RLIMIT_SIGPENDING, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_SIGPENDING, %i) failed", ccf->rlimit_sigpending); } } #endif // 是否是 root 权限 if (geteuid() == 0) { if (setgid(ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setgid(%d) failed", ccf->group); /* fatal */ exit(2); } // 初始化组清单 if (initgroups(ccf->username, ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "initgroups(%s, %d) failed", ccf->username, ccf->group); } // 将执行权限设置为当前用户 if (setuid(ccf->user) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setuid(%d) failed", ccf->user); /* fatal */ exit(2); } } if (worker >= 0) { // 获取 CPU 相关信息 cpu_affinity = ngx_get_cpu_affinity(worker); if (cpu_affinity) { // 将进程绑定到一个 CPU 核心上 ngx_setaffinity(cpu_affinity, cycle->log); } } #if (NGX_HAVE_PR_SET_DUMPABLE) /* allow coredump after setuid() in Linux 2.4.x */ // 生成核心转储文件 if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "prctl(PR_SET_DUMPABLE) failed"); } #endif // 改变当前工作目录到指定目录 if (ccf->working_directory.len) { if (chdir((char *) ccf->working_directory.data) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "chdir(\"%s\") failed", ccf->working_directory.data); /* fatal */ exit(2); } } sigemptyset(&set); if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigprocmask() failed"); } // 初始化随机数种子发生器 srandom((ngx_pid << 16) ^ ngx_time()); /* * disable deleting previous events for the listening sockets because * in the worker processes there are no events at all at this point */ ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ls[i].previous = NULL; } // 初始化模块 for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->init_process) { if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } // 关闭父进程与其他子进程通信的域套接字 fd for (n = 0; n < ngx_last_process; n++) { if (ngx_processes[n].pid == -1) { continue; } if (n == ngx_process_slot) { continue; } if (ngx_processes[n].channel[1] == -1) { 
continue; } if (close(ngx_processes[n].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } } if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } #if 0 ngx_last_process = 0; #endif // 给 ngx_channel 注册一个读事件处理函数 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } } // }}}

 

The main setup steps it performs are:

  1. Set environment variables according to the parameters in cycle
  2. Set the priority of the current process
  3. Set the process resource limits according to the parameters in cycle (a standalone sketch of these system calls follows this list)
  4. Switch the process to the configured user and group (setgid / initgroups / setuid)
  5. Bind the worker to a fixed CPU core to improve the CPU cache hit rate
  6. Allow core dumps to be generated on abnormal termination
  7. Change the current working directory
  8. Clear the signal mask
  9. Seed the random number generator with a value derived from the worker's pid and the current time, so each worker gets a distinct sequence
  10. Initialize every module through its init_process callback (for the event module this is ngx_event_process_init)
  11. Close the UNIX domain socket fds the master uses to talk to the other worker processes
  12. Register a read event handler on ngx_channel
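The list above maps one-to-one onto the code. For steps 2, 3, and 8 in particular, the underlying system calls can be exercised on their own; the following is a minimal sketch in ordinary C (not nginx code), with error handling reduced to perror and made-up limit values:

#include <stdio.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/time.h>

int main(void)
{
    /* step 2: lower this process's scheduling priority (nice value) */
    if (setpriority(PRIO_PROCESS, 0, 5) == -1) {
        perror("setpriority");
    }

    /* step 3: raise the soft and hard limits on open file descriptors */
    struct rlimit rl = { .rlim_cur = 4096, .rlim_max = 4096 };
    if (setrlimit(RLIMIT_NOFILE, &rl) == -1) {
        perror("setrlimit(RLIMIT_NOFILE)");
    }

    /* step 8: clear the signal mask inherited from the parent */
    sigset_t set;
    sigemptyset(&set);
    if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
        perror("sigprocmask");
    }

    return 0;
}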

 

After ngx_spawn_process creates the child process, it sets the global variable ngx_channel to:

ngx_channel = ngx_processes[s].channel[1];
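The channel itself is a pair of connected UNIX domain sockets created with socketpair() in ngx_spawn_process: the master keeps channel[0] and the worker keeps channel[1]. Below is a stripped-down sketch of that pattern; it uses plain read/write instead of nginx's sendmsg-based ngx_channel_t messages, so treat it only as an illustration:

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
    int channel[2];

    /* a connected pair of UNIX domain sockets, as ngx_spawn_process does */
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, channel) == -1) {
        perror("socketpair");
        return 1;
    }

    pid_t pid = fork();

    if (pid == 0) {                       /* worker: keeps channel[1] */
        close(channel[0]);
        char buf[64];
        ssize_t n = read(channel[1], buf, sizeof(buf) - 1);
        if (n > 0) {
            buf[n] = '\0';
            printf("worker got: %s\n", buf);
        }
        return 0;
    }

    close(channel[1]);                    /* master: keeps channel[0] */
    const char *cmd = "open channel";
    write(channel[0], cmd, strlen(cmd));
    wait(NULL);
    return 0;
}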

 

 

The most important part of the whole initialization is the loop that calls every module's init_process callback. For the event module, the init_process function pointer points to ngx_event_process_init:

// static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) // 事件模块初始化 {{{ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ngx_uint_t m, i; ngx_event_t *rev, *wev; ngx_listening_t *ls; ngx_connection_t *c, *next, *old; ngx_core_conf_t *ccf; ngx_event_conf_t *ecf; ngx_event_module_t *module; ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module); if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { ngx_use_accept_mutex = 1; ngx_accept_mutex_held = 0; ngx_accept_mutex_delay = ecf->accept_mutex_delay; } else { ngx_use_accept_mutex = 0; } #if (NGX_WIN32) /* * disable accept mutex on win32 as it may cause deadlock if * grabbed by a process which can't accept connections */ ngx_use_accept_mutex = 0; #endif ngx_queue_init(&ngx_posted_accept_events); ngx_queue_init(&ngx_posted_events); if (ngx_event_timer_init(cycle->log) == NGX_ERROR) { return NGX_ERROR; } for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_EVENT_MODULE) { continue; } if (ngx_modules[m]->ctx_index != ecf->use) { continue; } module = ngx_modules[m]->ctx; if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { /* fatal */ exit(2); } break; } #if !(NGX_WIN32) if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) { struct sigaction sa; struct itimerval itv; ngx_memzero(&sa, sizeof(struct sigaction)); sa.sa_handler = ngx_timer_signal_handler; sigemptyset(&sa.sa_mask); if (sigaction(SIGALRM, &sa, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigaction(SIGALRM) failed"); return NGX_ERROR; } itv.it_interval.tv_sec = ngx_timer_resolution / 1000; itv.it_interval.tv_usec = (ngx_timer_resolution % 1000) * 1000; itv.it_value.tv_sec = ngx_timer_resolution / 1000; itv.it_value.tv_usec = (ngx_timer_resolution % 1000 ) * 1000; if (setitimer(ITIMER_REAL, &itv, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setitimer() failed"); } } if (ngx_event_flags & NGX_USE_FD_EVENT) { struct rlimit rlmt; if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "getrlimit(RLIMIT_NOFILE) failed"); return NGX_ERROR; } cycle->files_n = (ngx_uint_t) rlmt.rlim_cur; cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n, cycle->log); if (cycle->files == NULL) { return NGX_ERROR; } } #endif cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); if (cycle->connections == NULL) { return NGX_ERROR; } c = cycle->connections; cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events == NULL) { return NGX_ERROR; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; rev[i].instance = 1; } cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events == NULL) { return NGX_ERROR; } wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; } i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; #if (NGX_THREADS) c[i].lock = 0; #endif } while (i); cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n; /* for each listening socket */ ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ngx_get_connection(ls[i].fd, cycle->log); if (c == NULL) { 
return NGX_ERROR; } c->log = &ls[i].log; c->listening = &ls[i]; ls[i].connection = c; rev = c->read; rev->log = c->log; rev->accept = 1; #if (NGX_HAVE_DEFERRED_ACCEPT) rev->deferred_accept = ls[i].deferred_accept; #endif if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ls[i].previous) { /* * delete the old accept events that were bound to * the old cycle read events array */ old = ls[i].previous->connection; if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT) == NGX_ERROR) { return NGX_ERROR; } old->fd = (ngx_socket_t) -1; } } #if (NGX_WIN32) if (ngx_event_flags & NGX_USE_IOCP_EVENT) { ngx_iocp_conf_t *iocpcf; rev->handler = ngx_event_acceptex; if (ngx_use_accept_mutex) { continue; } if (ngx_add_event(rev, 0, NGX_IOCP_ACCEPT) == NGX_ERROR) { return NGX_ERROR; } ls[i].log.handler = ngx_acceptex_log_error; iocpcf = ngx_event_get_conf(cycle->conf_ctx, ngx_iocp_module); if (ngx_event_post_acceptex(&ls[i], iocpcf->post_acceptex) == NGX_ERROR) { return NGX_ERROR; } } else { rev->handler = ngx_event_accept; if (ngx_use_accept_mutex) { continue; } if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } #else rev->handler = ngx_event_accept; if (ngx_use_accept_mutex) { continue; } if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { if (ngx_add_conn(c) == NGX_ERROR) { return NGX_ERROR; } } else { if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } #endif } return NGX_OK; } // }}}

 

 

Fetching the configuration

The function first fetches the core module's configuration:

{
  daemon = 1,
  master = 1,
  timer_resolution = 0,
  worker_processes = 1,
  debug_points = 0,
  rlimit_nofile = -1,
  rlimit_sigpending = -1,
  rlimit_core = -1,
  priority = 0,
  cpu_affinity_n = 0,
  cpu_affinity = 0x0,
  username = 0x0,
  user = 4294967295,
  group = 4294967295,
  working_directory = { len = 0, data = 0x0 },
  lock_file = { len = 49, data = 0x810ff9c "/home/zeyu/Workspace/nginx-1.7.7/" },
  pid = { len = 24, data = 0x80ba016 "/var/run/nginx/nginx.pid" },
  oldpid = { len = 32, data = 0x810ff7c "/var/run/nginx/nginx.pid.oldbin" },
  env = { elts = 0x8100088, nelts = 1, size = 8, nalloc = 1, pool = 0x80ff700 },
  environment = 0x8110044
}

 

 

Next, it fetches the event module's configuration:

{
  connections = 1024,
  use = 1,
  multi_accept = 0,
  accept_mutex = 1,
  accept_mutex_delay = 500,
  name = 0x80bdb09 "epoll"
}
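For orientation, these two structures are populated from nginx.conf. A rough example of the directives involved is shown below; the values are purely illustrative (the dump above shows most of them left at their defaults), not a recommended configuration:

worker_processes     1;
worker_priority      0;
worker_rlimit_nofile 4096;      # fills ccf->rlimit_nofile (unset above, hence -1)
worker_rlimit_core   50M;       # fills ccf->rlimit_core
working_directory    /var/run/nginx;
user                 nobody nobody;
pid                  /var/run/nginx/nginx.pid;

events {
    worker_connections  1024;   # ecf->connections
    use                 epoll;  # ecf->use / name
    accept_mutex        on;     # ecf->accept_mutex
    accept_mutex_delay  500ms;  # ecf->accept_mutex_delay
    multi_accept        off;    # ecf->multi_accept
}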

 

 

Initializing the posted event queues

It then calls ngx_queue_init to initialize the ngx_posted_accept_events and ngx_posted_events queues:

#define ngx_queue_init(q)                                                     \
    (q)->prev = q;                                                            \
    (q)->next = q
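ngx_queue_t is an intrusive, circular doubly linked list: the prev/next pointers are embedded in the owning structure, and an empty queue is a head node that points at itself. Below is a self-contained sketch of the same idea, using hypothetical names rather than the real ngx_queue macro set:

#include <stddef.h>
#include <stdio.h>

typedef struct queue_s {
    struct queue_s  *prev;
    struct queue_s  *next;
} queue_t;

/* same shape as ngx_queue_init: an empty queue points at itself */
#define queue_init(q)  do { (q)->prev = (q); (q)->next = (q); } while (0)

static void queue_insert_tail(queue_t *h, queue_t *x)
{
    x->prev = h->prev;
    x->prev->next = x;
    x->next = h;
    h->prev = x;
}

typedef struct {
    int      id;
    queue_t  queue;           /* link embedded in the owning object */
} event_t;

int main(void)
{
    queue_t  posted;
    event_t  ev1 = { 1 }, ev2 = { 2 };

    queue_init(&posted);
    queue_insert_tail(&posted, &ev1.queue);
    queue_insert_tail(&posted, &ev2.queue);

    /* walk the list and recover the owning event from its embedded link */
    for (queue_t *q = posted.next; q != &posted; q = q->next) {
        event_t *ev = (event_t *) ((char *) q - offsetof(event_t, queue));
        printf("posted event %d\n", ev->id);
    }
    return 0;
}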

 

 

Timer (red-black tree) initialization -- ngx_event_timer_init

After that, the function calls ngx_event_timer_init to set up the red-black tree that implements the timers:

#define ngx_rbtree_init(tree, s, i)                                           \
    ngx_rbtree_sentinel_init(s);                                              \
    (tree)->root = s;                                                         \
    (tree)->sentinel = s;                                                     \
    (tree)->insert = i

// ngx_int_t ngx_event_timer_init(ngx_log_t *log)
// timer initialization {{{
ngx_int_t
ngx_event_timer_init(ngx_log_t *log)
{
    ngx_rbtree_init(&ngx_event_timer_rbtree, &ngx_event_timer_sentinel,
                    ngx_rbtree_insert_timer_value);

#if (NGX_THREADS)

    if (ngx_event_timer_mutex) {
        ngx_event_timer_mutex->log = log;
        return NGX_OK;
    }

    ngx_event_timer_mutex = ngx_mutex_init(log, 0);
    if (ngx_event_timer_mutex == NULL) {
        return NGX_ERROR;
    }

#endif

    return NGX_OK;
}
// }}}

 

For how red-black trees are built and how their insertion and deletion algorithms work, see:

Red-black trees

 

nginx implements its timers with a red-black tree.

A red-black tree is a balanced binary search tree: the depth of its deepest leaf is never more than twice the depth of its shallowest leaf. Because the timers are keyed by their expiration time, the event that times out soonest is always the leftmost node, so it can be found in O(log n) without walking the whole tree.
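Concretely, "the next timer to fire" is just a descent down the left spine of the tree. A generic sketch of that lookup, with a sentinel node standing in for ngx_event_timer_sentinel (simplified node type, not nginx's ngx_rbtree_node_t):

#include <stdint.h>

typedef struct node_s {
    uint64_t        key;       /* expiration time in ms */
    struct node_s  *left;
    struct node_s  *right;
} node_t;

/*
 * Return the node with the smallest key, i.e. the timer that expires first.
 * The sentinel terminates every branch instead of NULL pointers.
 */
static node_t *
tree_min(node_t *root, node_t *sentinel)
{
    if (root == sentinel) {
        return sentinel;               /* no timers registered */
    }

    while (root->left != sentinel) {
        root = root->left;             /* O(log n) thanks to balancing */
    }

    return root;
}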

 

epoll module initialization -- ngx_epoll_init

The code that follows invokes the actions.init callback of the module whose type is NGX_EVENT_MODULE.

In this build the NGX_EVENT_MODULE-type modules are ngx_event_core_module and ngx_epoll_module, and the loop also requires ctx_index to match ecf->use (the configured event mechanism), so the initialization that actually runs here is the epoll module's:

    for (m = 0; ngx_modules[m]; m++) {

        // only ngx_event_core_module and ngx_epoll_module are of type
        // NGX_EVENT_MODULE in this build
        if (ngx_modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (ngx_modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = ngx_modules[m]->ctx;

        // initialize the epoll module:
        // ngx_epoll_init in src/event/modules/ngx_epoll_module.c
        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            /* fatal */
            exit(2);
        }

        break;
    }

 

// union epoll_data_t
// union that carries the user data attached to an epoll event {{{
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;
// }}}

// struct epoll_event
// epoll event descriptor {{{
struct epoll_event {
    uint32_t      events;
    epoll_data_t  data;
};
// }}}

// static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
// epoll module initialization {{{
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        // create the epoll fd
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_FILE_AIO)
        // set up file AIO support
        ngx_epoll_aio_init(cycle, epcf);
#endif
    }

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;

    ngx_io = ngx_os_io;

    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}
// }}}

 

This code creates an epoll fd (stored in the global variable ep); the size hint passed to epoll_create is cycle->connection_n / 2 (on modern kernels the argument is only a hint).

It also allocates event_list (another global), the array that epoll_wait will later fill with ready events.

 

For background on using epoll, see:

Using epoll
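As a quick refresher on the API that ngx_epoll_init wraps, a minimal epoll loop looks roughly like this (the listening socket setup is omitted and listen_fd is assumed to already exist):

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

void event_loop(int listen_fd)
{
    /* the size argument is only a hint on modern kernels, just like
       cycle->connection_n / 2 above */
    int ep = epoll_create(512);
    if (ep == -1) { perror("epoll_create"); exit(1); }

    struct epoll_event ee = { 0 };
    ee.events = EPOLLIN;
    ee.data.fd = listen_fd;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, listen_fd, &ee) == -1) {
        perror("epoll_ctl");
        exit(1);
    }

    /* plays the same role as nginx's global event_list */
    struct epoll_event event_list[64];

    for ( ;; ) {
        int n = epoll_wait(ep, event_list, 64, -1);
        for (int i = 0; i < n; i++) {
            if (event_list[i].data.fd == listen_fd) {
                /* accept and register the new connection here */
            }
        }
    }
}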

 

Creating and initializing the connection pool and the event structures

// 创建连接池 cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); if (cycle->connections == NULL) { return NGX_ERROR; } c = cycle->connections; // 创建事件描述结构 cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events == NULL) { return NGX_ERROR; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; rev[i].instance = 1; } cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events == NULL) { return NGX_ERROR; } wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; } i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; #if (NGX_THREADS) c[i].lock = 0; #endif } while (i); cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n;

 

 

// struct ngx_event_s // 事件描述结构体 {{{ struct ngx_event_s { void *data; // 事件上下文数据 unsigned write:1; unsigned accept:1; /* used to detect the stale events in kqueue, rtsig, and epoll */ unsigned instance:1; /* * the event was passed or would be passed to a kernel; * in aio mode - operation was posted. */ unsigned active:1; unsigned disabled:1; /* the ready event; in aio mode 0 means that no operation can be posted */ unsigned ready:1; // 用于异步IO,当有请求需要处理时置位 unsigned oneshot:1; /* aio operation is complete */ unsigned complete:1; unsigned eof:1; unsigned error:1; unsigned timedout:1; unsigned timer_set:1; unsigned delayed:1; unsigned deferred_accept:1; /* the pending eof reported by kqueue, epoll or in aio chain operation */ unsigned pending_eof:1; unsigned posted:1; #if (NGX_WIN32) /* setsockopt(SO_UPDATE_ACCEPT_CONTEXT) was successful */ unsigned accept_context_updated:1; #endif #if (NGX_HAVE_KQUEUE) unsigned kq_vnode:1; /* the pending errno reported by kqueue */ int kq_errno; #endif /* * kqueue only: * accept: number of sockets that wait to be accepted * read: bytes to read when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * write: available space in buffer when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * * iocp: TODO * * otherwise: * accept: 1 if accept many, 0 otherwise */ #if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP) int available; #else unsigned available:1; #endif ngx_event_handler_pt handler; #if (NGX_HAVE_AIO) #if (NGX_HAVE_IOCP) ngx_event_ovlp_t ovlp; #else struct aiocb aiocb; #endif #endif ngx_uint_t index; ngx_log_t *log; ngx_rbtree_node_t timer; /* the posted queue */ ngx_queue_t queue; unsigned closed:1; /* to test on worker exit */ unsigned channel:1; unsigned resolver:1; #if 0 /* the threads support */ /* * the event thread context, we store it here * if $(CC) does not understand __thread declaration * and pthread_getspecific() is too costly */ void *thr_ctx; #if (NGX_EVENT_T_PADDING) /* event should not cross cache line in SMP */ uint32_t padding[NGX_EVENT_T_PADDING]; #endif #endif }; // }}}

 

 

This code creates the connection pool. The pool is described by four fields: connections and connection_n for the allocated array, plus free_connections and free_connection_n for the free list; together they are used to allocate and track every connection.

 

The read and write events, in turn, live in the read_events and write_events arrays; connection c[i] is wired to read_events[i] and write_events[i].
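The free list costs no extra memory: while a connection is idle, its data field doubles as the "next free" pointer, which is exactly what the do/while loop above sets up with c[i].data = next. Below is a condensed, self-contained sketch of the same allocation scheme with hypothetical conn_t/pool_t types (not nginx's):

#include <stdlib.h>

typedef struct conn_s {
    void   *data;   /* next free connection while on the free list */
    int     fd;
} conn_t;

typedef struct {
    conn_t   *connections;        /* the whole array   (cycle->connections)      */
    conn_t   *free_connections;   /* head of free list (cycle->free_connections) */
    size_t    connection_n;
    size_t    free_connection_n;
} pool_t;

static int pool_init(pool_t *p, size_t n)
{
    p->connections = malloc(sizeof(conn_t) * n);
    if (p->connections == NULL) return -1;

    /* thread the array back to front so that connections[0] ends up
       at the head of the free list, as in ngx_event_process_init */
    conn_t *next = NULL;
    for (size_t i = n; i-- > 0; ) {
        p->connections[i].data = next;
        p->connections[i].fd   = -1;
        next = &p->connections[i];
    }

    p->free_connections  = next;
    p->connection_n      = n;
    p->free_connection_n = n;
    return 0;
}

static conn_t *pool_get(pool_t *p)
{
    conn_t *c = p->free_connections;
    if (c == NULL) return NULL;          /* worker_connections exhausted */
    p->free_connections = c->data;
    p->free_connection_n--;
    return c;
}

static void pool_free(pool_t *p, conn_t *c)
{
    c->fd = -1;
    c->data = p->free_connections;
    p->free_connections = c;
    p->free_connection_n++;
}

Both pool_get and pool_free are O(1), which is why nginx can hand out and reclaim connections so cheaply under load.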

 

Back in ngx_worker_process_init, the final step registers a read event handler (ngx_channel_handler) on ngx_channel by calling ngx_add_channel_event:

// ngx_int_t // ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, // ngx_event_handler_pt handler) // 为 fd 注册回调函数 {{{ ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler) { ngx_event_t *ev, *rev, *wev; ngx_connection_t *c; c = ngx_get_connection(fd, cycle->log); if (c == NULL) { return NGX_ERROR; } c->pool = cycle->pool; rev = c->read; wev = c->write; rev->log = cycle->log; wev->log = cycle->log; rev->channel = 1; wev->channel = 1; ev = (event == NGX_READ_EVENT) ? rev : wev; ev->handler = handler; if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_free_connection(c); return NGX_ERROR; } } else { if (ngx_add_event(ev, event, 0) == NGX_ERROR) { ngx_free_connection(c); return NGX_ERROR; } } return NGX_OK; } // }}}

 

 

Allocating and initializing a connection from the pool -- ngx_get_connection

ngx_add_channel_event first calls ngx_get_connection to take a connection out of the pool and initialize it:

// ngx_connection_t * ngx_get_connection(ngx_socket_t s, ngx_log_t *log) // 从空闲 connection 中取出一个关联 sockfd,并初始化 {{{ ngx_connection_t * ngx_get_connection(ngx_socket_t s, ngx_log_t *log) { ngx_uint_t instance; ngx_event_t *rev, *wev; ngx_connection_t *c; /* disable warning: Win32 SOCKET is u_int while UNIX socket is int */ // fd 大于了系统最大fd限制 if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) { ngx_log_error(NGX_LOG_ALERT, log, 0, "the new socket has number %d, " "but only %ui files are available", s, ngx_cycle->files_n); return NULL; } /* ngx_mutex_lock */ // 连接池 c = ngx_cycle->free_connections; if (c == NULL) { // 连接池已经分配完,释放长连接 ngx_drain_connections(); c = ngx_cycle->free_connections; } if (c == NULL) { ngx_log_error(NGX_LOG_ALERT, log, 0, "%ui worker_connections are not enough", ngx_cycle->connection_n); /* ngx_mutex_unlock */ return NULL; } ngx_cycle->free_connections = c->data; ngx_cycle->free_connection_n--; /* ngx_mutex_unlock */ if (ngx_cycle->files) { ngx_cycle->files[s] = c; } rev = c->read; wev = c->write; ngx_memzero(c, sizeof(ngx_connection_t)); c->read = rev; c->write = wev; c->fd = s; c->log = log; instance = rev->instance; ngx_memzero(rev, sizeof(ngx_event_t)); ngx_memzero(wev, sizeof(ngx_event_t)); rev->instance = !instance; wev->instance = !instance; rev->index = NGX_INVALID_INDEX; wev->index = NGX_INVALID_INDEX; rev->data = c; wev->data = c; wev->write = 1; return c; } // }}}

 

It performs the following steps:

  1. Check that the fd is within the number of files the cycle can track (files_n)
  2. Take a connection from the free list; if none are left, call ngx_drain_connections to reclaim the longest-idle reusable (keepalive) connections and try again (a sketch of that policy follows this list)
  3. Initialize the connection structure: zero it, re-attach its read/write events, and flip the instance bit
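As promised in step 2, here is a rough sketch of what draining could look like on a toy pool of the kind sketched earlier, extended with a reusable flag: when the free list is empty, a handful of idle reusable connections are forcibly closed and pushed back onto it. nginx's ngx_drain_connections keeps its candidates in a dedicated queue so the longest-idle connections go first; this sketch simply scans the array, so it approximates the policy rather than reproducing the implementation:

#include <stddef.h>
#include <unistd.h>

typedef struct conn_s {
    void      *data;         /* free-list link, as in the pool sketch above */
    int        fd;
    unsigned   reusable:1;   /* e.g. an idle keepalive connection */
} conn_t;

typedef struct {
    conn_t   *connections;
    conn_t   *free_connections;
    size_t    connection_n;
    size_t    free_connection_n;
} pool_t;

/* called when free_connections is empty, so new clients can still be served */
static void pool_drain(pool_t *p)
{
    size_t reclaimed = 0;

    for (size_t i = 0; i < p->connection_n && reclaimed < 4; i++) {
        conn_t *c = &p->connections[i];

        if (c->fd != -1 && c->reusable) {
            close(c->fd);
            c->fd = -1;

            /* push the connection back onto the free list */
            c->data = p->free_connections;
            p->free_connections = c;
            p->free_connection_n++;
            reclaimed++;
        }
    }
}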

 

An nginx connection is of type ngx_connection_t:

// struct ngx_connection_s // nginx 连接结构 {{{ struct ngx_connection_s { // 连接未使用时,充当连接池空闲链表中的 next 指针 // 连接使用后,由模块定义其意义 // HTTP 模块中,data 指向 ngx_http_request_t void *data; // 连接对应的读事件 ngx_event_t *read; // 连接对应的写事件 ngx_event_t *write; // 连接 fd ngx_socket_t fd; // 直接接收网络字符流的方法 ngx_recv_pt recv; // 直接发送网络字符流的方法 ngx_send_pt send; // 以链表来接收网络字符流的方法 ngx_recv_chain_pt recv_chain; // 以链表来发送网络字符流的方法 ngx_send_chain_pt send_chain; // 监听对象,此连接由listening监听端口的事件建立 ngx_listening_t *listening; // 这个连接上已发送的字节数 off_t sent; ngx_log_t *log; // 一般在accept一个新的连接时,会创建一个内存池 // 而在这个连接结束时会销毁内存池 // 内存池大小是由 listening 成员的 pool_size 决定的 ngx_pool_t *pool; // 连接客户端的sockaddr struct sockaddr *sockaddr; // sockaddr结构体的长度 socklen_t socklen; // 连接客户段字符串形式的IP地址 ngx_str_t addr_text; // 代理协议地址 ngx_str_t proxy_protocol_addr; #if (NGX_SSL) ngx_ssl_connection_t *ssl; #endif // 本机监听端口对应的sockaddr结构体 struct sockaddr *local_sockaddr; // sockaddr结构体的长度 socklen_t local_socklen; // 用户接受、缓存客户端发来的字符流,分配在连接池中 ngx_buf_t *buffer; // 用来将当前连接以双向链表元素的形式添加到 ngx_cycle_t 核心结构体的 // reuseable_connection_queue 双向链表中,表示可以重用的连接 ngx_queue_t queue; // 连接使用次数 ngx_atomic_uint_t number; // 处理的请求次数 ngx_uint_t requests; // 缓存中业务类型 unsigned buffered:8; // 日志级别 unsigned log_error:3; /* ngx_connection_log_error_e */ // 为1时表示独立的连接,为0表示依靠其他连接行为而建立起来的非独立连接 unsigned unexpected_eof:1; // 为1表示连接已经超时 unsigned timedout:1; // 为1表示连接处理过程中出现错误 unsigned error:1; // 为1表示连接已经销毁 unsigned destroyed:1; // 为1表示连接处于空闲状态,如 keepalive 两次请求中间的状态 unsigned idle:1; // 为1表示连接可重用,与 queue 字段对应使用 unsigned reusable:1; // 为1表示连接关闭 unsigned close:1; // 为1表示正在将文件中的数据发往连接的另一端 unsigned sendfile:1; // 为1表示只有连接套接字对应的发送缓冲区必须满足最低设置的大小阀值时, // 事件驱动模块才会分发该事件 // 这与ngx_handle_write_event方法中的lowat参数是对应的 unsigned sndlowat:1; unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */ unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */ unsigned need_last_buf:1; #if (NGX_HAVE_IOCP) unsigned accept_context_updated:1; #endif #if (NGX_HAVE_AIO_SENDFILE) unsigned aio_sendfile:1; unsigned busy_count:2; ngx_buf_t *busy_sendfile; #endif #if (NGX_THREADS) ngx_atomic_t lock; #endif }; // }}}

 

 

It describes everything about a connection: its state, its callbacks, and all other connection-related information.

 

Attaching events to a connection -- ngx_add_event

#define ngx_add_event ngx_event_actions.add

 

During the epoll module's initialization (inside ngx_epoll_init), the following assignment is made:

ngx_event_actions = ngx_epoll_module_ctx.actions;

 

And the epoll module's context is:

// the epoll module's context
ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    // the actions table
    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

 

which shows that ngx_add_event here resolves to the ngx_epoll_add_event function:

// static ngx_int_t // ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) // 将事件加入 epoll 监听 {{{ static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) { int op; uint32_t events, prev; ngx_event_t *e; ngx_connection_t *c; struct epoll_event ee; c = ev->data; events = (uint32_t) event; if (event == NGX_READ_EVENT) { e = c->write; prev = EPOLLOUT; #if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP) events = EPOLLIN|EPOLLRDHUP; #endif } else { e = c->read; prev = EPOLLIN|EPOLLRDHUP; #if (NGX_WRITE_EVENT != EPOLLOUT) events = EPOLLOUT; #endif } if (e->active) { op = EPOLL_CTL_MOD; events |= prev; } else { op = EPOLL_CTL_ADD; } ee.events = events | (uint32_t) flags; ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "epoll add event: fd:%d op:%d ev:%08XD", c->fd, op, ee.events); if (epoll_ctl(ep, op, c->fd, &ee) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "epoll_ctl(%d, %d) failed", op, c->fd); return NGX_ERROR; } ev->active = 1; #if 0 ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; #endif return NGX_OK; } // }}}

 

This function calls epoll_ctl to register (or modify) the requested event on the connection's fd. Note how it packs the connection pointer together with the event's instance bit into ee.data.ptr.
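That line, ee.data.ptr = (void *) ((uintptr_t) c | ev->instance), works because connections are allocated with at least pointer alignment, so the low bit of their address is always zero; nginx borrows it to carry the event's instance flag, which is later used to detect stale events after a connection has been reused. A standalone sketch of that pointer-tagging trick:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
    int fd;
} connection_t;

/* pack a one-bit flag into the low bit of an aligned pointer */
static void *tag_ptr(connection_t *c, unsigned instance)
{
    return (void *) ((uintptr_t) c | (instance & 1));
}

/* recover both the pointer and the flag on the way out of epoll_wait */
static connection_t *untag_ptr(void *data, unsigned *instance)
{
    *instance = (unsigned) ((uintptr_t) data & 1);
    return (connection_t *) ((uintptr_t) data & ~(uintptr_t) 1);
}

int main(void)
{
    connection_t conn = { 42 };
    assert(((uintptr_t) &conn & 1) == 0);   /* guaranteed by alignment */

    void *data = tag_ptr(&conn, 1);

    unsigned instance;
    connection_t *c = untag_ptr(data, &instance);
    printf("fd=%d instance=%u\n", c->fd, instance);   /* fd=42 instance=1 */
    return 0;
}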

 






Tags: tech post      linux      unix      龙潭书斋      nginx      worker      source code      ngx_worker_process_init      connection pool      ngx_add_channel_event      ngx_channel

