Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/format_helpers.py
12 views
1
import functools
2
import inspect
3
import reprlib
4
import sys
5
import traceback
6
7
from . import constants
8
9
10
def _get_function_source(func):
11
func = inspect.unwrap(func)
12
if inspect.isfunction(func):
13
code = func.__code__
14
return (code.co_filename, code.co_firstlineno)
15
if isinstance(func, functools.partial):
16
return _get_function_source(func.func)
17
if isinstance(func, functools.partialmethod):
18
return _get_function_source(func.func)
19
return None
20
21
22
def _format_callback_source(func, args):
23
func_repr = _format_callback(func, args, None)
24
source = _get_function_source(func)
25
if source:
26
func_repr += f' at {source[0]}:{source[1]}'
27
return func_repr
28
29
30
def _format_args_and_kwargs(args, kwargs):
31
"""Format function arguments and keyword arguments.
32
33
Special case for a single parameter: ('hello',) is formatted as ('hello').
34
"""
35
# use reprlib to limit the length of the output
36
items = []
37
if args:
38
items.extend(reprlib.repr(arg) for arg in args)
39
if kwargs:
40
items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
41
return '({})'.format(', '.join(items))
42
43
44
def _format_callback(func, args, kwargs, suffix=''):
45
if isinstance(func, functools.partial):
46
suffix = _format_args_and_kwargs(args, kwargs) + suffix
47
return _format_callback(func.func, func.args, func.keywords, suffix)
48
49
if hasattr(func, '__qualname__') and func.__qualname__:
50
func_repr = func.__qualname__
51
elif hasattr(func, '__name__') and func.__name__:
52
func_repr = func.__name__
53
else:
54
func_repr = repr(func)
55
56
func_repr += _format_args_and_kwargs(args, kwargs)
57
if suffix:
58
func_repr += suffix
59
return func_repr
60
61
62
def extract_stack(f=None, limit=None):
63
"""Replacement for traceback.extract_stack() that only does the
64
necessary work for asyncio debug mode.
65
"""
66
if f is None:
67
f = sys._getframe().f_back
68
if limit is None:
69
# Limit the amount of work to a reasonable amount, as extract_stack()
70
# can be called for each coroutine and future in debug mode.
71
limit = constants.DEBUG_STACK_DEPTH
72
stack = traceback.StackSummary.extract(traceback.walk_stack(f),
73
limit=limit,
74
lookup_lines=False)
75
stack.reverse()
76
return stack
77
78