继承未关闭连接

2014-12-30 10:29:20   最后更新: 2014-12-30 10:29:20   访问数量:985




在这一过程中,nginx 将所有继承得到的连接加入到新的监听数组中,以便之后继续监听和使用这些连接

// 获取所有继承连接fd的相关信息 if (ngx_add_inherited_sockets(&init_cycle) != NGX_OK) { return 1; }

 

// static ngx_int_t ngx_add_inherited_sockets(ngx_cycle_t *cycle) // 继承环境变量中保留的连接 fd {{{ static ngx_int_t ngx_add_inherited_sockets(ngx_cycle_t *cycle) { u_char *p, *v, *inherited; ngx_int_t s; ngx_listening_t *ls; inherited = (u_char *) getenv(NGINX_VAR); if (inherited == NULL) { return NGX_OK; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "using inherited sockets from \"%s\"", inherited); // 创建监听数组,默认使用 10 个监听连接 if (ngx_array_init(&cycle->listening, cycle->pool, 10, sizeof(ngx_listening_t)) != NGX_OK) { return NGX_ERROR; } for (p = inherited, v = p; *p; p++) { if (*p == ':' || *p == ';') { s = ngx_atoi(v, p - v); if (s == NGX_ERROR) { ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "invalid socket number \"%s\" in " NGINX_VAR " environment variable, ignoring the rest" " of the variable", v); break; } v = p + 1; ls = ngx_array_push(&cycle->listening); if (ls == NULL) { return NGX_ERROR; } ngx_memzero(ls, sizeof(ngx_listening_t)); ls->fd = (ngx_socket_t) s; } } ngx_inherited = 1; return ngx_set_inherited_sockets(cycle); } // }}}

 

在 nginx 中,每个建立的连接 fd 都会放到 "NGINX" 这个环境变量中,用 : 或 ; 分割,所以当新的进程启动,检测到 NGINX 这个环境变量中依然有 fd 存在,说明有连接存在需要继承,于是从环境变量中读取 fd 并加入监听数组成为了初始化过程中的一个必要环节

 

cycle->listening 就是这个监听数组,他是一个 ngx_array_t 结构,就是 nginx 内核封装的数组结构,这个数组的元素是 ngx_listening_s 结构,用来保存每一个打开的连接 fd 等信息

 

nginx 内核封装的数组结构

// struct ngx_array_t // nginx 数组结构 {{{ typedef struct { void *elts; // 数组起始位置 ngx_uint_t nelts; // 当前数组中元素个数 size_t size; // 单个元素大小 ngx_uint_t nalloc; // 数组容量 ngx_pool_t *pool; // 内存池地址 } ngx_array_t; // }}}

 

elts 是在 pool 中分配的真正的数组,里面的每个元素大小都是 size,共有 nelts 个元素,内存池空间中能够容纳的元素总计 nalloc 个

 

这里用到了数组初始化函数

// static ngx_inline ngx_int_t ngx_array_init( // ngx_array_t *array, ngx_pool_t *pool, ngx_uint_t n, size_t size // ) // 数组结构初始化 {{{ static ngx_inline ngx_int_t ngx_array_init(ngx_array_t *array, ngx_pool_t *pool, ngx_uint_t n, size_t size) { /* * set "array->nelts" before "array->elts", otherwise MSVC thinks * that "array->nelts" may be used without having been initialized */ array->nelts = 0; array->size = size; array->nalloc = n; array->pool = pool; array->elts = ngx_palloc(pool, n * size); if (array->elts == NULL) { return NGX_ERROR; } return NGX_OK; } // }}}

 

该函数根据参数,在内存池中分配内存,并创建指定维度的数组

ngx_palloc 保证分配 n * size 的数组

 

ngx_palloc

在内存池中分配内存空间

// void * ngx_palloc(ngx_pool_t *pool, size_t size) // 在内存池中分配内存 {{{ void * ngx_palloc(ngx_pool_t *pool, size_t size) { u_char *m; ngx_pool_t *p; if (size <= pool->max) { p = pool->current; do { m = ngx_align_ptr(p->d.last, NGX_ALIGNMENT); if ((size_t) (p->d.end - m) >= size) { p->d.last = m + size; return m; } p = p->d.next; } while (p); return ngx_palloc_block(pool, size); } return ngx_palloc_large(pool, size); } // }}}

 

在这个函数中使用了一个宏: ngx_align_ptr

#define ngx_align_ptr(p, a) \ (u_char *) (((uintptr_t) (p) + ((uintptr_t) a - 1)) & ~((uintptr_t) a - 1))

 

这个宏实现了按照 a 大小对齐内存

如果内存池充足,则直接分配空间并返回,否则在内存池结构组成的链表中分配

 

ngx_palloc_block

如果内存池链表全部空间不足,则调用 ngx_palloc_block 创建新的节点并添加到当前内存池列表的尾端

// static void * ngx_palloc_block(ngx_pool_t *pool, size_t size) // 在内存池列表尾端添加新的内存池 {{{ static void * ngx_palloc_block(ngx_pool_t *pool, size_t size) { u_char *m; size_t psize; ngx_pool_t *p, *new; // 内存池列表的每个元素的大小相同 psize = (size_t) (pool->d.end - (u_char *) pool); m = ngx_memalign(NGX_POOL_ALIGNMENT, psize, pool->log); if (m == NULL) { return NULL; } new = (ngx_pool_t *) m; new->d.end = m + psize; new->d.next = NULL; new->d.failed = 0; m += sizeof(ngx_pool_data_t); m = ngx_align_ptr(m, NGX_ALIGNMENT); new->d.last = m + size; for (p = pool->current; p->d.next; p = p->d.next) { // 如果连续4次以上分配失败,说明内存池中可用空间已经很小 // 则下一次直接用该内存池的下一内存池中的空间开辟新的空间 // 而不是浪费时间去检查这个剩余空间很小的内存池是否还有空间 if (p->d.failed++ > 4) { pool->current = p->d.next; } } p->d.next = new; return m; } // }}}

 

 

ngx_pool_large_s

如果需要分配的内存比当前内存池整体容量还大或者在 4KB 以上(大于一页),这个时候就要用到 ngx_pool_large_t 这个结构了

// struct ngx_pool_large_s // 大数据块结构 {{{ struct ngx_pool_large_s { ngx_pool_large_t *next; void *alloc; }; // }}}

 

这个结构看起来很简单,这个结构中只有两个指针,分别是用来将多个大内存结构连接在一起的next指针,和指向大内存首地址的 alloc 指针

 

ngx_palloc_large

使用 ngx_palloc_large 函数在原内存池中开辟空间存储大内存结构,并且初始化该存储区

由于 alloc 指向的内存只分配一次,所以不需要记录他的 size

// static void * ngx_palloc_large(ngx_pool_t *pool, size_t size) // 扩大内存池 {{{ static void * ngx_palloc_large(ngx_pool_t *pool, size_t size) { void *p; ngx_uint_t n; ngx_pool_large_t *large; p = ngx_alloc(size, pool->log); if (p == NULL) { return NULL; } n = 0; for (large = pool->large; large; large = large->next) { if (large->alloc == NULL) { large->alloc = p; return p; } if (n++ > 3) { break; } } large = ngx_palloc(pool, sizeof(ngx_pool_large_t)); if (large == NULL) { ngx_free(p); return NULL; } large->alloc = p; large->next = pool->large; pool->large = large; return p; } // }}}

 

 

// ngx_int_t ngx_set_inherited_sockets(ngx_cycle_t *cycle) // 根据继承的连接 fd 获取所有连接相关信息 {{{ ngx_int_t ngx_set_inherited_sockets(ngx_cycle_t *cycle) { size_t len; ngx_uint_t i; ngx_listening_t *ls; socklen_t olen; #if (NGX_HAVE_DEFERRED_ACCEPT || NGX_HAVE_TCP_FASTOPEN) ngx_err_t err; #endif #if (NGX_HAVE_DEFERRED_ACCEPT && defined SO_ACCEPTFILTER) struct accept_filter_arg af; #endif #if (NGX_HAVE_DEFERRED_ACCEPT && defined TCP_DEFER_ACCEPT) int timeout; #endif ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ls[i].sockaddr = ngx_palloc(cycle->pool, NGX_SOCKADDRLEN); if (ls[i].sockaddr == NULL) { return NGX_ERROR; } ls[i].socklen = NGX_SOCKADDRLEN; // 根据连接fd获取sock地址结构 if (getsockname(ls[i].fd, ls[i].sockaddr, &ls[i].socklen) == -1) { ngx_log_error(NGX_LOG_CRIT, cycle->log, ngx_socket_errno, "getsockname() of the inherited " "socket #%d failed", ls[i].fd); ls[i].ignore = 1; continue; } switch (ls[i].sockaddr->sa_family) { #if (NGX_HAVE_INET6) case AF_INET6: ls[i].addr_text_max_len = NGX_INET6_ADDRSTRLEN; len = NGX_INET6_ADDRSTRLEN + sizeof("[]:65535") - 1; break; #endif #if (NGX_HAVE_UNIX_DOMAIN) case AF_UNIX: ls[i].addr_text_max_len = NGX_UNIX_ADDRSTRLEN; len = NGX_UNIX_ADDRSTRLEN; break; #endif case AF_INET: ls[i].addr_text_max_len = NGX_INET_ADDRSTRLEN; len = NGX_INET_ADDRSTRLEN + sizeof(":65535") - 1; break; default: ngx_log_error(NGX_LOG_CRIT, cycle->log, ngx_socket_errno, "the inherited socket #%d has " "an unsupported protocol family", ls[i].fd); ls[i].ignore = 1; continue; } ls[i].addr_text.data = ngx_pnalloc(cycle->pool, len); if (ls[i].addr_text.data == NULL) { return NGX_ERROR; } // 保存点分十进制字符串的IP地址 len = ngx_sock_ntop(ls[i].sockaddr, ls[i].socklen, ls[i].addr_text.data, len, 1); if (len == 0) { return NGX_ERROR; } ls[i].addr_text.len = len; ls[i].backlog = NGX_LISTEN_BACKLOG; olen = sizeof(int); // 获取接收缓冲区大小 if (getsockopt(ls[i].fd, SOL_SOCKET, SO_RCVBUF, (void *) &ls[i].rcvbuf, &olen) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, "getsockopt(SO_RCVBUF) %V failed, ignored", &ls[i].addr_text); ls[i].rcvbuf = -1; } olen = sizeof(int); // 获取发送缓冲区大小 if (getsockopt(ls[i].fd, SOL_SOCKET, SO_SNDBUF, (void *) &ls[i].sndbuf, &olen) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, "getsockopt(SO_SNDBUF) %V failed, ignored", &ls[i].addr_text); ls[i].sndbuf = -1; } #if 0 /* SO_SETFIB is currently a set only option */ #if (NGX_HAVE_SETFIB) olen = sizeof(int); if (getsockopt(ls[i].fd, SOL_SOCKET, SO_SETFIB, (void *) &ls[i].setfib, &olen) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, "getsockopt(SO_SETFIB) %V failed, ignored", &ls[i].addr_text); ls[i].setfib = -1; } #endif #endif #if (NGX_HAVE_TCP_FASTOPEN) olen = sizeof(int); // 获取 TFO if (getsockopt(ls[i].fd, IPPROTO_TCP, TCP_FASTOPEN, (void *) &ls[i].fastopen, &olen) == -1) { err = ngx_socket_errno; if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, err, "getsockopt(TCP_FASTOPEN) %V failed, ignored", &ls[i].addr_text); } ls[i].fastopen = -1; } #endif #if (NGX_HAVE_DEFERRED_ACCEPT && defined SO_ACCEPTFILTER) ngx_memzero(&af, sizeof(struct accept_filter_arg)); olen = sizeof(struct accept_filter_arg); // 获取服务器是否不等待最后的ACK包而仅仅等待携带数据负载的包 if (getsockopt(ls[i].fd, SOL_SOCKET, SO_ACCEPTFILTER, &af, &olen) == -1) { err = ngx_socket_errno; if (err == NGX_EINVAL) { continue; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, err, "getsockopt(SO_ACCEPTFILTER) for %V failed, ignored", &ls[i].addr_text); continue; } if (olen < sizeof(struct accept_filter_arg) || af.af_name[0] == '\0') { continue; } ls[i].accept_filter = ngx_palloc(cycle->pool, 16); if (ls[i].accept_filter == NULL) { return NGX_ERROR; } (void) ngx_cpystrn((u_char *) ls[i].accept_filter, (u_char *) af.af_name, 16); #endif #if (NGX_HAVE_DEFERRED_ACCEPT && defined TCP_DEFER_ACCEPT) timeout = 0; olen = sizeof(int); if (getsockopt(ls[i].fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &timeout, &olen) == -1) { err = ngx_socket_errno; if (err == NGX_EOPNOTSUPP) { continue; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, err, "getsockopt(TCP_DEFER_ACCEPT) for %V failed, ignored", &ls[i].addr_text); continue; } if (olen < sizeof(int) || timeout == 0) { continue; } ls[i].deferred_accept = 1; #endif } return NGX_OK; } // }}}

 

代码非常清晰,分别获取了 sock 地址结构、点分十进制IP字符串、接收缓冲区大小、发送缓冲区大小、TFO 等等到 cycle->listening 中,cycle->listening 是一个存储 ngx_listening_t 类型元素的数组

 

// struct ngx_listening_s // socket 属性结构 {{{ struct ngx_listening_s { ngx_socket_t fd; struct sockaddr *sockaddr; socklen_t socklen; /* size of sockaddr */ size_t addr_text_max_len; ngx_str_t addr_text; // 点分十进制IP字符串 int type; int backlog; int rcvbuf; int sndbuf; #if (NGX_HAVE_KEEPALIVE_TUNABLE) int keepidle; int keepintvl; int keepcnt; #endif /* handler of accepted connection */ ngx_connection_handler_pt handler; void *servers; /* array of ngx_http_in_addr_t, for example */ ngx_log_t log; ngx_log_t *logp; size_t pool_size; /* should be here because of the AcceptEx() preread */ size_t post_accept_buffer_size; /* should be here because of the deferred accept */ ngx_msec_t post_accept_timeout; ngx_listening_t *previous; ngx_connection_t *connection; unsigned open:1; unsigned remain:1; unsigned ignore:1; unsigned bound:1; /* already bound */ unsigned inherited:1; /* inherited from previous process */ unsigned nonblocking_accept:1; unsigned listen:1; unsigned nonblocking:1; unsigned shared:1; /* shared between threads or processes */ unsigned addr_ntop:1; #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) unsigned ipv6only:1; #endif unsigned keepalive:2; #if (NGX_HAVE_DEFERRED_ACCEPT) unsigned deferred_accept:1; unsigned delete_deferred:1; unsigned add_deferred:1; #ifdef SO_ACCEPTFILTER char *accept_filter; #endif #endif #if (NGX_HAVE_SETFIB) int setfib; #endif #if (NGX_HAVE_TCP_FASTOPEN) int fastopen; #endif }; // }}}

 

这个结构体保存了 socket 连接的常用属性,同时还保存了当前连接状态等信息,每一个数据域的命名都做到了可以自注释的程度,所以也就没有做过多的注释

 






技术帖      network      计算机网络      龙潭书斋      服务器      nginx      array      ngx_array_t      ngx_array_init      ngx_palloc      ngx_palloc_block      ngx_pool_large_s      ngx_palloc_large      ngx_listening_t      内存池     


京ICP备15018585号