Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/base_futures.py
12 views
1
__all__ = ()
2
3
import reprlib
4
5
from . import format_helpers
6
7
# States for Future.
8
_PENDING = 'PENDING'
9
_CANCELLED = 'CANCELLED'
10
_FINISHED = 'FINISHED'
11
12
13
def isfuture(obj):
14
"""Check for a Future.
15
16
This returns True when obj is a Future instance or is advertising
17
itself as duck-type compatible by setting _asyncio_future_blocking.
18
See comment in Future for more details.
19
"""
20
return (hasattr(obj.__class__, '_asyncio_future_blocking') and
21
obj._asyncio_future_blocking is not None)
22
23
24
def _format_callbacks(cb):
25
"""helper function for Future.__repr__"""
26
size = len(cb)
27
if not size:
28
cb = ''
29
30
def format_cb(callback):
31
return format_helpers._format_callback_source(callback, ())
32
33
if size == 1:
34
cb = format_cb(cb[0][0])
35
elif size == 2:
36
cb = '{}, {}'.format(format_cb(cb[0][0]), format_cb(cb[1][0]))
37
elif size > 2:
38
cb = '{}, <{} more>, {}'.format(format_cb(cb[0][0]),
39
size - 2,
40
format_cb(cb[-1][0]))
41
return f'cb=[{cb}]'
42
43
44
def _future_repr_info(future):
45
# (Future) -> str
46
"""helper function for Future.__repr__"""
47
info = [future._state.lower()]
48
if future._state == _FINISHED:
49
if future._exception is not None:
50
info.append(f'exception={future._exception!r}')
51
else:
52
# use reprlib to limit the length of the output, especially
53
# for very long strings
54
result = reprlib.repr(future._result)
55
info.append(f'result={result}')
56
if future._callbacks:
57
info.append(_format_callbacks(future._callbacks))
58
if future._source_traceback:
59
frame = future._source_traceback[-1]
60
info.append(f'created at {frame[0]}:{frame[1]}')
61
return info
62
63
64
@reprlib.recursive_repr()
65
def _future_repr(future):
66
info = ' '.join(_future_repr_info(future))
67
return f'<{future.__class__.__name__} {info}>'
68
69