经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Nginx » 查看文章
详解nginx进程锁的实现
来源:jb51  时间:2021/6/15 9:23:33  对本文有异议

一、 nginx进程锁的作用

nginx是多进程并发模型应用,直白点就是:有多个worker都在监听网络请求,谁接收某个请求,那么后续的事务就由它来完成。如果没有锁的存在,那么就是这种场景,当一个请求被系统接入后,所以可以监听该端口的进程,就会同时去处理该事务。当然了,系统会避免这种糟糕事情的发生,但也就出现了所谓的惊群。(不知道说得对不对,大概是那么个意思吧)

所以,为了避免出现同一时刻,有许多进程监听,就应该该多个worker间有序地监听socket. 为了让多个worker有序,所以就有了本文要讲的进程锁的出现了,只有抢到锁的进程才可以进行网络请求的接入操作。

即如下过程:

  1. // worker 核心事务框架
  2. // ngx_event.c
  3. void
  4. ngx_process_events_and_timers(ngx_cycle_t *cycle)
  5. {
  6. ngx_uint_t flags;
  7. ngx_msec_t timer, delta;
  8.  
  9. if (ngx_timer_resolution) {
  10. timer = NGX_TIMER_INFINITE;
  11. flags = 0;
  12.  
  13. } else {
  14. timer = ngx_event_find_timer();
  15. flags = NGX_UPDATE_TIME;
  16.  
  17. #if (NGX_WIN32)
  18.  
  19. /* handle signals from master in case of network inactivity */
  20.  
  21. if (timer == NGX_TIMER_INFINITE || timer > 500) {
  22. timer = 500;
  23. }
  24.  
  25. #endif
  26. }
  27.  
  28. if (ngx_use_accept_mutex) {
  29. // 为了一定的公平性,避免反复争抢锁
  30. if (ngx_accept_disabled > 0) {
  31. ngx_accept_disabled--;
  32.  
  33. } else {
  34. // 只有抢到锁的进程,进行 socket 的 accept() 操作
  35. // 其他worker则处理之前接入的请求,read/write操作
  36. if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
  37. return;
  38. }
  39.  
  40. if (ngx_accept_mutex_held) {
  41. flags |= NGX_POST_EVENTS;
  42.  
  43. } else {
  44. if (timer == NGX_TIMER_INFINITE
  45. || timer > ngx_accept_mutex_delay)
  46. {
  47. timer = ngx_accept_mutex_delay;
  48. }
  49. }
  50. }
  51. }
  52. // 其他核心事务处理
  53. if (!ngx_queue_empty(&ngx_posted_next_events)) {
  54. ngx_event_move_posted_next(cycle);
  55. timer = 0;
  56. }
  57.  
  58. delta = ngx_current_msec;
  59.  
  60. (void) ngx_process_events(cycle, timer, flags);
  61.  
  62. delta = ngx_current_msec - delta;
  63.  
  64. ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
  65. "timer delta: %M", delta);
  66.  
  67. ngx_event_process_posted(cycle, &ngx_posted_accept_events);
  68.  
  69. if (ngx_accept_mutex_held) {
  70. ngx_shmtx_unlock(&ngx_accept_mutex);
  71. }
  72.  
  73. if (delta) {
  74. ngx_event_expire_timers();
  75. }
  76.  
  77. ngx_event_process_posted(cycle, &ngx_posted_events);
  78. }
  79. // 获取锁,并注册socket accept() 过程如下
  80. ngx_int_t
  81. ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
  82. {
  83. if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
  84.  
  85. ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
  86. "accept mutex locked");
  87.  
  88. if (ngx_accept_mutex_held && ngx_accept_events == 0) {
  89. return NGX_OK;
  90. }
  91.  
  92. if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
  93. // 解锁操作
  94. ngx_shmtx_unlock(&ngx_accept_mutex);
  95. return NGX_ERROR;
  96. }
  97.  
  98. ngx_accept_events = 0;
  99. ngx_accept_mutex_held = 1;
  100.  
  101. return NGX_OK;
  102. }
  103.  
  104. ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
  105. "accept mutex lock failed: %ui", ngx_accept_mutex_held);
  106.  
  107. if (ngx_accept_mutex_held) {
  108. if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
  109. return NGX_ERROR;
  110. }
  111.  
  112. ngx_accept_mutex_held = 0;
  113. }
  114.  
  115. return NGX_OK;
  116. }

其他的不必多说,核心即抢到锁的worker,才可以进行accept操作。而没有抢到锁的worker, 则要主动释放之前的accept()权力。从而达到,同一时刻,只有一个worker在处理accept事件。

二、入门级锁使用

锁这种东西,一般都是编程语言自己定义好的接口,或者固定用法。

比如 java 中的 synchronized xxx, Lock 相关并发包锁如 CountDownLatch, CyclicBarrier, ReentrantLock, ReentrantReadWriteLock, Semaphore...

比如 python 中的 threading.Lock(), threading.RLock()...

比如 php 中的 flock()...

之所以说是入门级,是因为这都是些接口api, 你只要按照使用规范,调一下就可以了,无需更多知识。但要想用好各细节,则实际不简单。

三、nginx进程锁的实现

nginx因为是使用C语言编写的,所以肯定是更接近底层些的。能够通过它的实现,来看锁如何实现,应该能够让我们更能理解锁的深层次含义。

一般地,锁包含这么几个大方向:锁数据结构定义,上锁逻辑,解锁逻辑,以及一些通知机制,超时机制什么的。下面我们就其中几个方向,看下nginx 实现:

3.1、锁的数据结构

首先要定义出锁有些什么变量,然后实例化一个值,共享给多进程使用。

  1. // event/ngx_event.c
  2. // 全局accept锁变量定义
  3. ngx_shmtx_t ngx_accept_mutex;
  4. // 这个锁有一个
  5. // atomic 使用 volatile 修饰实现
  6. typedef volatile ngx_atomic_uint_t ngx_atomic_t;
  7. typedef struct {
  8. #if (NGX_HAVE_ATOMIC_OPS)
  9. // 有使用原子更新变量实现锁,其背后是共享内存区域
  10. ngx_atomic_t *lock;
  11. #if (NGX_HAVE_POSIX_SEM)
  12. ngx_atomic_t *wait;
  13. ngx_uint_t semaphore;
  14. sem_t sem;
  15. #endif
  16. #else
  17. // 有使用fd实现锁,fd的背后是一个文件实例
  18. ngx_fd_t fd;
  19. u_char *name;
  20. #endif
  21. ngx_uint_t spin;
  22. } ngx_shmtx_t;
  23. // 共享内存数据结构定义
  24. typedef struct {
  25. u_char *addr;
  26. size_t size;
  27. ngx_str_t name;
  28. ngx_log_t *log;
  29. ngx_uint_t exists; /* unsigned exists:1; */
  30. } ngx_shm_t;

3.2、基于fd的上锁/解锁实现

有了锁实例,就可以对其进行上锁解锁了。nginx有两种锁实现,主要是基于平台的差异性决定的:基于文件或者基于共享内在实现。基于fd即基于文件的实现,这个还是有点重的操作。如下:

  1. // ngx_shmtx.c
  2. ngx_uint_t
  3. ngx_shmtx_trylock(ngx_shmtx_t *mtx)
  4. {
  5. ngx_err_t err;
  6.  
  7. err = ngx_trylock_fd(mtx->fd);
  8.  
  9. if (err == 0) {
  10. return 1;
  11. }
  12.  
  13. if (err == NGX_EAGAIN) {
  14. return 0;
  15. }
  16.  
  17. #if __osf__ /* Tru64 UNIX */
  18.  
  19. if (err == NGX_EACCES) {
  20. return 0;
  21. }
  22.  
  23. #endif
  24.  
  25. ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);
  26.  
  27. return 0;
  28. }
  29. // core/ngx_shmtx.c
  30. // 1. 上锁过程
  31. ngx_err_t
  32. ngx_trylock_fd(ngx_fd_t fd)
  33. {
  34. struct flock fl;
  35.  
  36. ngx_memzero(&fl, sizeof(struct flock));
  37. fl.l_type = F_WRLCK;
  38. fl.l_whence = SEEK_SET;
  39.  
  40. if (fcntl(fd, F_SETLK, &fl) == -1) {
  41. return ngx_errno;
  42. }
  43.  
  44. return 0;
  45. }
  46. // os/unix/ngx_file.c
  47. ngx_err_t
  48. ngx_lock_fd(ngx_fd_t fd)
  49. {
  50. struct flock fl;
  51.  
  52. ngx_memzero(&fl, sizeof(struct flock));
  53. fl.l_type = F_WRLCK;
  54. fl.l_whence = SEEK_SET;
  55. // 调用系统提供的上锁方法
  56. if (fcntl(fd, F_SETLKW, &fl) == -1) {
  57. return ngx_errno;
  58. }
  59.  
  60. return 0;
  61. }
  62.  
  63. // 2. 解锁实现
  64. // core/ngx_shmtx.c
  65. void
  66. ngx_shmtx_unlock(ngx_shmtx_t *mtx)
  67. {
  68. ngx_err_t err;
  69.  
  70. err = ngx_unlock_fd(mtx->fd);
  71.  
  72. if (err == 0) {
  73. return;
  74. }
  75.  
  76. ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);
  77. }
  78. // os/unix/ngx_file.c
  79. ngx_err_t
  80. ngx_unlock_fd(ngx_fd_t fd)
  81. {
  82. struct flock fl;
  83.  
  84. ngx_memzero(&fl, sizeof(struct flock));
  85. fl.l_type = F_UNLCK;
  86. fl.l_whence = SEEK_SET;
  87.  
  88. if (fcntl(fd, F_SETLK, &fl) == -1) {
  89. return ngx_errno;
  90. }
  91.  
  92. return 0;
  93. }

重点就是 fcntl() 这个系统api的调用,无他。当然,站在一个旁观者角度来看,实际就是因为多进程对文件的操作是可见的,所以达到进程锁的目的。其中,tryLock 和 lock 存在一定的语义差异,即try时,会得到一些是否成功的标识,而直接进行lock时,则不能得到标识。一般会要求阻塞住请求

3.3、nginx锁实例的初始化

也许在有些地方,一个锁实例的初始化,就是一个变量的简单赋值而已。但在nginx有些不同。首先,需要保证各worker能看到相同的实例或者相当的实例。因为worker是从master处fork()出来的进程,所以只要在master中实例化好的锁,必然可以保证各worker能拿到一样的值。那么,到底是不是只是这样呢?

  1. // 共享锁的初始化,在ngx master 中进行,后fork()到worker进程
  2. // event/ngx_event.c
  3. static ngx_int_t
  4. ngx_event_module_init(ngx_cycle_t *cycle)
  5. {
  6. void ***cf;
  7. u_char *shared;
  8. size_t size, cl;
  9. // 定义一段共享内存
  10. ngx_shm_t shm;
  11. ngx_time_t *tp;
  12. ngx_core_conf_t *ccf;
  13. ngx_event_conf_t *ecf;
  14.  
  15. cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module);
  16. ecf = (*cf)[ngx_event_core_module.ctx_index];
  17.  
  18. if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) {
  19. ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
  20. "using the \"%s\" event method", ecf->name);
  21. }
  22.  
  23. ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
  24.  
  25. ngx_timer_resolution = ccf->timer_resolution;
  26.  
  27. #if !(NGX_WIN32)
  28. {
  29. ngx_int_t limit;
  30. struct rlimit rlmt;
  31.  
  32. if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
  33. ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
  34. "getrlimit(RLIMIT_NOFILE) failed, ignored");
  35.  
  36. } else {
  37. if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur
  38. && (ccf->rlimit_nofile == NGX_CONF_UNSET
  39. || ecf->connections > (ngx_uint_t) ccf->rlimit_nofile))
  40. {
  41. limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ?
  42. (ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile;
  43.  
  44. ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
  45. "%ui worker_connections exceed "
  46. "open file resource limit: %i",
  47. ecf->connections, limit);
  48. }
  49. }
  50. }
  51. #endif /* !(NGX_WIN32) */
  52.  
  53.  
  54. if (ccf->master == 0) {
  55. return NGX_OK;
  56. }
  57.  
  58. if (ngx_accept_mutex_ptr) {
  59. return NGX_OK;
  60. }
  61.  
  62.  
  63. /* cl should be equal to or greater than cache line size */
  64.  
  65. cl = 128;
  66.  
  67. size = cl /* ngx_accept_mutex */
  68. + cl /* ngx_connection_counter */
  69. + cl; /* ngx_temp_number */
  70.  
  71. #if (NGX_STAT_STUB)
  72.  
  73. size += cl /* ngx_stat_accepted */
  74. + cl /* ngx_stat_handled */
  75. + cl /* ngx_stat_requests */
  76. + cl /* ngx_stat_active */
  77. + cl /* ngx_stat_reading */
  78. + cl /* ngx_stat_writing */
  79. + cl; /* ngx_stat_waiting */
  80.  
  81. #endif
  82.  
  83. shm.size = size;
  84. ngx_str_set(&shm.name, "nginx_shared_zone");
  85. shm.log = cycle->log;
  86. // 分配共享内存空间, 使用 mmap 实现
  87. if (ngx_shm_alloc(&shm) != NGX_OK) {
  88. return NGX_ERROR;
  89. }
  90.  
  91. shared = shm.addr;
  92.  
  93. ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
  94. ngx_accept_mutex.spin = (ngx_uint_t) -1;
  95. // 基于共享文件或者内存赋值进程锁,从而实现多进程控制
  96. if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
  97. cycle->lock_file.data)
  98. != NGX_OK)
  99. {
  100. return NGX_ERROR;
  101. }
  102.  
  103. ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);
  104.  
  105. (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);
  106.  
  107. ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
  108. "counter: %p, %uA",
  109. ngx_connection_counter, *ngx_connection_counter);
  110.  
  111. ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);
  112.  
  113. tp = ngx_timeofday();
  114.  
  115. ngx_random_number = (tp->msec << 16) + ngx_pid;
  116.  
  117. #if (NGX_STAT_STUB)
  118.  
  119. ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl);
  120. ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl);
  121. ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl);
  122. ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl);
  123. ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl);
  124. ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl);
  125. ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl);
  126.  
  127. #endif
  128.  
  129. return NGX_OK;
  130. }
  131. // core/ngx_shmtx.c
  132. // 1. 基于文件进程共享空间, 使用 fd
  133. ngx_int_t
  134. ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
  135. {
  136. // 由master进程创建,所以是进程安全的操作,各worker直接使用即可
  137. if (mtx->name) {
  138. // 如果已经创建好了,则 fd 已被赋值,不能创建了,直接共享fd即可
  139. // fd 的背后是一个文件实例
  140. if (ngx_strcmp(name, mtx->name) == 0) {
  141. mtx->name = name;
  142. return NGX_OK;
  143. }
  144.  
  145. ngx_shmtx_destroy(mtx);
  146. }
  147. // 使用文件创建的方式锁共享
  148. mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN,
  149. NGX_FILE_DEFAULT_ACCESS);
  150.  
  151. if (mtx->fd == NGX_INVALID_FILE) {
  152. ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno,
  153. ngx_open_file_n " \"%s\" failed", name);
  154. return NGX_ERROR;
  155. }
  156. // 创建完成即可删除,后续只基于该fd实例做锁操作
  157. if (ngx_delete_file(name) == NGX_FILE_ERROR) {
  158. ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
  159. ngx_delete_file_n " \"%s\" failed", name);
  160. }
  161.  
  162. mtx->name = name;
  163.  
  164. return NGX_OK;
  165. }
  166.  
  167. // 2. 基于共享内存的共享锁的创建
  168. // ngx_shmtx.c
  169. ngx_int_t
  170. ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
  171. {
  172. mtx->lock = &addr->lock;
  173.  
  174. if (mtx->spin == (ngx_uint_t) -1) {
  175. return NGX_OK;
  176. }
  177.  
  178. mtx->spin = 2048;
  179.  
  180. #if (NGX_HAVE_POSIX_SEM)
  181.  
  182. mtx->wait = &addr->wait;
  183.  
  184. if (sem_init(&mtx->sem, 1, 0) == -1) {
  185. ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
  186. "sem_init() failed");
  187. } else {
  188. mtx->semaphore = 1;
  189. }
  190.  
  191. #endif
  192.  
  193. return NGX_OK;
  194. }
  195. // os/unix/ngx_shmem.c
  196. ngx_int_t
  197. ngx_shm_alloc(ngx_shm_t *shm)
  198. {
  199. shm->addr = (u_char *) mmap(NULL, shm->size,
  200. PROT_READ|PROT_WRITE,
  201. MAP_ANON|MAP_SHARED, -1, 0);
  202.  
  203. if (shm->addr == MAP_FAILED) {
  204. ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
  205. "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
  206. return NGX_ERROR;
  207. }
  208.  
  209. return NGX_OK;
  210. }

基于fd的锁实现,本质是基于其背后的文件系统的实现,因为文件系统是进程可见的,所以对于相同fd控制,就是对共同的锁的控制了。

3.4、基于共享内存的上锁/解锁实现

所谓共享内存,实际就是一块公共的内存区域,它超出了进程的范围(受操作系统管理)。就是前面我们看到的mmap()的创建,就是一块共享内存。

  1. // ngx_shmtx.c
  2. ngx_uint_t
  3. ngx_shmtx_trylock(ngx_shmtx_t *mtx)
  4. {
  5. // 直接对共享内存区域的值进行改变
  6. // cas 改变成功即是上锁成功。
  7. return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
  8. }
  9.  
  10. // shm版本的解锁操作, cas 解析,带通知
  11. void
  12. ngx_shmtx_unlock(ngx_shmtx_t *mtx)
  13. {
  14. if (mtx->spin != (ngx_uint_t) -1) {
  15. ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
  16. }
  17.  
  18. if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
  19. ngx_shmtx_wakeup(mtx);
  20. }
  21. }
  22. // 通知等待进程
  23. static void
  24. ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
  25. {
  26. #if (NGX_HAVE_POSIX_SEM)
  27. ngx_atomic_uint_t wait;
  28.  
  29. if (!mtx->semaphore) {
  30. return;
  31. }
  32.  
  33. for ( ;; ) {
  34.  
  35. wait = *mtx->wait;
  36.  
  37. if ((ngx_atomic_int_t) wait <= 0) {
  38. return;
  39. }
  40.  
  41. if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
  42. break;
  43. }
  44. }
  45.  
  46. ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
  47. "shmtx wake %uA", wait);
  48.  
  49. if (sem_post(&mtx->sem) == -1) {
  50. ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
  51. "sem_post() failed while wake shmtx");
  52. }
  53.  
  54. #endif
  55. }

共享内存版本的锁的实现,基本就是cas的对内存变量的设置。只是这个面向的内存,是共享区域的内存。

四、 说到底锁的含义是什么

见过了许多的锁,依然过不好这一关。

锁到底是什么呢?事实上,锁就是一个标识位。当有人看到这个标识位后,就主动停止操作,或者进行等等,从而使其看起来起到了锁的作用。这个标识位,可以设置在某个对象中,也可以为设置在某个全局值中,还可以借助于各种存在介质,比如文件,比如redis,比如zk 。 这都没有差别。因为问题关键不在存放在哪里,而在于如何安全地设置这个标识位。

要实现锁,一般都需要要一个强有力的底层含义保证,比如cpu层面的cas操作,应用级别的队列串行原子操作。。。
至于什么,内存锁,文件锁,高级锁,都是有各自的应用场景。而要选好各种锁,则变成了评价高低地关键。此时此刻,你应该能判断出来的!

以上就是详解nginx进程锁的实现的详细内容,更多关于nginx 进程锁的资料请关注w3xue其它相关文章!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号