Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/_py_abc.py
12 views
1
from _weakrefset import WeakSet
2
3
4
def get_cache_token():
    """Return the token identifying the current ABC cache version.

    The result is an opaque value (it supports equality comparison) that
    identifies the present state of the ABC cache for virtual subclasses.
    A different token is produced after every call to ``register()`` on
    any ABC.
    """
    token = ABCMeta._abc_invalidation_counter
    return token
12
13
14
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private. Use `abc.get_cache_token()` for
    #       external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, /, **kwargs):
        """Create a new ABC and initialise its registry and caches.

        The set of abstract method names is the union of:
        * names in *namespace* whose value has a true
          ``__isabstractmethod__`` attribute, and
        * names listed in any base's ``__abstractmethods__`` that still
          resolve, on the new class, to an object with a true
          ``__isabstractmethod__`` attribute (i.e. were not overridden
          by a concrete implementation).
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Compute set of abstract method names
        abstracts = {attr
                     for attr, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        for base in bases:
            for inherited in getattr(base, "__abstractmethods__", set()):
                # Look the name up on the finished class so overrides
                # anywhere in the MRO are taken into account.
                value = getattr(cls, inherited, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(inherited)
        cls.__abstractmethods__ = frozenset(abstracts)
        # Set up inheritance registry
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if *subclass* is not a class, and RuntimeError
        if registering it would make *cls* a subclass of its own
        virtual subclass.
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry.

        Prints the class name, the invalidation counter, and every
        ``_abc_``-prefixed attribute found in the class ``__dict__`` to
        *file* (passed straight through to ``print``).
        """
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                if isinstance(value, WeakSet):
                    # Show the live members rather than the WeakSet repr.
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache may only be trusted while its version
            # matches the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # __class__ and type() disagree: accept if either one qualifies.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if *subclass* is not a class, and AssertionError
        if ``cls.__subclasshook__`` returns anything other than True,
        False or NotImplemented.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            # Raise explicitly instead of using an ``assert`` statement so
            # the validation is not stripped when running under ``-O``;
            # otherwise a non-bool result would be cached and returned.
            if not isinstance(ok, bool):
                raise AssertionError(
                    "__subclasshook__ must return either False, True, "
                    "or NotImplemented")
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False
148
149