Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/futures.py
12 views
1
"""A Future class similar to the one in PEP 3148."""
2
3
# Public API of this module; everything else is an implementation detail.
__all__ = (
    'Future', 'wrap_future', 'isfuture',
)
6
7
import concurrent.futures
8
import contextvars
9
import logging
10
import sys
11
from types import GenericAlias
12
13
from . import base_futures
14
from . import events
15
from . import exceptions
16
from . import format_helpers
17
18
19
# Re-export the duck-typing check so callers can use futures.isfuture().
isfuture = base_futures.isfuture


# Future state markers, shared with base_futures so that every Future
# implementation compares against the same values.
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


# Custom logging level one step below DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
28
29
30
class Future:
31
"""This class is *almost* compatible with concurrent.futures.Future.
32
33
Differences:
34
35
- This class is not thread-safe.
36
37
- result() and exception() do not take a timeout argument and
38
raise an exception when the future isn't done yet.
39
40
- Callbacks registered with add_done_callback() are always called
41
via the event loop's call_soon().
42
43
- This class is not compatible with the wait() and as_completed()
44
methods in the concurrent.futures package.
45
46
(In Python 3.4 or later we may be able to unify the implementations.)
47
"""
48
49
# Class variables serving as defaults for instance variables.
50
_state = _PENDING
51
_result = None
52
_exception = None
53
_loop = None
54
_source_traceback = None
55
_cancel_message = None
56
# A saved CancelledError for later chaining as an exception context.
57
_cancelled_exc = None
58
59
# This field is used for a dual purpose:
60
# - Its presence is a marker to declare that a class implements
61
# the Future protocol (i.e. is intended to be duck-type compatible).
62
# The value must also be not-None, to enable a subclass to declare
63
# that it is not compatible by setting this to None.
64
# - It is set by __iter__() below so that Task._step() can tell
65
# the difference between
66
# `await Future()` or`yield from Future()` (correct) vs.
67
# `yield Future()` (incorrect).
68
_asyncio_future_blocking = False
69
70
__log_traceback = False
71
72
def __init__(self, *, loop=None):
73
"""Initialize the future.
74
75
The optional event_loop argument allows explicitly setting the event
76
loop object used by the future. If it's not provided, the future uses
77
the default event loop.
78
"""
79
if loop is None:
80
self._loop = events.get_event_loop()
81
else:
82
self._loop = loop
83
self._callbacks = []
84
if self._loop.get_debug():
85
self._source_traceback = format_helpers.extract_stack(
86
sys._getframe(1))
87
88
def __repr__(self):
89
return base_futures._future_repr(self)
90
91
def __del__(self):
92
if not self.__log_traceback:
93
# set_exception() was not called, or result() or exception()
94
# has consumed the exception
95
return
96
exc = self._exception
97
context = {
98
'message':
99
f'{self.__class__.__name__} exception was never retrieved',
100
'exception': exc,
101
'future': self,
102
}
103
if self._source_traceback:
104
context['source_traceback'] = self._source_traceback
105
self._loop.call_exception_handler(context)
106
107
__class_getitem__ = classmethod(GenericAlias)
108
109
@property
110
def _log_traceback(self):
111
return self.__log_traceback
112
113
@_log_traceback.setter
114
def _log_traceback(self, val):
115
if val:
116
raise ValueError('_log_traceback can only be set to False')
117
self.__log_traceback = False
118
119
def get_loop(self):
120
"""Return the event loop the Future is bound to."""
121
loop = self._loop
122
if loop is None:
123
raise RuntimeError("Future object is not initialized.")
124
return loop
125
126
def _make_cancelled_error(self):
127
"""Create the CancelledError to raise if the Future is cancelled.
128
129
This should only be called once when handling a cancellation since
130
it erases the saved context exception value.
131
"""
132
if self._cancelled_exc is not None:
133
exc = self._cancelled_exc
134
self._cancelled_exc = None
135
return exc
136
137
if self._cancel_message is None:
138
exc = exceptions.CancelledError()
139
else:
140
exc = exceptions.CancelledError(self._cancel_message)
141
exc.__context__ = self._cancelled_exc
142
# Remove the reference since we don't need this anymore.
143
self._cancelled_exc = None
144
return exc
145
146
def cancel(self, msg=None):
147
"""Cancel the future and schedule callbacks.
148
149
If the future is already done or cancelled, return False. Otherwise,
150
change the future's state to cancelled, schedule the callbacks and
151
return True.
152
"""
153
self.__log_traceback = False
154
if self._state != _PENDING:
155
return False
156
self._state = _CANCELLED
157
self._cancel_message = msg
158
self.__schedule_callbacks()
159
return True
160
161
def __schedule_callbacks(self):
162
"""Internal: Ask the event loop to call all callbacks.
163
164
The callbacks are scheduled to be called as soon as possible. Also
165
clears the callback list.
166
"""
167
callbacks = self._callbacks[:]
168
if not callbacks:
169
return
170
171
self._callbacks[:] = []
172
for callback, ctx in callbacks:
173
self._loop.call_soon(callback, self, context=ctx)
174
175
def cancelled(self):
176
"""Return True if the future was cancelled."""
177
return self._state == _CANCELLED
178
179
# Don't implement running(); see http://bugs.python.org/issue18699
180
181
def done(self):
182
"""Return True if the future is done.
183
184
Done means either that a result / exception are available, or that the
185
future was cancelled.
186
"""
187
return self._state != _PENDING
188
189
def result(self):
190
"""Return the result this future represents.
191
192
If the future has been cancelled, raises CancelledError. If the
193
future's result isn't yet available, raises InvalidStateError. If
194
the future is done and has an exception set, this exception is raised.
195
"""
196
if self._state == _CANCELLED:
197
exc = self._make_cancelled_error()
198
raise exc
199
if self._state != _FINISHED:
200
raise exceptions.InvalidStateError('Result is not ready.')
201
self.__log_traceback = False
202
if self._exception is not None:
203
raise self._exception.with_traceback(self._exception_tb)
204
return self._result
205
206
def exception(self):
207
"""Return the exception that was set on this future.
208
209
The exception (or None if no exception was set) is returned only if
210
the future is done. If the future has been cancelled, raises
211
CancelledError. If the future isn't done yet, raises
212
InvalidStateError.
213
"""
214
if self._state == _CANCELLED:
215
exc = self._make_cancelled_error()
216
raise exc
217
if self._state != _FINISHED:
218
raise exceptions.InvalidStateError('Exception is not set.')
219
self.__log_traceback = False
220
return self._exception
221
222
def add_done_callback(self, fn, *, context=None):
223
"""Add a callback to be run when the future becomes done.
224
225
The callback is called with a single argument - the future object. If
226
the future is already done when this is called, the callback is
227
scheduled with call_soon.
228
"""
229
if self._state != _PENDING:
230
self._loop.call_soon(fn, self, context=context)
231
else:
232
if context is None:
233
context = contextvars.copy_context()
234
self._callbacks.append((fn, context))
235
236
# New method not in PEP 3148.
237
238
def remove_done_callback(self, fn):
239
"""Remove all instances of a callback from the "call when done" list.
240
241
Returns the number of callbacks removed.
242
"""
243
filtered_callbacks = [(f, ctx)
244
for (f, ctx) in self._callbacks
245
if f != fn]
246
removed_count = len(self._callbacks) - len(filtered_callbacks)
247
if removed_count:
248
self._callbacks[:] = filtered_callbacks
249
return removed_count
250
251
# So-called internal methods (note: no set_running_or_notify_cancel()).
252
253
def set_result(self, result):
254
"""Mark the future done and set its result.
255
256
If the future is already done when this method is called, raises
257
InvalidStateError.
258
"""
259
if self._state != _PENDING:
260
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
261
self._result = result
262
self._state = _FINISHED
263
self.__schedule_callbacks()
264
265
def set_exception(self, exception):
266
"""Mark the future done and set an exception.
267
268
If the future is already done when this method is called, raises
269
InvalidStateError.
270
"""
271
if self._state != _PENDING:
272
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
273
if isinstance(exception, type):
274
exception = exception()
275
if type(exception) is StopIteration:
276
raise TypeError("StopIteration interacts badly with generators "
277
"and cannot be raised into a Future")
278
self._exception = exception
279
self._exception_tb = exception.__traceback__
280
self._state = _FINISHED
281
self.__schedule_callbacks()
282
self.__log_traceback = True
283
284
def __await__(self):
285
if not self.done():
286
self._asyncio_future_blocking = True
287
yield self # This tells Task to wait for completion.
288
if not self.done():
289
raise RuntimeError("await wasn't used with future")
290
return self.result() # May raise too.
291
292
__iter__ = __await__ # make compatible with 'yield from'.
293
294
295
# Needed for testing purposes.
296
_PyFuture = Future
297
298
299
def _get_loop(fut):
    """Return the event loop that the future-like object *fut* is bound to."""
    # Tries to call Future.get_loop() if it's available.
    # Otherwise fallbacks to using the old '_loop' property.
    try:
        get_loop = fut.get_loop
    except AttributeError:
        pass
    else:
        # Called outside the try block on purpose: an AttributeError raised
        # *inside* get_loop() must propagate, not trigger the fallback.
        return get_loop()
    return fut._loop
309
310
311
def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        # Leave a cancelled future untouched rather than calling
        # set_result() on it.
        return
    fut.set_result(result)
316
317
318
def _convert_future_exc(exc):
    """Translate a concurrent.futures exception to its asyncio counterpart.

    CancelledError, TimeoutError and InvalidStateError instances from
    concurrent.futures are rebuilt as the same-named class from the asyncio
    exceptions module, keeping the original args.  Any other exception is
    returned unchanged.
    """
    # Exact type match: subclasses of these exceptions are passed through.
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    if exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    if exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    return exc
328
329
330
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  Note that the *concurrent* parameter
    shadows the ``concurrent`` package inside this function.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    # A false return means the target is cancelled, so there is nothing
    # left to copy.
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(_convert_future_exc(exception))
    else:
        result = source.result()
        concurrent.set_result(result)
343
344
345
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.

    *source* must already be done.  If *dest* is already cancelled nothing
    is copied; otherwise *dest* must still be pending.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
363
364
365
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is neither kind of future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    # Only asyncio-style futures have a loop; None marks a
    # concurrent.futures.Future.
    source_loop = _get_loop(source) if isfuture(source) else None
    dest_loop = _get_loop(destination) if isfuture(destination) else None

    def _set_state(future, other):
        # Pick the copy helper that matches the kind of the target future.
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        # Done-callback of destination: propagate cancellation to source.
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                # source belongs to a different loop; hand the call over
                # to that loop.
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of source: copy its outcome to destination.
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            # A closed loop cannot accept new callbacks.
            if dest_loop.is_closed():
                return
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
407
408
409
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object that already passes isfuture() is returned unchanged.
    Otherwise a new future is created on *loop* (or on the loop returned by
    events.get_event_loop() when *loop* is None), chained to *future*, and
    returned.
    """
    if isfuture(future):
        return future
    # NOTE: this check is an assert, so it is skipped when Python runs
    # with -O.
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    if loop is None:
        loop = events.get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future
420
421
422
# Prefer the Future implementation from the optional _asyncio module when
# it can be imported; otherwise the Future defined in this file stays in
# place.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future
429
430