将 python 生成器改造为上下文管理器

2019-04-28 13:42:42   最后更新: 2019-04-30 09:12:45   访问数量:112




上一篇文章中,我们介绍了 python 中的迭代器与生成器

python 中的迭代器与生成器

 

此前的文章中,我们已经看过上下文管理器的例子

python 魔术方法(四)非常用方法与运算符重载方法大合集

 

本文我们通过分析标准库中 contextlib.contextmanager 装饰器的源码,来看看如何让他们结合起来生成更加优雅的代码

 

 

class Test: def __enter__(self): print('now in __enter__') return 'Hello World' def __exit__(self, exc_type, exc_val, exc_tb): print('now exit') return True if __name__ == '__main__': test = Test() with test as teststr: print(teststr) print('end of main')

 

 

调用打印出了:

now in __enter__

Hello World

now exit

end of main

 

当 with 块被执行时,解释器会自动调用对象的 __enter__ 方法

而在 with 块结束时,解释器则会自动调用对象的 __exit__ 方法,__exit__ 方法最终可以选择返回 True 或抛出异常

 

标准库中,contextlib.contextmanager 装饰器通过 yield 关键字可以减少创建上下文管理器的样板代码量

上面的例子可以改造为:

import contextlib class Test: @contextlib.contextmanager def contextmanager(self): print('now in __enter__') yield 'Hello World' print('now exit') return True if __name__ == '__main__': test = Test() with test.contextmanager() as teststr: print(teststr) print('end of main')

 

 

同样打印出了:

now in __enter__

Hello World

now exit

end of main

 

本质上 contextlib.contextmanager 仍然是利用了 yield 生成器的特性,他将函数包装并增加了 __enter__ 与 __exit__ 两个方法

def contextmanager(func): @wraps(func) def helper(*args, **kwds): return _GeneratorContextManager(func, args, kwds) return helper class _GeneratorContextManager(): def __init__(self, func, args, kwds): self.gen = func(*args, **kwds) self.func, self.args, self.kwds = func, args, kwds # Issue 19330: ensure context manager instances have good docstrings doc = getattr(func, "__doc__", None) if doc is None: doc = type(self).__doc__ self.__doc__ = doc def __enter__(self): try: return next(self.gen) except StopIteration: raise RuntimeError("generator didn't yield") from None def __exit__(self, type, value, traceback): if type is None: try: next(self.gen) except StopIteration: return False else: raise RuntimeError("generator didn't stop") else: if value is None: # Need to force instantiation so we can reliably # tell if we get the same exception back value = type() try: self.gen.throw(type, value, traceback) except StopIteration as exc: # Suppress StopIteration *unless* it's the same exception that # was passed to throw(). This prevents a StopIteration # raised inside the "with" statement from being suppressed. return exc is not value except RuntimeError as exc: # Don't re-raise the passed in exception. (issue27122) if exc is value: return False # Likewise, avoid suppressing if a StopIteration exception # was passed to throw() and later wrapped into a RuntimeError # (see PEP 479). if type is StopIteration and exc.__cause__ is value: return False raise except: # only re-raise if it's *not* the exception that was # passed to throw(), because __exit__() must not raise # an exception unless __exit__() itself failed. But throw() # has to raise the exception to signal propagation, so this # fixes the impedance mismatch between the throw() protocol # and the __exit__() protocol. # if sys.exc_info()[1] is value: return False raise raise RuntimeError("generator didn't stop after throw()")

 

 

可以看到,__enter__ 方法实现的比较简单,仅仅是通过 next 方法获取了生成器的首个生成的数据

__exit__ 方法则相对复杂:

  1. 检查有没有把异常传给 exc_type;如果有,调用 gen.throw(exception),在生成器函数定义体中包含 yield 关键字的那一行抛出异常
  2. 通过 next 方法调用生成器,执行接下来的任务
  3. 如果生成器未终止,则抛出 RuntimeError("generator didn't stop")

 

从上述代码我们可以看到一个严重的问题:__enter__ 代码是未捕获异常的,一旦我们在 with 块中抛出异常,则会导致 __exit__ 中的清理代码无法被执行

import contextlib class Test: @contextlib.contextmanager def contextmanager(self): print('now in __enter__') yield self.raiseexc(1) print('now exit') return True def raiseexc(self, param): if param < 5: raise Exception('test exception') if __name__ == '__main__': test = Test() with test.contextmanager() as teststr: print(teststr) print('end of main')

 

 

执行,打印出了:

now in __enter__

Traceback (most recent call last):

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1741, in <module>

    main()

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1735, in main

    globals = debugger.run(setup['file'], None, None, is_module)

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1135, in run

    pydev_imports.execfile(file, globals, locals)  # execute the script

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile

    exec(compile(contents+"\n", file, 'exec'), glob, loc)

  File "D:/Workspace/code/python/testpython/fluentpython/contextmanager.py", line 19, in <module>

    with test.contextmanager() as teststr:

  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 81, in __enter__

    return next(self.gen)

  File "D:/Workspace/code/python/testpython/fluentpython/contextmanager.py", line 8, in contextmanager

    yield self.raiseexc(1)

  File "D:/Workspace/code/python/testpython/fluentpython/contextmanager.py", line 14, in raiseexc

    raise Exception('test exception')

Exception: test exception

 

所以,在使用 @contextlib.contextmanager 时千万要注意,不能在 yield 执行时抛出异常

import contextlib class Test: @contextlib.contextmanager def contextmanager(self): print('now in __enter__') try: yield self.raiseexc(1) except Exception: print('exception happened') print('now exit') return True def raiseexc(self, param): if param < 5: raise Exception('test exception') if __name__ == '__main__': test = Test() with test.contextmanager() as teststr: print(teststr) print('end of main')

 

 

打印出了:

now in __enter__

exception happened

now exit

Traceback (most recent call last):

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1741, in <module>

    main()

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1735, in main

    globals = debugger.run(setup['file'], None, None, is_module)

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\pydevd.py", line 1135, in run

    pydev_imports.execfile(file, globals, locals)  # execute the script

  File "C:\Program Files\JetBrains\PyCharm 2018.3.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile

    exec(compile(contents+"\n", file, 'exec'), glob, loc)

  File "D:/Workspace/code/python/testpython/fluentpython/contextmanager.py", line 22, in <module>

    with test.contextmanager() as teststr:

  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 83, in __enter__

    raise RuntimeError("generator didn't yield") from None

RuntimeError: generator didn't yield

 

虽然仍然抛出了异常,但我们看到 __exit__ 方法中的清理代码仍然得以被执行

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






技术帖      python      技术分享      生成器      迭代器      上下文管理器      yield     


京ICP备15018585号