worker 进程的创建

2015-01-06 17:14:33   最后更新: 2015-01-08 17:12:07   访问数量:821




// ngx_pid_t // ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, // char *name, ngx_int_t respawn) // 创建子进程 {{{ ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s; if (respawn >= 0) { s = respawn; } else { // 如果进程已退出,则使用该节点存储新的进程信息 for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } } if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } if (respawn != NGX_PROCESS_DETACHED) { /* Solaris 9 still has no AF_LOCAL */ if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } on = 1; if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ngx_channel = ngx_processes[s].channel[1]; } else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; pid = fork(); switch (pid) { case -1: ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; case 0: ngx_pid = ngx_getpid(); proc(cycle, data); break; default: break; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid); ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; if (respawn >= 0) { return pid; } ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; switch (respawn) { case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; } if (s == ngx_last_process) { ngx_last_process++; } return pid; } // }}}

 

这个函数创建子进程,并用 proc 初始化子进程地址空间,子进程的信息保存在 ngx_processes 这个结构体数组中

子进程共有一下五种模式,即 respawn 参数有以下取值:

#define NGX_PROCESS_NORESPAWN -1 // 子进程退出时,父进程不再创建 #define NGX_PROCESS_JUST_SPAWN -2 //用于在子进程退出并重新创建后标记是刚刚创建的新进程,防止被父进程意外终止 #define NGX_PROCESS_RESPAWN -3 // 子进程退出时,父进程需要重新创建 #define NGX_PROCESS_JUST_RESPAWN -4 // 该标记用来标记进程数组中哪些是新创建的子进程 #define NGX_PROCESS_DETACHED -5 // 热代码替换

 

这里用到了 unix 的域套接字,用来在父子进程之间传递信息,在《UNIX网络编程》第15章中,对该套接字及系统调用有所介绍

参见:

UNIX 进程间通信 -- socketpair 函数

这里通过 socketpair 创建了两个用于通信的 socket 描述符,保存在 ngx_processes[s].channel 中,并设置为非阻塞 IO 模式

// unix 域套接字,用于进程间通信 // 用第四个参数作为返回值,返回2个用于通信的sockfd if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }

 

ngx_nonblocking 这个函数我们已经了解过

// int ngx_nonblocking(ngx_socket_t s) // 设置 IO 为非阻塞方式 {{{ int ngx_nonblocking(ngx_socket_t s) { int nb; nb = 1; return ioctl(s, FIONBIO, &nb); } // }}}

 

用来设置描述符支持非阻塞IO

 

随后,进行了一系列设置,设置 ngx_processes[s].channel[0] 支持异步IO、执行 exec 时自动关闭 fd,及 ngx_processes[s].channel[1] 执行 exec 时自动关闭 fd

on = 1; // 设置支持异步IO if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 设置将接收 SIGIO 和 SIGURG 信号的进程 id 或进程组 id if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 调用 exec 后自动关闭 fd if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 调用 exec 后自动关闭 fd if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ngx_channel = ngx_processes[s].channel[1];

 

接下来的工作就是 fork 子进程,并进行一些必要的初始化工作,这里我们暂且跟着守护进程的步伐,只关注父进程的执行过程

关于多进程程序的 GDB 调试,可以参看:

使用 GDB 调试多进程程序

// 父进程记录子进程 pid ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0;

 

 

proc 这里传入的是 ngx_worker_process_cycle,这个函数我们在讲解 worker 进程的时候再深入讲解

 

全局变量 ngx_processes 是一个 ngx_process_t 结构的数组,用来保存子进程的必要信息,用全局变量 ngx_process_slot 作为末节点索引

子进程创建后初始化了这一结构的相应节点

typedef int ngx_socket_t; typedef void (*ngx_spawn_proc_pt) (ngx_cycle_t *cycle, void *data); // struct ngx_process_t // 进程描述结构 {{{ typedef struct { ngx_pid_t pid; // pid int status; // 当前进程的退出状态 ngx_socket_t channel[2]; // 用于进程间通信的两个 socketfd ngx_spawn_proc_pt proc; // 进程创建后执行的函数 void *data; // proc 参数 char *name; // 进程名 unsigned respawn:1; // 退出后是否重建 unsigned just_spawn:1; // 第一次创建 unsigned detached:1; // 分离进程 unsigned exiting:1; // 正在退出的进程 unsigned exited:1; // 已经退出的进程 } ngx_process_t; // }}}

 

 

此时,ngx_processes[0] 的值为

{ pid = 32753, status = 0, channel = {3, 7}, proc = 0x8069422 <ngx_worker_process_cycle>, data = 0x0, name = 0x80bd8be "worker process", respawn = 1, just_spawn = 0, detached = 0, exiting = 0, exited = 0 }

 

ch.command = NGX_CMD_OPEN_CHANNEL; ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; // 通知其他子进程,不要打开这两个已打开的 channel ? ngx_pass_open_channel(cycle, &ch);

 

ch 是 ngx_channel_t 类型的变量,用来传递通信的信息

父子进程通讯结构

// struct ngx_channel_t // master和worker之间传递的指令,只能master向worker传递指令 {{{ typedef struct { ngx_uint_t command; // 传递的指令 ngx_pid_t pid; // worker 进程 id ngx_int_t slot; // worker进程在ngx_process中的索引 ngx_fd_t fd; // 可能用到的文件描述符 } ngx_channel_t; // }}}

 

 






技术帖      linux      socket      龙潭书斋      服务器      进程间通信      fork      ipc      process      nginx      server      worker      daemon      ngx_processes      socketpair     


京ICP备15018585号