nginx 锁的实现

2015-08-11 23:21:33   最后更新: 2015-08-13 16:39:50   访问数量:1790




nginx 自己实现了互斥锁,这样做的目的主要是为了兼容性,在不支持锁甚至不支持原子操作的环境下实现锁操作

 

ngx_accept_mutex 是一个 ngx_shmtx_t 结构体,ngx_shmtx_t 描述了 nginx 的锁结构

// struct ngx_shmtx_t // nginx 锁结构 {{{ typedef struct { #if (NGX_HAVE_ATOMIC_OPS) // 是否支持原子操作 ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) // 是否支持信号量 ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; #endif #else // 不支持原子操作则使用文件操作 ngx_fd_t fd; u_char *name; #endif ngx_uint_t spin; // 自旋锁标识 } ngx_shmtx_t; // }}}

 

 

nginx 使用这个结构实现了 nginx 中唯一使用的一个锁,accept 自旋锁

通过这个结构,实现了对各种环境的兼容,包括不支持信号量的环境、甚至不支持原子操作的环境

针对不同的环境,实现了具体的锁结构:

  1. 原子锁
  2. 信号量
  3. 共享文件锁

 

nginx 模块初始化 中,nginx 对自旋锁进行了初始化工作

 

// 实现互斥锁 ngx_accept_mutex {{{ /* cl should be equal to or greater than cache line size */ cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ #if (NGX_STAT_STUB) size += cl /* ngx_stat_accepted */ + cl /* ngx_stat_handled */ + cl /* ngx_stat_requests */ + cl /* ngx_stat_active */ + cl /* ngx_stat_reading */ + cl /* ngx_stat_writing */ + cl; /* ngx_stat_waiting */ #endif shm.size = size; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } shared = shm.addr; ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; // 初始化互斥锁 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl); (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "counter: %p, %d", ngx_connection_counter, *ngx_connection_counter); ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl); tp = ngx_timeofday(); ngx_random_number = (tp->msec << 16) + ngx_pid; #if (NGX_STAT_STUB) ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl); ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl); ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl); ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl); ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl); ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl); ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl); #endif // }}}

 

 

初始化中最重要的一个环节就是 ngx_shmtx_create

// ngx_int_t // ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) // 初始化自旋锁 {{{ ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock; if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } mtx->spin = 2048; #if (NGX_HAVE_POSIX_SEM) mtx->wait = &addr->wait; // 初始化信号量 if (sem_init(&mtx->sem, 1, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); } else { mtx->semaphore = 1; } #endif return NGX_OK; } // }}}

 

这个操作其实很简单,将共享内存地址赋值给 lock 域

 

nginx 实现了两个用来获取锁的函数,分别是非阻塞版本的 trylock 和阻塞版本的 lock

 

尝试获取锁 -- ngx_shmtx_trylock

// ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) // 尝试获取自旋锁 {{{ ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); } // }}}

 

 

这里调用了一个函数 ngx_atomic_cmp_set,他其实是一个宏

#define ngx_atomic_cmp_set(lock, old, set) \ __sync_bool_compare_and_swap(lock, old, set)

 

他实现了原子性的比较和替换,原子性的实现了 *lock == old && *lock = set;

完成了原子性的比较和上锁

 

阻塞获取锁 -- ngx_shmtx_lock

// void ngx_shmtx_lock(ngx_shmtx_t *mtx) // 阻塞并获取锁 {{{ void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock"); for ( ;; ) { if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } if (ngx_ncpu > 1) { for (n = 1; n < mtx->spin; n <<= 1) { for (i = 0; i < n; i++) { ngx_cpu_pause(); } if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } } #if (NGX_HAVE_POSIX_SEM) // 等待信号量 if (mtx->semaphore) { (void) ngx_atomic_fetch_add(mtx->wait, 1); if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wait %uA", *mtx->wait); while (sem_wait(&mtx->sem) == -1) { ngx_err_t err; err = ngx_errno; if (err != NGX_EINTR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "sem_wait() failed while waiting on shmtx"); break; } } ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke"); continue; } #endif // 主动放弃一轮 CPU ngx_sched_yield(); } } // }}}

 

 

这里使用死循环持续等待,或者阻塞在信号量上

 

针对不支持原子操作的环境,上述代码自然是无法运行的,nginx 对这样的环境也进行了适配,就是上述两个函数的锁文件版本

 

尝试获取锁 -- ngx_shmtx_trylock

// ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) // 尝试获取锁 {{{ ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { ngx_err_t err; // 尝试获取锁文件 err = ngx_trylock_fd(mtx->fd); if (err == 0) { return 1; } if (err == NGX_EAGAIN) { return 0; } #if __osf__ /* Tru64 UNIX */ if (err == NGX_EACCESS) { return 0; } #endif ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name); return 0; } // }}}

 

 

这里调用了 ngx_trylock_fd 这个函数就是用来尝试锁文件的

// ngx_err_t ngx_trylock_fd(ngx_fd_t fd) // 尝试锁文件 {{{ ngx_err_t ngx_trylock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; // 尝试锁定 if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; } // }}}

 

 

阻塞获取锁 -- ngx_shmtx_lock

// void ngx_shmtx_lock(ngx_shmtx_t *mtx) // 阻塞直到获取锁 {{{ void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_err_t err; // 锁文件 err = ngx_lock_fd(mtx->fd); if (err == 0) { return; } ngx_log_abort(err, ngx_lock_fd_n " %s failed", mtx->name); } // }}}

 

 

锁文件版本的 ngx_shmtx_lock 比之前版本的 ngx_shmtx_lock 简洁多了,因为 fcntl 原生实现了阻塞并等待文件锁的操作,因此无需用户再写死循环

// ngx_err_t ngx_lock_fd(ngx_fd_t fd) // 锁文件 {{{ ngx_err_t ngx_lock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; // 阻塞直到锁定 if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; } return 0; } // }}}

 

 

ngx_trylock_fd 与 ngx_lock_fd 都是锁文件操作,而代码又极其相似,唯一的区别在于调用 fcntl 分别传递了 F_SETLK 和 F_SETLKW

传递 F_SETLK 会使函数在无法获取到锁的情况下立即返回,而 F_SETLKW 则会阻塞直到函数获取文件锁为止

 

同样的,ngx_shmtx_unlock 也有两个版本 -- 原子锁版本与文件锁版本

 

原子锁解锁

// void ngx_shmtx_unlock(ngx_shmtx_t *mtx) // 原子自旋锁解锁 {{{ void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } } // }}}

 

操作很简单,将原子锁变量置为 0,使用的还是 gcc 提供的原子操作 __sync_bool_compare_and_swap,原子的检查并赋值

针对使用信号量的情况,在解锁后,必须调用 sem_post 让信号量加 1,以表示一个新的进程加入了等待队列,于是,在调用 ngx_atomic_cmp_set 后马上调用了 ngx_shmtx_wakeup

 

锁文件解锁

锁文件的解锁同样是调用 fcntl 实现的

// void ngx_shmtx_unlock(ngx_shmtx_t *mtx) // 锁文件解锁 {{{ void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { ngx_err_t err; err = ngx_unlock_fd(mtx->fd); if (err == 0) { return; } ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name); } // }}}

 

 

// ngx_err_t ngx_unlock_fd(ngx_fd_t fd) // 解锁锁文件 {{{ ngx_err_t ngx_unlock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_UNLCK; fl.l_whence = SEEK_SET; if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; } // }}}

 

 

上述实现的锁机制存在一个问题,那就是如果已经获取到锁的进程突然异常退出,锁变量仍然非 0,那么其他所有等待这个锁变量的进程都会陷入死锁,这个问题如何解决呢?

针对上面描述的文件锁,当进程退出后,其他进程可以顺利打开锁文件,因此不会存在这个问题,但是针对原子锁和信号量,这个问题却显然存在,如果不将原子锁置为 0 或不显式 sem_post 让信号量加 1,其他进程就会因为无法获取到锁而永远等待了

 

回归代码,其实 nginx 有很优雅的解决方案

worker 进程退出后,父进程会受到 SIGCHLD 信号,随之执行预设的信号响应函数 ngx_signal_handler

// void ngx_signal_handler(int signo) // 信号的默认处理函数 {{{ void ngx_signal_handler(int signo) { char *action; ngx_int_t ignore; ngx_err_t err; ngx_signal_t *sig; ignore = 0; err = ngx_errno; for (sig = signals; sig->signo != 0; sig++) { if (sig->signo == signo) { break; } } // 更新 cached_err_log_time ngx_time_sigsafe_update(); action = ""; switch (ngx_process) { case NGX_PROCESS_MASTER: case NGX_PROCESS_SINGLE: switch (signo) { case ngx_signal_value(NGX_SHUTDOWN_SIGNAL): ngx_quit = 1; action = ", shutting down"; break; case ngx_signal_value(NGX_TERMINATE_SIGNAL): case SIGINT: ngx_terminate = 1; action = ", exiting"; break; case ngx_signal_value(NGX_NOACCEPT_SIGNAL): if (ngx_daemonized) { ngx_noaccept = 1; action = ", stop accepting connections"; } break; case ngx_signal_value(NGX_RECONFIGURE_SIGNAL): ngx_reconfigure = 1; action = ", reconfiguring"; break; case ngx_signal_value(NGX_REOPEN_SIGNAL): ngx_reopen = 1; action = ", reopening logs"; break; case ngx_signal_value(NGX_CHANGEBIN_SIGNAL): if (getppid() > 1 || ngx_new_binary > 0) { /* * Ignore the signal in the new binary if its parent is * not the init process, i.e. the old binary's process * is still running. Or ignore the signal in the old binary's * process if the new binary's process is already running. */ action = ", ignoring"; ignore = 1; break; } ngx_change_binary = 1; action = ", changing binary"; break; case SIGALRM: ngx_sigalrm = 1; break; case SIGIO: ngx_sigio = 1; break; case SIGCHLD: ngx_reap = 1; break; } break; case NGX_PROCESS_WORKER: case NGX_PROCESS_HELPER: switch (signo) { case ngx_signal_value(NGX_NOACCEPT_SIGNAL): if (!ngx_daemonized) { break; } ngx_debug_quit = 1; case ngx_signal_value(NGX_SHUTDOWN_SIGNAL): ngx_quit = 1; action = ", shutting down"; break; case ngx_signal_value(NGX_TERMINATE_SIGNAL): case SIGINT: ngx_terminate = 1; action = ", exiting"; break; case ngx_signal_value(NGX_REOPEN_SIGNAL): ngx_reopen = 1; action = ", reopening logs"; break; case ngx_signal_value(NGX_RECONFIGURE_SIGNAL): case ngx_signal_value(NGX_CHANGEBIN_SIGNAL): case SIGIO: action = ", ignoring"; break; } break; } ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0, "signal %d (%s) received%s", signo, sig->signame, action); if (ignore) { ngx_log_error(NGX_LOG_CRIT, ngx_cycle->log, 0, "the changing binary signal is ignored: " "you should shutdown or terminate " "before either old or new binary's process"); } if (signo == SIGCHLD) { // 清理子进程,包括解锁 ngx_process_get_status(); } ngx_set_errno(err); } // }}}

 

 

在信号响应函数中调用了子进程状态清理函数 ngx_process_get_status

// static void ngx_process_get_status(void) // 已死子进程清理 {{{ static void ngx_process_get_status(void) { int status; char *process; ngx_pid_t pid; ngx_err_t err; ngx_int_t i; ngx_uint_t one; one = 0; for ( ;; ) { // 获取退出子进程 pid pid = waitpid(-1, &status, WNOHANG); if (pid == 0) { return; } if (pid == -1) { err = ngx_errno; if (err == NGX_EINTR) { continue; } if (err == NGX_ECHILD && one) { return; } /* * Solaris always calls the signal handler for each exited process * despite waitpid() may be already called for this process. * * When several processes exit at the same time FreeBSD may * erroneously call the signal handler for exited process * despite waitpid() may be already called for this process. */ if (err == NGX_ECHILD) { ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, err, "waitpid() failed"); return; } ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "waitpid() failed"); return; } one = 1; process = "unknown process"; for (i = 0; i < ngx_last_process; i++) { if (ngx_processes[i].pid == pid) { ngx_processes[i].status = status; ngx_processes[i].exited = 1; process = ngx_processes[i].name; break; } } if (WTERMSIG(status)) { #ifdef WCOREDUMP ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0, "%s %P exited on signal %d%s", process, pid, WTERMSIG(status), WCOREDUMP(status) ? " (core dumped)" : ""); #else ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0, "%s %P exited on signal %d", process, pid, WTERMSIG(status)); #endif } else { ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0, "%s %P exited with code %d", process, pid, WEXITSTATUS(status)); } if (WEXITSTATUS(status) == 2 && ngx_processes[i].respawn) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0, "%s %P exited with fatal code %d " "and cannot be respawned", process, pid, WEXITSTATUS(status)); ngx_processes[i].respawn = 0; } // 如果已退出子进程拥有锁,则解锁 ngx_unlock_mutexes(pid); } } // }}}

 

在这个函数中调用 waitpid 获取了已退出进程的 pid,检测了该进程的相关状态,最终执行了 ngx_unlock_mutexes(pid)

 

在 nginx 中,原子锁的取值并不是 0 或 1,而是 0 或 pid,因此所有进程都可以获取到目前是哪个进程在占用锁

这个信息显然是至关重要的,在父进程收到 SIGCHLD 信号后,只需要对比 mutex 变量中的值,即可知道是否需要对其解锁

 

// static void ngx_unlock_mutexes(ngx_pid_t pid) // 如果已退出子进程拥有锁,则解锁 {{{ static void ngx_unlock_mutexes(ngx_pid_t pid) { ngx_uint_t i; ngx_shm_zone_t *shm_zone; ngx_list_part_t *part; ngx_slab_pool_t *sp; /* * unlock the accept mutex if the abnormally exited process * held it */ if (ngx_accept_mutex_ptr) { // 如果 pid 持有锁则解锁 (void) ngx_shmtx_force_unlock(&ngx_accept_mutex, pid); } /* * unlock shared memory mutexes if held by the abnormally exited * process */ part = (ngx_list_part_t *) &ngx_cycle->shared_memory.part; shm_zone = part->elts; for (i = 0; /* void */ ; i++) { if (i >= part->nelts) { if (part->next == NULL) { break; } part = part->next; shm_zone = part->elts; i = 0; } sp = (ngx_slab_pool_t *) shm_zone[i].shm.addr; // 如果 pid 持有锁则解锁 if (ngx_shmtx_force_unlock(&sp->mutex, pid)) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0, "shared memory zone \"%V\" was locked by %P", &shm_zone[i].shm.name, pid); } } } // }}}

 

 

进而调用了:

// ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid) // 如果 pid 持有锁则解锁 {{{ ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx forced unlock"); if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) { // 用于调用 sem_post ngx_shmtx_wakeup(mtx); return 1; } return 0; } // }}}

 

 

问题圆满解决

 






技术帖      龙潭书斋      fcntl      进程            process      多进程      nginx      源码      opensource      sourcecode      惊群现象      开源      自旋锁      互斥锁      lock      trylock      文件锁      共享内存     


1#weaver: (回复)2018-10-11 18:41:28

有个问题请教下,在系统只支持信号量的情况下,在创建锁的时候信号量初始化为0,在加锁的时候直接wait(), 不就阻塞住获取不到锁了吗?

2#博主: (回复)2018-10-15 11:31:24

回复:1#没错,所以在 ngx_init_zone_pool 调用以后,调用了 shm_zone[i].init(&shm_zone[i], NULL) 来初始化信号量,这个回调方法里面调用了 ngx_shmtx_wakeup 从而让信号量在使用前初始化为正确的值

京ICP备15018585号