python 线程同步(四) -- 事件对象与栅栏

2019-05-14 18:29:15   最后更新: 2019-05-14 18:51:59   访问数量:82




我们已经介绍了 python 的几种线程同步工具

Python 线程同步(一) -- 竞争条件与线程锁

python 线程同步(二) -- 条件对象

python 线程同步(三) -- 信号量

 

本文介绍的线程同步工具相比上面已经介绍过的三类工具来说,更加简单实用

 

事件的使用是线程间通信的最简单机制之一 -- 一个线程发出事件信号,另一个线程等待并响应该信号

python threading 包中提供的事件对象 Event 就是用来做这件事的

当事件对象中的标志位由 True 变为 False,所有等待在该事件上的线程都将被唤醒

因此,python 中的事件对象 Event 提供了以下方法供调用:

 

is_set

is_set()

 

返回事件标志是否为 True

 

set

set()

 

将事件内部标志位设置为 True,接着唤醒所有等待在该事件上的线程

 

clear

clear()

 

清除标志,将事件标志重置为 False,此后若干个线程又可以重新阻塞在该事件对象上

 

wait

wait(timeout=None)

 

阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时

如果是因为超时返回,则会返回 False,否则会返回 True

 

示例

下面的例子展示了所有5个线程均阻塞在一个事件对象上,直到3秒后,主线程调用 set 方法触发事件信号,可以看到所有 5 个线程均立即开始执行

import logging from threading import Thread, Event from time import sleep class EventThread(Thread): def __init__(self, event, id): super().__init__() self._event = event self._id = id def run(self): logging.info('%r start running' % self) self._event.wait() logging.info('%r continue running after event' % self) def __repr__(self): return 'EventThread(%s)' % self._id if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') event = Event() for i in range(5): thread = EventThread(event, i) thread.start() logging.info('main start sleeping') sleep(3) logging.info('main set event') event.set()

 

 

打印出了:

2019-05-14 09:15:50,626 - INFO: EventThread(0) start running

2019-05-14 09:15:50,626 - INFO: EventThread(1) start running

2019-05-14 09:15:50,626 - INFO: EventThread(2) start running

2019-05-14 09:15:50,626 - INFO: EventThread(3) start running

2019-05-14 09:15:50,626 - INFO: EventThread(4) start running

2019-05-14 09:15:50,626 - INFO: main start sleeping

2019-05-14 09:15:53,639 - INFO: main set event

2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event

2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event

2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event

2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event

2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event

 

栅栏类是另一个简单的同步原语,此前我们已经介绍过 Linux 与 Java 中的栅栏

java 线程同步工具类

 

栅栏对象用于让多个线程互相等待

他维护了一个内部的计数器,值由构造方法默认传入,每当有一个线程调用 wait 方法,则该值原子地减 1,直到减到 0,则让所有阻塞 wait 在该栅栏对象上的线程继续执行

 

 

构造方法

Barrier(parties, action=None, timeout=None)

 

  • 构造方法必须传入一个值,就是我们上文所说的计数初始值
  • 如果提供了可调用的 action 参数,它会在所有线程被释放时在其中一个线程中自动调用 action 方法
  • timeout 是默认的超时时间,如果没有在 wait() 方法中指定超时时间,则会使用该值作为超时时间

 

wait

wait(timeout=None)

 

栅栏对象中最重要的方法就是 wait 方法了

线程阻塞等待,直到构造方法传入的 parties 个线程均阻塞等待在 wait 方法或超时,如果该方法传入的超时时间为 None,则使用构造方法传入的默认超时

一旦超时发生,栅栏将立即进入破损状态,此时其他仍阻塞等待该栅栏的线程将收到 wait 方法抛出的 BrokenBarrierError 异常

如果试图在已破损的栅栏对象上调用 wait 方法,也会立即抛出 BrokenBarrierError 异常

返回一个数字,值为 0 到 parties - 1,解释器保证了所有等待在同一个栅栏上的线程中,每一个的返回值都不同,以便让你可以依赖 wait 方法的返回值来做一些处理

如果创建栅栏对象时在构造函数中提供了 action 参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入破损状态

 

reset

reset()

 

重置栅栏为默认的初始态

如果栅栏中仍有线程等待释放,这些线程将会收到 BrokenBarrierError 异常

除非非常必要,否则并不建议使用该方法,很多时候与其重用一个状态未知的栅栏,不如新建一个

 

abort

abort()

 

使栅栏进入破损态

这将导致所有已经调用和未来调用的 wait() 方法中引发 BrokenBarrierError 异常

 

栅栏对象的属性

  • parties -- 冲出栅栏所需要的线程数量
  • n_waiting -- 当前时刻正在栅栏中阻塞的线程数量
  • broken -- 一个布尔值,值为 True 表明栅栏为破损态

 

示例

栅栏的使用虽然简单,但却十分实用,在实际环境中,我们通常需要并发调用很多业务方的接口,并收集他们的返回,然后在所有接口均返回后再进行下一步处理

但并不是所有接口的调用都是必须的,因此对于该场景,一个必要的优化方式是一旦收集到必要接口的返回,立即中断其他接口的调用,并开始这之后的操作

上述需求如果使用栅栏来解决会显得非常简单而优雅,虽然 Python 中我们并不能在线程外终止线程,但我们可以通过栅栏的 abort 方法让那些尚未执行结束的线程一旦执行结束即抛出异常,从而让我们不需要去关注他们

下面的例子模拟了上面描述的过程

import logging import random from threading import Thread, Barrier from time import sleep, time class InterfaceThread(Thread): def __init__(self, majorbarrier, minorbarrier, id, major): super().__init__() self._majorbarrier = majorbarrier self._minorbarrier = minorbarrier self._id = id self._major = major def run(self): nsec = random.uniform(0, 4) logging.info('%r start running sleep %s' % (self, nsec)) sleep(nsec) logging.info('%r after sleeping' % self) if self._major: try: result = self._majorbarrier.wait() if result == 0: self._minorbarrier.abort() except: logging.error('%s waitting on majorbarrier aborted' % self) return else: try: self._minorbarrier.wait() except: logging.warning('%s watting on minorbarrier aborted' % self) return logging.info('%r continue running after barrier' % self) def __repr__(self): return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') start = time() majorbarrier = Barrier(4) minorbarrier = Barrier(3) threads = list() for i in range(6): threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3))) for thread in threads: thread.start() result = majorbarrier.wait() if result == 0: minorbarrier.abort() logging.info('run by %s' % (time() - start))

 

 

上面的例子中创建了两个栅栏对象,分别用来同步必要接口调用与非必要接口调用,我们通过随机 sleep 0 到 4 秒来模拟接口调用

一旦必要栅栏的 wait 方法返回 0,则意味着必要接口已全部返回,此时可以通过调用非必要栅栏的 abort 方法来破坏非必要栅栏,同时程序继续执行,从而实现整体运行时间的最大限度缩短

 

打印出了:

2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334

2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607

2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644

2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875

2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653

2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372

2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping

2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping

2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping

2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping

2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier

2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535

2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier

2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier

2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted

2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping

2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted

2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping

2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted

 

可以看到,并发调用六个线程,按照 sleep 时间,应该在 3.5451267 秒以上

而实际上,由于重要线程均以完成,主线程只用 2.284111976623535 秒便已返回

这样,我们就实现了接口性能的大幅提升,但线程 1、2 由于 sleep 时间过长,没有能够在主线程返回前返回

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






thread      线程同步      事件      并发      event      barrier     


京ICP备15018585号