upstream 机制的启动与初始化及请求的发送

2015-11-02 00:30:03   最后更新: 2016-02-14 17:53:13   访问数量:1570




上一篇日志中,我们介绍了作为 nginx http 模块的组成部分 nginx upstream 启动函数 ngx_http_proxy_handler 的执行

upstream 的启动 -- ngx_http_proxy_handler

 

本篇日志中,我们就要深入到 upstream 真正的启动过程 ngx_http_upstream_init

 

nginx upstream 访问上游服务器的流程分为六个阶段:

  1. 启动 upstream
  2. 连接上游服务器
  3. 向上游服务器发送请求
  4. 接收上游服务器响应HEADER
  5. 处理上游服务器响应包体
  6. 结束请求

 

在调用 ngx_http_upstream_create 创建 ngx_http_upstream_t 结构并进行了一系列初始化以后,就要开始真正的 upstream 的执行了

ngx_http_proxy_handler 调用了 ngx_http_read_client_request_body 函数,并把 ngx_http_upstream_init 作为 post_handler 回调传入,通过执行这个回调实现 upstream 的执行

// ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r, // ngx_http_client_body_handler_pt post_handler) // 通过执行 post_handler 回调函数处理请求 {{{ ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler) { size_t preread; ssize_t size; ngx_int_t rc; ngx_buf_t *b; ngx_chain_t out, *cl; ngx_http_request_body_t *rb; ngx_http_core_loc_conf_t *clcf; r->main->count++; #if (NGX_HTTP_SPDY) if (r->spdy_stream && r == r->main) { rc = ngx_http_spdy_read_request_body(r, post_handler); goto done; } #endif if (r != r->main || r->request_body || r->discard_body) { post_handler(r); return NGX_OK; } // 处理 expect 请求 if (ngx_http_test_expect(r) != NGX_OK) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); if (rb == NULL) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } /* * set by ngx_pcalloc(): * * rb->bufs = NULL; * rb->buf = NULL; * rb->free = NULL; * rb->busy = NULL; * rb->chunked = NULL; */ rb->rest = -1; rb->post_handler = post_handler; r->request_body = rb; if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) { // 调用传入的函数 post_handler(r); return NGX_OK; } preread = r->header_in->last - r->header_in->pos; if (preread) { /* there is the pre-read part of the request body */ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http client request body preread %uz", preread); out.buf = r->header_in; out.next = NULL; rc = ngx_http_request_body_filter(r, &out); if (rc != NGX_OK) { goto done; } r->request_length += preread - (r->header_in->last - r->header_in->pos); if (!r->headers_in.chunked && rb->rest > 0 && rb->rest <= (off_t) (r->header_in->end - r->header_in->last)) { /* the whole request body may be placed in r->header_in */ b = ngx_calloc_buf(r->pool); if (b == NULL) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } b->temporary = 1; b->start = r->header_in->pos; b->pos = r->header_in->pos; b->last = r->header_in->last; b->end = 
r->header_in->end; rb->buf = b; r->read_event_handler = ngx_http_read_client_request_body_handler; r->write_event_handler = ngx_http_request_empty_handler; rc = ngx_http_do_read_client_request_body(r); goto done; } } else { /* set rb->rest */ if (ngx_http_request_body_filter(r, NULL) != NGX_OK) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } } if (rb->rest == 0) { /* the whole request body was pre-read */ if (r->request_body_in_file_only) { if (ngx_http_write_request_body(r) != NGX_OK) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } if (rb->temp_file->file.offset != 0) { cl = ngx_chain_get_free_buf(r->pool, &rb->free); if (cl == NULL) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } b = cl->buf; ngx_memzero(b, sizeof(ngx_buf_t)); b->in_file = 1; b->file_last = rb->temp_file->file.offset; b->file = &rb->temp_file->file; rb->bufs = cl; } else { rb->bufs = NULL; } } post_handler(r); return NGX_OK; } if (rb->rest < 0) { ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "negative request body rest"); rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); size = clcf->client_body_buffer_size; size += size >> 2; /* TODO: honor r->request_body_in_single_buf */ if (!r->headers_in.chunked && rb->rest < size) { size = (ssize_t) rb->rest; if (r->request_body_in_single_buf) { size += preread; } } else { size = clcf->client_body_buffer_size; } rb->buf = ngx_create_temp_buf(r->pool, size); if (rb->buf == NULL) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; goto done; } r->read_event_handler = ngx_http_read_client_request_body_handler; r->write_event_handler = ngx_http_request_empty_handler; rc = ngx_http_do_read_client_request_body(r); done: if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { r->main->count--; } return rc; } // }}}

 

这个关键的函数的工作就是通过调用 post_handler 函数实现返回包体的生成

 

// static void ngx_http_upstream_init_request(ngx_http_request_t *r) // 初始化 upstream {{{ static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; #if (NGX_HTTP_CACHE) if (u->conf->cache) { ngx_int_t rc; rc = ngx_http_upstream_cache(r, u); if (rc == NGX_BUSY) { r->write_event_handler = ngx_http_upstream_init_request; return; } r->write_event_handler = ngx_http_request_empty_handler; if (rc == NGX_DONE) { return; } if (rc == NGX_ERROR) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (rc != NGX_DECLINED) { ngx_http_finalize_request(r, rc); return; } } #endif u->store = (u->conf->store || u->conf->store_lengths); // ignore_client_abort 不为 1 则在上游服务器交互时需检查下游客户端是否断开连接 // 这里赋值读写的检查函数 if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } if (r->request_body) { u->request_bufs = r->request_body->bufs; } // 构造发往上游服务器的请求 if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->peer.local = ngx_http_upstream_get_local(r, u->conf->local); clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); 
return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } // 赋值清理回调函数 cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; if (u->resolved == NULL) { uscf = u->conf->upstream; } else { #if (NGX_HTTP_SSL) u->ssl_name = u->resolved->host; #endif if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; for (i = 0; i < umcf->upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream \"%V\"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } temp.name = *host; // 初始化 resolver ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } ctx->name = *host; // 赋值回调函数 ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; // 校验上游主机名 if (ngx_resolve_name(ctx) != NGX_OK) { 
u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } found: if (uscf == NULL) { ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "no upstream configuration"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } #if (NGX_HTTP_SSL) u->ssl_name = uscf->host; #endif if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->peer.start_time = ngx_current_msec; if (u->conf->next_upstream_tries && u->peer.tries > u->conf->next_upstream_tries) { u->peer.tries = u->conf->next_upstream_tries; } // 建立到上游主机的连接,注册读写事件 ngx_http_upstream_connect(r, u); } // }}}

 

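ngx_http_upstream_init 函数根据 ngx_http_upstream_conf_t 中的成员初始化了 upstream(注意:上面贴出的代码清单标注的是 ngx_http_upstream_init_request,与下文的清单内容相同,并不是 ngx_http_upstream_init 本身的实现)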

最后,他调用了 ngx_http_upstream_init_request,这也是最关键的一步

 

// static void ngx_http_upstream_init_request(ngx_http_request_t *r) // 初始化 upstream {{{ static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; #if (NGX_HTTP_CACHE) if (u->conf->cache) { ngx_int_t rc; rc = ngx_http_upstream_cache(r, u); if (rc == NGX_BUSY) { r->write_event_handler = ngx_http_upstream_init_request; return; } r->write_event_handler = ngx_http_request_empty_handler; if (rc == NGX_DONE) { return; } if (rc == NGX_ERROR) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (rc != NGX_DECLINED) { ngx_http_finalize_request(r, rc); return; } } #endif u->store = (u->conf->store || u->conf->store_lengths); if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } if (r->request_body) { u->request_bufs = r->request_body->bufs; } if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->peer.local = ngx_http_upstream_get_local(r, u->conf->local); clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { 
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; if (u->resolved == NULL) { uscf = u->conf->upstream; } else { #if (NGX_HTTP_SSL) u->ssl_name = u->resolved->host; #endif if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; for (i = 0; i < umcf->upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream \"%V\"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } temp.name = *host; // 初始化 resolver ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } ctx->name = *host; // 赋值回调函数 ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; // 校验上游主机名 if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; 
} found: if (uscf == NULL) { ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "no upstream configuration"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } #if (NGX_HTTP_SSL) u->ssl_name = uscf->host; #endif if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->peer.start_time = ngx_current_msec; if (u->conf->next_upstream_tries && u->peer.tries > u->conf->next_upstream_tries) { u->peer.tries = u->conf->next_upstream_tries; } // 建立到上游主机的连接 ngx_http_upstream_connect(r, u); } // }}}

 

在这个函数中调用 ngx_resolve_start 初始化了用于存储上游主机信息的 resolver 域

 

// ngx_resolver_ctx_t * // ngx_resolve_start(ngx_resolver_t *r, ngx_resolver_ctx_t *temp) // 初始化 upstream 的 resolver {{{ ngx_resolver_ctx_t * ngx_resolve_start(ngx_resolver_t *r, ngx_resolver_ctx_t *temp) { in_addr_t addr; ngx_resolver_ctx_t *ctx; if (temp) { addr = ngx_inet_addr(temp->name.data, temp->name.len); if (addr != INADDR_NONE) { temp->resolver = r; temp->state = NGX_OK; temp->naddrs = 1; temp->addrs = &temp->addr; temp->addr.sockaddr = (struct sockaddr *) &temp->sin; temp->addr.socklen = sizeof(struct sockaddr_in); ngx_memzero(&temp->sin, sizeof(struct sockaddr_in)); temp->sin.sin_family = AF_INET; temp->sin.sin_addr.s_addr = addr; temp->quick = 1; return temp; } } if (r->udp_connections.nelts == 0) { return NGX_NO_RESOLVER; } ctx = ngx_resolver_calloc(r, sizeof(ngx_resolver_ctx_t)); if (ctx) { ctx->resolver = r; } return ctx; } // }}}

 

 

然后他调用 ngx_http_upstream_connect 建立了连接

 

upstream 与上游服务器是通过 TCP 连接通信的,众所周知,建立 TCP 连接需要的三次握手时间是不可控的,因此,为了保证建立 TCP 连接的过程不会造成服务器的阻塞,nginx 使用了非阻塞的 socket 实现了上游服务器的连接

// static void // ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) // 建立到上游主机的连接,注册读写事件 {{{ static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_time_t *tp; ngx_connection_t *c; r->connection->log->action = "connecting to upstream"; if (u->state && u->state->response_sec) { tp = ngx_timeofday(); u->state->response_sec = tp->sec - u->state->response_sec; u->state->response_msec = tp->msec - u->state->response_msec; } u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); tp = ngx_timeofday(); u->state->response_sec = tp->sec; u->state->response_msec = tp->msec; // 建立非阻塞socket连接 rc = ngx_event_connect_peer(&u->peer); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream connect: %i", rc); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->state->peer = u->peer.name; // 上游服务器忙或连接失败,则调用 ngx_http_upstream_next 尝试重新连接 if (rc == NGX_BUSY) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE); return; } if (rc == NGX_DECLINED) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */ c = u->peer.connection; c->data = r; // 设置连接的读写回调函数 c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; // 设置连接上游服务器的读写回调函数 u->write_event_handler = ngx_http_upstream_send_request_handler; u->read_event_handler = ngx_http_upstream_process_header; c->sendfile &= r->connection->sendfile; u->output.sendfile = c->sendfile; if (c->pool == NULL) { /* we need separate pool here to be able to cache SSL connections */ c->pool = ngx_create_pool(128, r->connection->log); if (c->pool == NULL) { 
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } c->log = r->connection->log; c->pool->log = c->log; c->read->log = c->log; c->write->log = c->log; /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ u->writer.out = NULL; u->writer.last = &u->writer.out; u->writer.connection = c; u->writer.limit = 0; if (u->request_sent) { if (ngx_http_upstream_reinit(r, u) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } if (r->request_body && r->request_body->buf && r->request_body->temp_file && r == r->main) { /* * the r->request_body->buf can be reused for one request only, * the subrequests should allocate their own temporary bufs */ u->output.free = ngx_alloc_chain_link(r->pool); if (u->output.free == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->output.free->buf = r->request_body->buf; u->output.free->next = NULL; u->output.allocated = 1; r->request_body->buf->pos = r->request_body->buf->start; r->request_body->buf->last = r->request_body->buf->start; r->request_body->buf->tag = u->output.tag; } u->request_sent = 0; // 需要等待上游服务器的返回则将连接加入到计时器 if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->connect_timeout); return; } #if (NGX_HTTP_SSL) if (u->ssl && c->ssl == NULL) { ngx_http_upstream_ssl_init_connection(r, u, c); return; } #endif // 发送请求 ngx_http_upstream_send_request(r, u); } // }}}

 

 

这段代码的关键在于 ngx_event_connect_peer 的调用,他创建了 socket 并调用 ngx_nonblocking(ioctl)将 socket 设置为了非阻塞模式

// ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) // 建立非阻塞的 socket 连接 {{{ ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) { int rc; ngx_int_t event; ngx_err_t err; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_connection_t *c; // 调用回调函数获取状态 rc = pc->get(pc, pc->data); if (rc != NGX_OK) { return rc; } // 创建 socket s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s); if (s == (ngx_socket_t) -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_socket_n " failed"); return NGX_ERROR; } // 为socket分配一个空闲的连接 c = ngx_get_connection(s, pc->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_close_socket_n "failed"); } return NGX_ERROR; } if (pc->rcvbuf) { if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const void *) &pc->rcvbuf, sizeof(int)) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, "setsockopt(SO_RCVBUF) failed"); goto failed; } } if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_nonblocking_n " failed"); goto failed; } if (pc->local) { if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) { ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno, "bind(%V) failed", &pc->local->name); goto failed; } } c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->sendfile = 1; c->log_error = pc->log_error; if (pc->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } rev = c->read; wev = c->write; rev->log = pc->log; wev->log = pc->log; pc->connection = c; c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); // 将连接添加到 epoll if (ngx_add_conn) { if (ngx_add_conn(c) == NGX_ERROR) { goto failed; } } 
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connect to %V, fd:%d #%uA", pc->name, s, c->number); // 建立连接 rc = connect(s, pc->sockaddr, pc->socklen); if (rc == -1) { err = ngx_socket_errno; if (err != NGX_EINPROGRESS #if (NGX_WIN32) /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */ && err != NGX_EAGAIN #endif ) { if (err == NGX_ECONNREFUSED #if (NGX_LINUX) /* * Linux returns EAGAIN instead of ECONNREFUSED * for unix sockets if listen queue is full */ || err == NGX_EAGAIN #endif || err == NGX_ECONNRESET || err == NGX_ENETDOWN || err == NGX_ENETUNREACH || err == NGX_EHOSTDOWN || err == NGX_EHOSTUNREACH) { level = NGX_LOG_ERR; } else { level = NGX_LOG_CRIT; } ngx_log_error(level, c->log, err, "connect() to %V failed", pc->name); ngx_close_connection(c); pc->connection = NULL; return NGX_DECLINED; } } if (ngx_add_conn) { if (rc == -1) { /* NGX_EINPROGRESS */ return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; return NGX_OK; } // 如果使用 AIO 作相应的校验 if (ngx_event_flags & NGX_USE_AIO_EVENT) { ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno, "connect(): %d", rc); /* aio, iocp */ if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_blocking_n " failed"); goto failed; } /* * FreeBSD's aio allows to post an operation on non-connected socket. * NT does not support it. * * TODO: check in Win32, etc. 
As workaround we can use NGX_ONESHOT_EVENT */ rev->ready = 1; wev->ready = 1; return NGX_OK; } if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ event = NGX_CLEAR_EVENT; } else { /* select, poll, /dev/poll */ event = NGX_LEVEL_EVENT; } // 添加一个read事件 if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) { goto failed; } if (rc == -1) { /* NGX_EINPROGRESS */ // 如果没有可用连接,则添加一个 write 事件 if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) { goto failed; } return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; return NGX_OK; failed: // 调用失败,关闭连接 ngx_close_connection(c); pc->connection = NULL; return NGX_ERROR; } // }}}

 

 

由于是非阻塞的 socket 调用,因此在调用 connect 建立连接之前,首先将 socket 加入了 epoll 监听,只有当他出现可写事件时,才说明连接建立成功

 

在 ngx_http_upstream_connect 函数的最后,调用了 ngx_http_upstream_send_request 函数给上游服务器发送请求,执行到此说明连接已经建立成功

 

// static void // ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) // upstream 机制中,给上游服务器发送请求 {{{ static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_connection_t *c; c = u->peer.connection; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream send request"); // 如果尝试连接上游服务器失败,则调用 ngx_http_upstream_next 反复尝试连接 if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } c->log->action = "sending request to upstream"; // 发送请求 rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); u->request_sent = 1; // 发送失败则尝试重新连接 if (rc == NGX_ERROR) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } // 写事件如果在定时器中则移除 if (c->write->timer_set) { ngx_del_timer(c->write); } // 未发送完成则重新将写事件添加到定时器中,然后加入 epoll if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->send_timeout); if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } /* rc == NGX_OK */ // 发送完成 if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { // 将 socket 设置为 TCP_CORK if (ngx_tcp_push(c->fd) == NGX_ERROR) { ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, ngx_tcp_push_n " failed"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } c->tcp_nopush = NGX_TCP_NOPUSH_UNSET; } // 重设写事件回调 u->write_event_handler = ngx_http_upstream_dummy_handler; // 将写事件加入 epoll if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } // 将读事件加入计时器 ngx_add_timer(c->read, u->conf->read_timeout); // 如果套接字缓冲区中有数据可读 // 则调用 ngx_http_upstream_process_header 接收响应 HEADER if (c->read->ready) { ngx_http_upstream_process_header(r, u); return; } } // }}}

 

 

 

 






技术帖      龙潭书斋      源代码      服务器      nginx      server      源码      sourcecode      webserver      source      code      upstream      反向代理      subrequest      ngx_upstream_t     


京ICP备15018585号