Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/contextlib.py
12 views
1
"""Utilities for with-statement contexts. See PEP 343."""
2
import abc
3
import os
4
import sys
5
import _collections_abc
6
from collections import deque
7
from functools import wraps
8
from types import MethodType, GenericAlias
9
10
__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext",
11
"AbstractContextManager", "AbstractAsyncContextManager",
12
"AsyncExitStack", "ContextDecorator", "ExitStack",
13
"redirect_stdout", "redirect_stderr", "suppress", "aclosing",
14
"chdir"]
15
16
17
class AbstractContextManager(abc.ABC):
    """Abstract base class for synchronous context managers.

    Concrete subclasses must provide ``__exit__``; ``__enter__`` has a
    default implementation that returns the instance itself.
    """

    # Makes the class subscriptable, e.g. AbstractContextManager[int].
    __class_getitem__ = classmethod(GenericAlias)

    def __enter__(self):
        """Enter the runtime context and return `self`."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the runtime context; this default returns None."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # The structural (duck-typed) check is only offered by the ABC
        # itself; subclasses fall back to the normal ABC machinery.
        if cls is not AbstractContextManager:
            return NotImplemented
        return _collections_abc._check_methods(C, "__enter__", "__exit__")
37
38
39
class AbstractAsyncContextManager(abc.ABC):
    """Abstract base class for asynchronous context managers.

    Concrete subclasses must provide ``__aexit__``; ``__aenter__`` has a
    default implementation that returns the instance itself.
    """

    # Makes the class subscriptable, e.g. AbstractAsyncContextManager[int].
    __class_getitem__ = classmethod(GenericAlias)

    async def __aenter__(self):
        """Enter the runtime context and return `self`."""
        return self

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_value, traceback):
        """Leave the runtime context; this default returns None."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # The structural (duck-typed) check is only offered by the ABC
        # itself; subclasses fall back to the normal ABC machinery.
        if cls is not AbstractAsyncContextManager:
            return NotImplemented
        return _collections_abc._check_methods(C, "__aenter__", "__aexit__")
60
61
62
class ContextDecorator(object):
    """Mixin that lets a context manager instance be applied as a decorator."""

    def _recreate_cm(self):
        """Return the context manager to use for one decorated call.

        The default hands back `self`.  One-shot managers such as
        _GeneratorContextManager override this to build a fresh instance
        per call, which is what makes them usable as decorators.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        # Every call of the decorated function runs inside its own
        # ``with`` block around the (possibly recreated) manager.
        def decorated(*args, **kwds):
            manager = self._recreate_cm()
            with manager:
                return func(*args, **kwds)
        return wraps(func)(decorated)
83
84
85
class AsyncContextDecorator(object):
    """Mixin that lets an async context manager instance be applied as a decorator."""

    def _recreate_cm(self):
        """Return the context manager to use for one decorated call.

        The default hands back `self`.
        """
        return self

    def __call__(self, func):
        # The wrapper is a coroutine function: each call awaits `func`
        # inside its own ``async with`` block.
        async def decorated(*args, **kwds):
            manager = self._recreate_cm()
            async with manager:
                return await func(*args, **kwds)
        return wraps(func)(decorated)
99
100
101
class _GeneratorContextManagerBase:
102
"""Shared functionality for @contextmanager and @asynccontextmanager."""
103
104
def __init__(self, func, args, kwds):
105
self.gen = func(*args, **kwds)
106
self.func, self.args, self.kwds = func, args, kwds
107
# Issue 19330: ensure context manager instances have good docstrings
108
doc = getattr(func, "__doc__", None)
109
if doc is None:
110
doc = type(self).__doc__
111
self.__doc__ = doc
112
# Unfortunately, this still doesn't provide good help output when
113
# inspecting the created context manager instances, since pydoc
114
# currently bypasses the instance docstring and shows the docstring
115
# for the class instead.
116
# See http://bugs.python.org/issue19404 for more details.
117
118
def _recreate_cm(self):
119
# _GCMB instances are one-shot context managers, so the
120
# CM must be recreated each time a decorated function is
121
# called
122
return self.__class__(self.func, self.args, self.kwds)
123
124
125
class _GeneratorContextManager(
126
_GeneratorContextManagerBase,
127
AbstractContextManager,
128
ContextDecorator,
129
):
130
"""Helper for @contextmanager decorator."""
131
132
def __enter__(self):
133
# do not keep args and kwds alive unnecessarily
134
# they are only needed for recreation, which is not possible anymore
135
del self.args, self.kwds, self.func
136
try:
137
return next(self.gen)
138
except StopIteration:
139
raise RuntimeError("generator didn't yield") from None
140
141
def __exit__(self, typ, value, traceback):
142
if typ is None:
143
try:
144
next(self.gen)
145
except StopIteration:
146
return False
147
else:
148
raise RuntimeError("generator didn't stop")
149
else:
150
if value is None:
151
# Need to force instantiation so we can reliably
152
# tell if we get the same exception back
153
value = typ()
154
try:
155
self.gen.throw(value)
156
except StopIteration as exc:
157
# Suppress StopIteration *unless* it's the same exception that
158
# was passed to throw(). This prevents a StopIteration
159
# raised inside the "with" statement from being suppressed.
160
return exc is not value
161
except RuntimeError as exc:
162
# Don't re-raise the passed in exception. (issue27122)
163
if exc is value:
164
exc.__traceback__ = traceback
165
return False
166
# Avoid suppressing if a StopIteration exception
167
# was passed to throw() and later wrapped into a RuntimeError
168
# (see PEP 479 for sync generators; async generators also
169
# have this behavior). But do this only if the exception wrapped
170
# by the RuntimeError is actually Stop(Async)Iteration (see
171
# issue29692).
172
if (
173
isinstance(value, StopIteration)
174
and exc.__cause__ is value
175
):
176
value.__traceback__ = traceback
177
return False
178
raise
179
except BaseException as exc:
180
# only re-raise if it's *not* the exception that was
181
# passed to throw(), because __exit__() must not raise
182
# an exception unless __exit__() itself failed. But throw()
183
# has to raise the exception to signal propagation, so this
184
# fixes the impedance mismatch between the throw() protocol
185
# and the __exit__() protocol.
186
if exc is not value:
187
raise
188
exc.__traceback__ = traceback
189
return False
190
raise RuntimeError("generator didn't stop after throw()")
191
192
class _AsyncGeneratorContextManager(
193
_GeneratorContextManagerBase,
194
AbstractAsyncContextManager,
195
AsyncContextDecorator,
196
):
197
"""Helper for @asynccontextmanager decorator."""
198
199
async def __aenter__(self):
200
# do not keep args and kwds alive unnecessarily
201
# they are only needed for recreation, which is not possible anymore
202
del self.args, self.kwds, self.func
203
try:
204
return await anext(self.gen)
205
except StopAsyncIteration:
206
raise RuntimeError("generator didn't yield") from None
207
208
async def __aexit__(self, typ, value, traceback):
209
if typ is None:
210
try:
211
await anext(self.gen)
212
except StopAsyncIteration:
213
return False
214
else:
215
raise RuntimeError("generator didn't stop")
216
else:
217
if value is None:
218
# Need to force instantiation so we can reliably
219
# tell if we get the same exception back
220
value = typ()
221
try:
222
await self.gen.athrow(value)
223
except StopAsyncIteration as exc:
224
# Suppress StopIteration *unless* it's the same exception that
225
# was passed to throw(). This prevents a StopIteration
226
# raised inside the "with" statement from being suppressed.
227
return exc is not value
228
except RuntimeError as exc:
229
# Don't re-raise the passed in exception. (issue27122)
230
if exc is value:
231
exc.__traceback__ = traceback
232
return False
233
# Avoid suppressing if a Stop(Async)Iteration exception
234
# was passed to athrow() and later wrapped into a RuntimeError
235
# (see PEP 479 for sync generators; async generators also
236
# have this behavior). But do this only if the exception wrapped
237
# by the RuntimeError is actually Stop(Async)Iteration (see
238
# issue29692).
239
if (
240
isinstance(value, (StopIteration, StopAsyncIteration))
241
and exc.__cause__ is value
242
):
243
value.__traceback__ = traceback
244
return False
245
raise
246
except BaseException as exc:
247
# only re-raise if it's *not* the exception that was
248
# passed to throw(), because __exit__() must not raise
249
# an exception unless __exit__() itself failed. But throw()
250
# has to raise the exception to signal propagation, so this
251
# fixes the impedance mismatch between the throw() protocol
252
# and the __exit__() protocol.
253
if exc is not value:
254
raise
255
exc.__traceback__ = traceback
256
return False
257
raise RuntimeError("generator didn't stop after athrow()")
258
259
260
def contextmanager(func):
    """Turn a generator function into a context manager factory.

    The decorated generator must yield exactly once.  Written like this:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    a ``with`` statement such as:

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    def factory(*args, **kwds):
        # The generator itself is created inside the manager's __init__.
        return _GeneratorContextManager(func, args, kwds)
    return wraps(func)(factory)
291
292
293
def asynccontextmanager(func):
    """Turn an async generator function into an async context manager factory.

    The decorated async generator must yield exactly once.  Written like
    this:

        @asynccontextmanager
        async def some_async_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    an ``async with`` statement such as:

        async with some_async_generator(<arguments>) as <variable>:
            <body>

    behaves like:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    def factory(*args, **kwds):
        # The async generator itself is created inside the manager's __init__.
        return _AsyncGeneratorContextManager(func, args, kwds)
    return wraps(func)(factory)
324
325
326
class closing(AbstractContextManager):
    """Context manager that calls ``close()`` on an object when the block ends.

    Using it like this:

        with closing(<module>.open(<arguments>)) as f:
            <block>

    has the same effect as:

        f = <module>.open(<arguments>)
        try:
            <block>
        finally:
            f.close()

    """

    def __init__(self, thing):
        # The wrapped object; it is what the ``with`` statement binds.
        self.thing = thing

    def __enter__(self):
        wrapped = self.thing
        return wrapped

    def __exit__(self, *exc_info):
        # Always close; returning None lets any exception propagate.
        wrapped = self.thing
        wrapped.close()
349
350
351
class aclosing(AbstractAsyncContextManager):
    """Async context manager that awaits ``aclose()`` on an object when the
    block ends, e.g. to finalize an async generator.

    Using it like this:

        async with aclosing(<module>.fetch(<arguments>)) as agen:
            <block>

    has the same effect as:

        agen = <module>.fetch(<arguments>)
        try:
            <block>
        finally:
            await agen.aclose()

    """

    def __init__(self, thing):
        # The wrapped object; it is what the ``async with`` statement binds.
        self.thing = thing

    async def __aenter__(self):
        wrapped = self.thing
        return wrapped

    async def __aexit__(self, *exc_info):
        # Always close; returning None lets any exception propagate.
        wrapped = self.thing
        await wrapped.aclose()
375
376
377
class _RedirectStream(AbstractContextManager):
378
379
_stream = None
380
381
def __init__(self, new_target):
382
self._new_target = new_target
383
# We use a list of old targets to make this CM re-entrant
384
self._old_targets = []
385
386
def __enter__(self):
387
self._old_targets.append(getattr(sys, self._stream))
388
setattr(sys, self._stream, self._new_target)
389
return self._new_target
390
391
def __exit__(self, exctype, excinst, exctb):
392
setattr(sys, self._stream, self._old_targets.pop())
393
394
395
class redirect_stdout(_RedirectStream):
    """Context manager that temporarily points ``sys.stdout`` at another file.

        # Send the output of help() to stderr
        with redirect_stdout(sys.stderr):
            help(dir)

        # Write the output of help() to a file
        with open('help.txt', 'w') as f:
            with redirect_stdout(f):
                help(pow)
    """

    # Attribute of ``sys`` that the base class swaps in and out.
    _stream = "stdout"
409
410
411
class redirect_stderr(_RedirectStream):
    """Context manager that temporarily points ``sys.stderr`` at another file."""

    # Attribute of ``sys`` that the base class swaps in and out.
    _stream = "stderr"
415
416
417
class suppress(AbstractContextManager):
418
"""Context manager to suppress specified exceptions
419
420
After the exception is suppressed, execution proceeds with the next
421
statement following the with statement.
422
423
with suppress(FileNotFoundError):
424
os.remove(somefile)
425
# Execution still resumes here if the file was already removed
426
"""
427
428
def __init__(self, *exceptions):
429
self._exceptions = exceptions
430
431
def __enter__(self):
432
pass
433
434
def __exit__(self, exctype, excinst, exctb):
435
# Unlike isinstance and issubclass, CPython exception handling
436
# currently only looks at the concrete type hierarchy (ignoring
437
# the instance and subclass checking hooks). While Guido considers
438
# that a bug rather than a feature, it's a fairly hard one to fix
439
# due to various internal implementation details. suppress provides
440
# the simpler issubclass based semantics, rather than trying to
441
# exactly reproduce the limitations of the CPython interpreter.
442
#
443
# See http://bugs.python.org/issue12029 for more details
444
if exctype is None:
445
return
446
if issubclass(exctype, self._exceptions):
447
return True
448
if issubclass(exctype, ExceptionGroup):
449
match, rest = excinst.split(self._exceptions)
450
if rest is None:
451
return True
452
raise rest
453
return False
454
455
456
class _BaseExitStack:
457
"""A base class for ExitStack and AsyncExitStack."""
458
459
@staticmethod
460
def _create_exit_wrapper(cm, cm_exit):
461
return MethodType(cm_exit, cm)
462
463
@staticmethod
464
def _create_cb_wrapper(callback, /, *args, **kwds):
465
def _exit_wrapper(exc_type, exc, tb):
466
callback(*args, **kwds)
467
return _exit_wrapper
468
469
def __init__(self):
470
self._exit_callbacks = deque()
471
472
def pop_all(self):
473
"""Preserve the context stack by transferring it to a new instance."""
474
new_stack = type(self)()
475
new_stack._exit_callbacks = self._exit_callbacks
476
self._exit_callbacks = deque()
477
return new_stack
478
479
def push(self, exit):
480
"""Registers a callback with the standard __exit__ method signature.
481
482
Can suppress exceptions the same way __exit__ method can.
483
Also accepts any object with an __exit__ method (registering a call
484
to the method instead of the object itself).
485
"""
486
# We use an unbound method rather than a bound method to follow
487
# the standard lookup behaviour for special methods.
488
_cb_type = type(exit)
489
490
try:
491
exit_method = _cb_type.__exit__
492
except AttributeError:
493
# Not a context manager, so assume it's a callable.
494
self._push_exit_callback(exit)
495
else:
496
self._push_cm_exit(exit, exit_method)
497
return exit # Allow use as a decorator.
498
499
def enter_context(self, cm):
500
"""Enters the supplied context manager.
501
502
If successful, also pushes its __exit__ method as a callback and
503
returns the result of the __enter__ method.
504
"""
505
# We look up the special methods on the type to match the with
506
# statement.
507
cls = type(cm)
508
try:
509
_enter = cls.__enter__
510
_exit = cls.__exit__
511
except AttributeError:
512
raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
513
f"not support the context manager protocol") from None
514
result = _enter(cm)
515
self._push_cm_exit(cm, _exit)
516
return result
517
518
def callback(self, callback, /, *args, **kwds):
519
"""Registers an arbitrary callback and arguments.
520
521
Cannot suppress exceptions.
522
"""
523
_exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds)
524
525
# We changed the signature, so using @wraps is not appropriate, but
526
# setting __wrapped__ may still help with introspection.
527
_exit_wrapper.__wrapped__ = callback
528
self._push_exit_callback(_exit_wrapper)
529
return callback # Allow use as a decorator
530
531
def _push_cm_exit(self, cm, cm_exit):
532
"""Helper to correctly register callbacks to __exit__ methods."""
533
_exit_wrapper = self._create_exit_wrapper(cm, cm_exit)
534
self._push_exit_callback(_exit_wrapper, True)
535
536
def _push_exit_callback(self, callback, is_sync=True):
537
self._exit_callbacks.append((is_sync, callback))
538
539
540
# Inspired by discussions on http://bugs.python.org/issue13585
541
class ExitStack(_BaseExitStack, AbstractContextManager):
    """Context manager that unwinds a dynamically built stack of exit callbacks.

    For example:
        with ExitStack() as stack:
            files = [stack.enter_context(open(fname)) for fname in filenames]
            # All opened files will automatically be closed at the end of
            # the with statement, even if attempts to open files later
            # in the list raise an exception.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_details):
        """Run all registered callbacks in LIFO order.

        Returns True only if an exception was passed in and one of the
        callbacks suppressed it.  If a callback raises, the last such
        exception propagates with its __context__ chain adjusted to look
        as if the callbacks had been nested ``with`` statements.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        frame_exc = sys.exc_info()[1]

        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so walk to the end of the chain
            tail = new_exc
            while True:
                ctx = tail.__context__
                if ctx is None or ctx is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if ctx is frame_exc:
                    # Change the end of the chain to point to the exception
                    # we expect it to reference
                    tail.__context__ = old_exc
                    return
                tail = ctx

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            assert is_sync
            try:
                if cb(*exc_details):
                    # The callback swallowed the exception: the remaining
                    # callbacks see a clean exit.
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except BaseException:
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details

        if not pending_raise:
            return received_exc and suppressed_exc

        # A plain "raise exc" would replace our carefully set-up context,
        # so save it beforehand and restore it while the exception
        # propagates.
        final_exc = exc_details[1]
        fixed_ctx = final_exc.__context__
        try:
            raise final_exc
        except BaseException:
            final_exc.__context__ = fixed_ctx
            raise

    def close(self):
        """Immediately unwind the context stack."""
        self.__exit__(None, None, None)
607
608
609
# Inspired by discussions on https://bugs.python.org/issue29302
610
class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager):
    """Async context manager that unwinds a dynamically built stack of exit
    callbacks.

    For example:
        async with AsyncExitStack() as stack:
            connections = [await stack.enter_async_context(get_connection())
                for i in range(5)]
            # All opened connections will automatically be released at the
            # end of the async with statement, even if attempts to open a
            # connection later in the list raise an exception.
    """

    @staticmethod
    def _create_async_exit_wrapper(cm, cm_exit):
        # Bind the unbound __aexit__ function to the context manager instance.
        return MethodType(cm_exit, cm)

    @staticmethod
    def _create_async_cb_wrapper(callback, /, *args, **kwds):
        # Adapt a coroutine function to the (exc_type, exc, tb) exit
        # signature.  The wrapper returns None, so it never suppresses.
        async def _exit_wrapper(exc_type, exc, tb):
            await callback(*args, **kwds)
        return _exit_wrapper

    async def enter_async_context(self, cm):
        """Enter the given async context manager and register its __aexit__.

        Returns whatever the manager's __aenter__ method returned.  Raises
        TypeError if the manager's type lacks __aenter__ or __aexit__.
        """
        cls = type(cm)
        try:
            enter_method = cls.__aenter__
            exit_method = cls.__aexit__
        except AttributeError:
            raise TypeError(
                f"'{cls.__module__}.{cls.__qualname__}' object does "
                f"not support the asynchronous context manager protocol"
            ) from None
        # Only register the exit once entering has succeeded.
        entered = await enter_method(cm)
        self._push_async_cm_exit(cm, exit_method)
        return entered

    def push_async_exit(self, exit):
        """Register a coroutine function that uses the standard __aexit__
        signature.

        Such a callback may suppress exceptions exactly like an __aexit__
        method.  An object whose type defines __aexit__ is also accepted, in
        which case a call to that method is registered rather than the
        object itself.  The argument is returned unchanged.
        """
        try:
            exit_method = type(exit).__aexit__
        except AttributeError:
            # Not an async context manager, so assume it's a coroutine function
            self._push_exit_callback(exit, False)
            return exit  # Allow use as a decorator
        self._push_async_cm_exit(exit, exit_method)
        return exit  # Allow use as a decorator

    def push_async_callback(self, callback, /, *args, **kwds):
        """Register an arbitrary coroutine function together with its arguments.

        The callback cannot suppress exceptions.  It is returned unchanged.
        """
        wrapper = self._create_async_cb_wrapper(callback, *args, **kwds)

        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection.
        wrapper.__wrapped__ = callback
        self._push_exit_callback(wrapper, False)
        return callback  # Allow use as a decorator

    async def aclose(self):
        """Immediately unwind the context stack."""
        await self.__aexit__(None, None, None)

    def _push_async_cm_exit(self, cm, cm_exit):
        """Helper to correctly register coroutine function to __aexit__
        method."""
        bound_exit = self._create_async_exit_wrapper(cm, cm_exit)
        self._push_exit_callback(bound_exit, False)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_details):
        """Run all registered callbacks in LIFO order, awaiting async ones.

        Returns True only if an exception was passed in and one of the
        callbacks suppressed it.  If a callback raises, the last such
        exception propagates with its __context__ chain adjusted to look
        as if the callbacks had been nested ``with`` statements.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        frame_exc = sys.exc_info()[1]

        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so walk to the end of the chain
            tail = new_exc
            while True:
                ctx = tail.__context__
                if ctx is None or ctx is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if ctx is frame_exc:
                    # Change the end of the chain to point to the exception
                    # we expect it to reference
                    tail.__context__ = old_exc
                    return
                tail = ctx

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            try:
                # Sync callbacks return their result directly; async ones
                # return an awaitable that produces it.
                cb_suppress = cb(*exc_details)
                if not is_sync:
                    cb_suppress = await cb_suppress

                if cb_suppress:
                    # The callback swallowed the exception: the remaining
                    # callbacks see a clean exit.
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except BaseException:
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details

        if not pending_raise:
            return received_exc and suppressed_exc

        # A plain "raise exc" would replace our carefully set-up context,
        # so save it beforehand and restore it while the exception
        # propagates.
        final_exc = exc_details[1]
        fixed_ctx = final_exc.__context__
        try:
            raise final_exc
        except BaseException:
            final_exc.__context__ = fixed_ctx
            raise
747
748
749
class nullcontext(AbstractContextManager, AbstractAsyncContextManager):
    """Context manager that performs no setup or cleanup.

    Handy as a placeholder where a real context manager is only
    sometimes needed:

        cm = optional_cm if condition else nullcontext()
        with cm:
            # Perform operation, using optional_cm if condition is True
    """

    def __init__(self, enter_result=None):
        # Value returned when the context is entered (sync or async).
        self.enter_result = enter_result

    def __enter__(self):
        result = self.enter_result
        return result

    def __exit__(self, *excinfo):
        # Nothing to clean up; exceptions are never suppressed.
        return None

    async def __aenter__(self):
        result = self.enter_result
        return result

    async def __aexit__(self, *excinfo):
        # Nothing to clean up; exceptions are never suppressed.
        return None
774
775
776
class chdir(AbstractContextManager):
    """Non thread-safe context manager to change the current working directory."""

    def __init__(self, path):
        self.path = path
        # Stack of directories to return to, one entry per active entry
        # into the context (which lets one instance be nested).
        self._old_cwd = []

    def __enter__(self):
        # Record where we are before moving; nothing is returned.
        current = os.getcwd()
        self._old_cwd.append(current)
        os.chdir(self.path)

    def __exit__(self, *excinfo):
        # Go back to the directory saved by the matching __enter__.
        previous = self._old_cwd.pop()
        os.chdir(previous)
789
790