python 线程同步(三) -- 信号量

2019-05-12 23:46:06   最后更新: 2019-05-13 00:09:14   访问数量:103




上两篇文章中,我们详细介绍了 Python 中的两种线程同步方式 -- 锁与条件对象

Python 线程同步(一) -- 竞争条件与线程锁

python 线程同步(二) -- 条件对象

 

本文我们来介绍一个计算机科学史上最为古老和经典的线程同步原语之一 -- 信号量

 

我们此前已经介绍过 Linux 的信号量与 Java 中的信号量

信号量

java 线程同步工具类

 

 

信号量是操作系统中的一个经典线程同步原语,实际上他是带有计数功能的互斥锁,用来保护某个只允许指定数量操作的资源

信号量与锁机制非常类似,但他维护了一个内部的计数值,每次加锁原子性的将计数值减1,返回不为负则意味着加锁成功,否则加回1并阻塞等待直到被唤醒,而解锁时则在信号量计数上进行加1操作

一般来说,对计数值的修改是通过 CAS 操作实现的

CAS 思想与 java 原子操作的实现

 

python 标准库中的 threading 包中实现了信号量对象

 

构造方法

该对象的构造方法有一个参数 value 用于初始化上文所述的信号量内的计数值,默认为 1

threading.Semaphore(value=1)

 

value 的取值

  • 当 value 传入大于 1,这是最为常用的用法,用来限制最多 value 个线程可以同时共享资源
  • 当 value 传入为 1 时,信号量退化为了一个普通的线程锁,虽然这是默认行为,但与 threading 中提供的锁对象相比,通过信号量实现基本的线程锁虽然在使用方式上是一样的,但其执行效率要低一些,因此不建议这样使用
  • 当 value 传入 0 时,所有试图加锁的线程都将阻塞在该信号量对象上,但 Python 允许不经加锁直接调用解锁方法来增加计数值,但这通常是错误的用法,应该避免这样使用
  • 当 value 传入小于 0 时,会抛出 ValueError 异常

 

加锁

acquire(blocking=True, timeout=None)

 

加锁方法的执行逻辑我们已经在上面有过详细介绍

Python 信号量的加锁方法允许传入两个参数,分别表示是否阻塞,与最长等待时间(秒数)

加锁成功则返回 True

 

解锁

release()

 

解锁方法就是将信号量中的计数器加 1,如果计数器的原值为 0,则唤醒所有阻塞在该信号量上的线程

与普通的锁对象不同,Python 中的信号量允许在未加锁的情况下调用 release 方法来让计数器加 1

import logging from threading import Thread, Semaphore class SemaphoreTestThread(Thread): def __init__(self, id, semaphore): super().__init__() self.id = id self.semaphore = semaphore def run(self) -> None: logging.info('%r start running' % self) try: while self.semaphore.acquire(): logging.info('%r hold the semaphore' % self) finally: self.semaphore.release() def __repr__(self): return 'SemaphoreTestThread(%s)' % self.id if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') semaphore = Semaphore(0) for i in range(10): thread = SemaphoreTestThread(i, semaphore) thread.start() logging.info('all the threads are running') for i in range(5): logging.info('add 1 on the semaphore') semaphore.release()

 

 

打印出了:

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running

2019-05-12 22:12:24,016 - INFO: all the threads are running

2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore

2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore

2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore

2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore

2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore

2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore

 

可以看到,我们创建了 10 个线程并启动,但由于信号量的初始计数为 0,因此所有 10 个线程在启动后均立即阻塞等待在信号量上

我们的主线程在未对信号量加锁的情况下直接调用了 release 方法,这并没有报错,而是激活了 10 个线程中的某个线程运行

 

上面的例子中,我们看到,Python 中的信号量允许我们在未加锁的情况下直接调用解锁方法来让信号量内计数器值加 1,这似乎让构造方法传入的 value 值失去了他的价值

Python 中存在另一种信号量,他与我们上面讲解的信号量仅有一点区别,那就是当 release 方法试图将计数器增加到大于构造方法传入的 value 值时,会抛出 ValueError 异常

因此,在通常使用中 Semaphore 与 BoundedSemaphore 并没有什么区别

 

我们把上文未经加锁即解锁例子中的信号量改为 BoundedSemaphore 再来试一下:

import logging from threading import Thread, BoundedSemaphore class SemaphoreTestThread(Thread): def __init__(self, id, semaphore): super().__init__() self.id = id self.semaphore = semaphore def run(self) -> None: logging.info('%r start running' % self) try: while self.semaphore.acquire(): logging.info('%r hold the semaphore' % self) finally: self.semaphore.release() def __repr__(self): return 'SemaphoreTestThread(%s)' % self.id if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') semaphore = BoundedSemaphore(0) for i in range(10): thread = SemaphoreTestThread(i, semaphore) thread.start() logging.info('all the threads are running') for i in range(5): logging.info('add 1 on the semaphore') semaphore.release()

 

 

打印出了:

2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running

2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running

2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running

2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running

2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running

2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running

2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running

2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running

2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running

2019-05-13 00:08:35,060 - INFO: all the threads are running

2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore

Traceback (most recent call last):

  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module>

2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running

    main()

  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main

    globals = debugger.run(setup['file'], None, None, is_module)

  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run

    pydev_imports.execfile(file, globals, locals)  # execute the script

  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile

    exec(compile(contents+"\n", file, 'exec'), glob, loc)

  File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module>

    semaphore.release()

  File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release

    raise ValueError("Semaphore released too many times")

ValueError: Semaphore released too many times

 

from threading import BoundedSemaphore, Semaphore class PooledDB: def __init__(self, creator, minconnections, maxconnections, *args, **kwargs): self._args, self._kwargs = args, kwargs self._creator = creator self._minconnections = minconnections self._maxconnections = maxconnections self._max_semaphore = Semaphore(maxconnections) self._min_semaphore = BoundedSemaphore(minconnections) self._idle_cache = [] idle = [self.get_connection() for _ in range(minconnections)] while idle: idle.pop().close() def get_connection(self, timeout=None): hold = self._max_semaphore.acquire(timeout=timeout) if hold: hold = self._min_semaphore.acquire(blocking=False) if hold: return self._idle_cache.pop(0) else: return PooledConnection(self._creator, self, args=self._args, kwargs=self._kwargs) return None def returnConnect(self, connection): try: self._min_semaphore.release() self._idle_cache.append(connection) except ValueError: connection.close(True) finally: self._max_semaphore.release() class PooledConnection: def __init__(self, creator, pool, *args, **kwargs): self._pool = pool self._creator = creator self._con = self._creator.connect(args, kwargs) def close(self, force_close=False): if force_close: self._con.close() else: self._pool.returnConnect(self)

 

 

这只是一个用于示例的简易 DB 连接池实现,同时,对于连接类 PooledConnection 我们省略了 begin、commit、rollback、cursor、ping 等方法的实现,因为这些与我们本节的内容并没什么关系,只实现了连接创建方法与 close 方法,同时省略了所有的参数、边界判断,只为了让示例更加精简,很快我会写一篇详细介绍生产环境可用的 DB 连接池的源码解析,敬请期待

上面的例子中,我们的连接池构造方法拥有两个参数 -- 最大连接数和最小连接数

我们创建了两个 BoundedSemaphore 对象,分别用来限制并发环境中的最大、最小连接数

 

创建连接

初始状态下我们就已经向空闲队列中添加了最小连接数个数个空闲连接,我们看到,在创建连接时,我们先试图对最大连接数信号量进行加锁,从而保证并发环境下连接池连接数不会超过 maxconnections 值

然后,对最小连接数信号量进行了加锁,加锁成功则从空闲队列中获取连接,否则新建连接

 

关闭连接

关闭连接时,我们首先试图释放最小连接数信号量,这里就体现出了 BoundedSemaphore 的价值,一旦释放次数超过构造参数传入的 minconnections 则意味着我们的释放次数大于了加锁次数,也就是说,这个被释放连接并不是从空闲队列 _idle_cache 中取出的,而 BoundedSemaphore 在此时抛出 ValueError 异常让我们可以直接强制关闭该连接,而不是让他回到连接池

与最小连接数信号量相比,最大连接数信号量使用 Semaphore 就可以了

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






python      线程      thread      线程同步      连接池      db      信号量     


京ICP备15018585号