Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/tasks.py
12 views
1
"""Support for tasks, coroutines and the scheduler."""
2
3
__all__ = (
4
'Task', 'create_task',
5
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6
'wait', 'wait_for', 'as_completed', 'sleep',
7
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8
'current_task', 'all_tasks',
9
'create_eager_task_factory', 'eager_task_factory',
10
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
11
)
12
13
import concurrent.futures
14
import contextvars
15
import functools
16
import inspect
17
import itertools
18
import math
19
import types
20
import warnings
21
import weakref
22
from types import GenericAlias
23
24
from . import base_tasks
25
from . import coroutines
26
from . import events
27
from . import exceptions
28
from . import futures
29
from . import timeouts
30
31
# Helper to generate new task names
32
# This uses itertools.count() instead of a "+= 1" operation because the latter
33
# is not thread safe. See bpo-11866 for a longer explanation.
34
_task_name_counter = itertools.count(1).__next__
35
36
37
def current_task(loop=None):
38
"""Return a currently executed task."""
39
if loop is None:
40
loop = events.get_running_loop()
41
return _current_tasks.get(loop)
42
43
44
def all_tasks(loop=None):
45
"""Return a set of all tasks for the loop."""
46
if loop is None:
47
loop = events.get_running_loop()
48
# capturing the set of eager tasks first, so if an eager task "graduates"
49
# to a regular task in another thread, we don't risk missing it.
50
eager_tasks = list(_eager_tasks)
51
# Looping over the WeakSet isn't safe as it can be updated from another
52
# thread, therefore we cast it to list prior to filtering. The list cast
53
# itself requires iteration, so we repeat it several times ignoring
54
# RuntimeErrors (which are not very likely to occur).
55
# See issues 34970 and 36607 for details.
56
scheduled_tasks = None
57
i = 0
58
while True:
59
try:
60
scheduled_tasks = list(_scheduled_tasks)
61
except RuntimeError:
62
i += 1
63
if i >= 1000:
64
raise
65
else:
66
break
67
return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
68
if futures._get_loop(t) is loop and not t.done()}
69
70
71
class Task(futures._PyFuture): # Inherit Python Task implementation
72
# from a Python Future implementation.
73
74
"""A coroutine wrapped in a Future."""
75
76
# An important invariant maintained while a Task not done:
77
#
78
# - Either _fut_waiter is None, and _step() is scheduled;
79
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
80
#
81
# The only transition from the latter to the former is through
82
# _wakeup(). When _fut_waiter is not None, one of its callbacks
83
# must be _wakeup().
84
85
# If False, don't log a message if the task is destroyed whereas its
86
# status is still pending
87
_log_destroy_pending = True
88
89
def __init__(self, coro, *, loop=None, name=None, context=None,
90
eager_start=False):
91
super().__init__(loop=loop)
92
if self._source_traceback:
93
del self._source_traceback[-1]
94
if not coroutines.iscoroutine(coro):
95
# raise after Future.__init__(), attrs are required for __del__
96
# prevent logging for pending task in __del__
97
self._log_destroy_pending = False
98
raise TypeError(f"a coroutine was expected, got {coro!r}")
99
100
if name is None:
101
self._name = f'Task-{_task_name_counter()}'
102
else:
103
self._name = str(name)
104
105
self._num_cancels_requested = 0
106
self._must_cancel = False
107
self._fut_waiter = None
108
self._coro = coro
109
if context is None:
110
self._context = contextvars.copy_context()
111
else:
112
self._context = context
113
114
if eager_start and self._loop.is_running():
115
self.__eager_start()
116
else:
117
self._loop.call_soon(self.__step, context=self._context)
118
_register_task(self)
119
120
def __del__(self):
121
if self._state == futures._PENDING and self._log_destroy_pending:
122
context = {
123
'task': self,
124
'message': 'Task was destroyed but it is pending!',
125
}
126
if self._source_traceback:
127
context['source_traceback'] = self._source_traceback
128
self._loop.call_exception_handler(context)
129
super().__del__()
130
131
__class_getitem__ = classmethod(GenericAlias)
132
133
def __repr__(self):
134
return base_tasks._task_repr(self)
135
136
def get_coro(self):
137
return self._coro
138
139
def get_context(self):
140
return self._context
141
142
def get_name(self):
143
return self._name
144
145
def set_name(self, value):
146
self._name = str(value)
147
148
def set_result(self, result):
149
raise RuntimeError('Task does not support set_result operation')
150
151
def set_exception(self, exception):
152
raise RuntimeError('Task does not support set_exception operation')
153
154
def get_stack(self, *, limit=None):
155
"""Return the list of stack frames for this task's coroutine.
156
157
If the coroutine is not done, this returns the stack where it is
158
suspended. If the coroutine has completed successfully or was
159
cancelled, this returns an empty list. If the coroutine was
160
terminated by an exception, this returns the list of traceback
161
frames.
162
163
The frames are always ordered from oldest to newest.
164
165
The optional limit gives the maximum number of frames to
166
return; by default all available frames are returned. Its
167
meaning differs depending on whether a stack or a traceback is
168
returned: the newest frames of a stack are returned, but the
169
oldest frames of a traceback are returned. (This matches the
170
behavior of the traceback module.)
171
172
For reasons beyond our control, only one stack frame is
173
returned for a suspended coroutine.
174
"""
175
return base_tasks._task_get_stack(self, limit)
176
177
def print_stack(self, *, limit=None, file=None):
178
"""Print the stack or traceback for this task's coroutine.
179
180
This produces output similar to that of the traceback module,
181
for the frames retrieved by get_stack(). The limit argument
182
is passed to get_stack(). The file argument is an I/O stream
183
to which the output is written; by default output is written
184
to sys.stderr.
185
"""
186
return base_tasks._task_print_stack(self, limit, file)
187
188
def cancel(self, msg=None):
189
"""Request that this task cancel itself.
190
191
This arranges for a CancelledError to be thrown into the
192
wrapped coroutine on the next cycle through the event loop.
193
The coroutine then has a chance to clean up or even deny
194
the request using try/except/finally.
195
196
Unlike Future.cancel, this does not guarantee that the
197
task will be cancelled: the exception might be caught and
198
acted upon, delaying cancellation of the task or preventing
199
cancellation completely. The task may also return a value or
200
raise a different exception.
201
202
Immediately after this method is called, Task.cancelled() will
203
not return True (unless the task was already cancelled). A
204
task will be marked as cancelled when the wrapped coroutine
205
terminates with a CancelledError exception (even if cancel()
206
was not called).
207
208
This also increases the task's count of cancellation requests.
209
"""
210
self._log_traceback = False
211
if self.done():
212
return False
213
self._num_cancels_requested += 1
214
# These two lines are controversial. See discussion starting at
215
# https://github.com/python/cpython/pull/31394#issuecomment-1053545331
216
# Also remember that this is duplicated in _asynciomodule.c.
217
# if self._num_cancels_requested > 1:
218
# return False
219
if self._fut_waiter is not None:
220
if self._fut_waiter.cancel(msg=msg):
221
# Leave self._fut_waiter; it may be a Task that
222
# catches and ignores the cancellation so we may have
223
# to cancel it again later.
224
return True
225
# It must be the case that self.__step is already scheduled.
226
self._must_cancel = True
227
self._cancel_message = msg
228
return True
229
230
def cancelling(self):
231
"""Return the count of the task's cancellation requests.
232
233
This count is incremented when .cancel() is called
234
and may be decremented using .uncancel().
235
"""
236
return self._num_cancels_requested
237
238
def uncancel(self):
239
"""Decrement the task's count of cancellation requests.
240
241
This should be called by the party that called `cancel()` on the task
242
beforehand.
243
244
Returns the remaining number of cancellation requests.
245
"""
246
if self._num_cancels_requested > 0:
247
self._num_cancels_requested -= 1
248
return self._num_cancels_requested
249
250
def __eager_start(self):
251
prev_task = _swap_current_task(self._loop, self)
252
try:
253
_register_eager_task(self)
254
try:
255
self._context.run(self.__step_run_and_handle_result, None)
256
finally:
257
_unregister_eager_task(self)
258
finally:
259
try:
260
curtask = _swap_current_task(self._loop, prev_task)
261
assert curtask is self
262
finally:
263
if self.done():
264
self._coro = None
265
self = None # Needed to break cycles when an exception occurs.
266
else:
267
_register_task(self)
268
269
def __step(self, exc=None):
270
if self.done():
271
raise exceptions.InvalidStateError(
272
f'_step(): already done: {self!r}, {exc!r}')
273
if self._must_cancel:
274
if not isinstance(exc, exceptions.CancelledError):
275
exc = self._make_cancelled_error()
276
self._must_cancel = False
277
self._fut_waiter = None
278
279
_enter_task(self._loop, self)
280
try:
281
self.__step_run_and_handle_result(exc)
282
finally:
283
_leave_task(self._loop, self)
284
self = None # Needed to break cycles when an exception occurs.
285
286
def __step_run_and_handle_result(self, exc):
287
coro = self._coro
288
try:
289
if exc is None:
290
# We use the `send` method directly, because coroutines
291
# don't have `__iter__` and `__next__` methods.
292
result = coro.send(None)
293
else:
294
result = coro.throw(exc)
295
except StopIteration as exc:
296
if self._must_cancel:
297
# Task is cancelled right before coro stops.
298
self._must_cancel = False
299
super().cancel(msg=self._cancel_message)
300
else:
301
super().set_result(exc.value)
302
except exceptions.CancelledError as exc:
303
# Save the original exception so we can chain it later.
304
self._cancelled_exc = exc
305
super().cancel() # I.e., Future.cancel(self).
306
except (KeyboardInterrupt, SystemExit) as exc:
307
super().set_exception(exc)
308
raise
309
except BaseException as exc:
310
super().set_exception(exc)
311
else:
312
blocking = getattr(result, '_asyncio_future_blocking', None)
313
if blocking is not None:
314
# Yielded Future must come from Future.__iter__().
315
if futures._get_loop(result) is not self._loop:
316
new_exc = RuntimeError(
317
f'Task {self!r} got Future '
318
f'{result!r} attached to a different loop')
319
self._loop.call_soon(
320
self.__step, new_exc, context=self._context)
321
elif blocking:
322
if result is self:
323
new_exc = RuntimeError(
324
f'Task cannot await on itself: {self!r}')
325
self._loop.call_soon(
326
self.__step, new_exc, context=self._context)
327
else:
328
result._asyncio_future_blocking = False
329
result.add_done_callback(
330
self.__wakeup, context=self._context)
331
self._fut_waiter = result
332
if self._must_cancel:
333
if self._fut_waiter.cancel(
334
msg=self._cancel_message):
335
self._must_cancel = False
336
else:
337
new_exc = RuntimeError(
338
f'yield was used instead of yield from '
339
f'in task {self!r} with {result!r}')
340
self._loop.call_soon(
341
self.__step, new_exc, context=self._context)
342
343
elif result is None:
344
# Bare yield relinquishes control for one event loop iteration.
345
self._loop.call_soon(self.__step, context=self._context)
346
elif inspect.isgenerator(result):
347
# Yielding a generator is just wrong.
348
new_exc = RuntimeError(
349
f'yield was used instead of yield from for '
350
f'generator in task {self!r} with {result!r}')
351
self._loop.call_soon(
352
self.__step, new_exc, context=self._context)
353
else:
354
# Yielding something else is an error.
355
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
356
self._loop.call_soon(
357
self.__step, new_exc, context=self._context)
358
finally:
359
self = None # Needed to break cycles when an exception occurs.
360
361
def __wakeup(self, future):
362
try:
363
future.result()
364
except BaseException as exc:
365
# This may also be a cancellation.
366
self.__step(exc)
367
else:
368
# Don't pass the value of `future.result()` explicitly,
369
# as `Future.__iter__` and `Future.__await__` don't need it.
370
# If we call `_step(value, None)` instead of `_step()`,
371
# Python eval loop would use `.send(value)` method call,
372
# instead of `__next__()`, which is slower for futures
373
# that return non-generator iterators from their `__iter__`.
374
self.__step()
375
self = None # Needed to break cycles when an exception occurs.
376
377
378
_PyTask = Task
379
380
381
try:
382
import _asyncio
383
except ImportError:
384
pass
385
else:
386
# _CTask is needed for tests.
387
Task = _CTask = _asyncio.Task
388
389
390
def create_task(coro, *, name=None, context=None):
391
"""Schedule the execution of a coroutine object in a spawn task.
392
393
Return a Task object.
394
"""
395
loop = events.get_running_loop()
396
if context is None:
397
# Use legacy API if context is not needed
398
task = loop.create_task(coro)
399
else:
400
task = loop.create_task(coro, context=context)
401
402
task.set_name(name)
403
return task
404
405
406
# wait() and as_completed() similar to those in PEP 3148.
407
408
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
409
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
410
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
411
412
413
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
414
"""Wait for the Futures or Tasks given by fs to complete.
415
416
The fs iterable must not be empty.
417
418
Coroutines will be wrapped in Tasks.
419
420
Returns two sets of Future: (done, pending).
421
422
Usage:
423
424
done, pending = await asyncio.wait(fs)
425
426
Note: This does not raise TimeoutError! Futures that aren't done
427
when the timeout occurs are returned in the second set.
428
"""
429
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
430
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
431
if not fs:
432
raise ValueError('Set of Tasks/Futures is empty.')
433
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
434
raise ValueError(f'Invalid return_when value: {return_when}')
435
436
fs = set(fs)
437
438
if any(coroutines.iscoroutine(f) for f in fs):
439
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
440
441
loop = events.get_running_loop()
442
return await _wait(fs, timeout, return_when, loop)
443
444
445
def _release_waiter(waiter, *args):
446
if not waiter.done():
447
waiter.set_result(None)
448
449
450
async def wait_for(fut, timeout):
451
"""Wait for the single Future or coroutine to complete, with timeout.
452
453
Coroutine will be wrapped in Task.
454
455
Returns result of the Future or coroutine. When a timeout occurs,
456
it cancels the task and raises TimeoutError. To avoid the task
457
cancellation, wrap it in shield().
458
459
If the wait is cancelled, the task is also cancelled.
460
461
If the task supresses the cancellation and returns a value instead,
462
that value is returned.
463
464
This function is a coroutine.
465
"""
466
# The special case for timeout <= 0 is for the following case:
467
#
468
# async def test_waitfor():
469
# func_started = False
470
#
471
# async def func():
472
# nonlocal func_started
473
# func_started = True
474
#
475
# try:
476
# await asyncio.wait_for(func(), 0)
477
# except asyncio.TimeoutError:
478
# assert not func_started
479
# else:
480
# assert False
481
#
482
# asyncio.run(test_waitfor())
483
484
485
if timeout is not None and timeout <= 0:
486
fut = ensure_future(fut)
487
488
if fut.done():
489
return fut.result()
490
491
await _cancel_and_wait(fut)
492
try:
493
return fut.result()
494
except exceptions.CancelledError as exc:
495
raise TimeoutError from exc
496
497
async with timeouts.timeout(timeout):
498
return await fut
499
500
async def _wait(fs, timeout, return_when, loop):
501
"""Internal helper for wait().
502
503
The fs argument must be a collection of Futures.
504
"""
505
assert fs, 'Set of Futures is empty.'
506
waiter = loop.create_future()
507
timeout_handle = None
508
if timeout is not None:
509
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
510
counter = len(fs)
511
512
def _on_completion(f):
513
nonlocal counter
514
counter -= 1
515
if (counter <= 0 or
516
return_when == FIRST_COMPLETED or
517
return_when == FIRST_EXCEPTION and (not f.cancelled() and
518
f.exception() is not None)):
519
if timeout_handle is not None:
520
timeout_handle.cancel()
521
if not waiter.done():
522
waiter.set_result(None)
523
524
for f in fs:
525
f.add_done_callback(_on_completion)
526
527
try:
528
await waiter
529
finally:
530
if timeout_handle is not None:
531
timeout_handle.cancel()
532
for f in fs:
533
f.remove_done_callback(_on_completion)
534
535
done, pending = set(), set()
536
for f in fs:
537
if f.done():
538
done.add(f)
539
else:
540
pending.add(f)
541
return done, pending
542
543
544
async def _cancel_and_wait(fut):
545
"""Cancel the *fut* future or task and wait until it completes."""
546
547
loop = events.get_running_loop()
548
waiter = loop.create_future()
549
cb = functools.partial(_release_waiter, waiter)
550
fut.add_done_callback(cb)
551
552
try:
553
fut.cancel()
554
# We cannot wait on *fut* directly to make
555
# sure _cancel_and_wait itself is reliably cancellable.
556
await waiter
557
finally:
558
fut.remove_done_callback(cb)
559
560
561
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
562
def as_completed(fs, *, timeout=None):
563
"""Return an iterator whose values are coroutines.
564
565
When waiting for the yielded coroutines you'll get the results (or
566
exceptions!) of the original Futures (or coroutines), in the order
567
in which and as soon as they complete.
568
569
This differs from PEP 3148; the proper way to use this is:
570
571
for f in as_completed(fs):
572
result = await f # The 'await' may raise.
573
# Use result.
574
575
If a timeout is specified, the 'await' will raise
576
TimeoutError when the timeout occurs before all Futures are done.
577
578
Note: The futures 'f' are not necessarily members of fs.
579
"""
580
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
581
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
582
583
from .queues import Queue # Import here to avoid circular import problem.
584
done = Queue()
585
586
loop = events.get_event_loop()
587
todo = {ensure_future(f, loop=loop) for f in set(fs)}
588
timeout_handle = None
589
590
def _on_timeout():
591
for f in todo:
592
f.remove_done_callback(_on_completion)
593
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
594
todo.clear() # Can't do todo.remove(f) in the loop.
595
596
def _on_completion(f):
597
if not todo:
598
return # _on_timeout() was here first.
599
todo.remove(f)
600
done.put_nowait(f)
601
if not todo and timeout_handle is not None:
602
timeout_handle.cancel()
603
604
async def _wait_for_one():
605
f = await done.get()
606
if f is None:
607
# Dummy value from _on_timeout().
608
raise exceptions.TimeoutError
609
return f.result() # May raise f.exception().
610
611
for f in todo:
612
f.add_done_callback(_on_completion)
613
if todo and timeout is not None:
614
timeout_handle = loop.call_later(timeout, _on_timeout)
615
for _ in range(len(todo)):
616
yield _wait_for_one()
617
618
619
@types.coroutine
620
def __sleep0():
621
"""Skip one event loop run cycle.
622
623
This is a private helper for 'asyncio.sleep()', used
624
when the 'delay' is set to 0. It uses a bare 'yield'
625
expression (which Task.__step knows how to handle)
626
instead of creating a Future object.
627
"""
628
yield
629
630
631
async def sleep(delay, result=None):
632
"""Coroutine that completes after a given time (in seconds)."""
633
if delay <= 0:
634
await __sleep0()
635
return result
636
637
if math.isnan(delay):
638
raise ValueError("Invalid delay: NaN (not a number)")
639
640
loop = events.get_running_loop()
641
future = loop.create_future()
642
h = loop.call_later(delay,
643
futures._set_result_unless_cancelled,
644
future, result)
645
try:
646
return await future
647
finally:
648
h.cancel()
649
650
651
def ensure_future(coro_or_future, *, loop=None):
652
"""Wrap a coroutine or an awaitable in a future.
653
654
If the argument is a Future, it is returned directly.
655
"""
656
if futures.isfuture(coro_or_future):
657
if loop is not None and loop is not futures._get_loop(coro_or_future):
658
raise ValueError('The future belongs to a different loop than '
659
'the one specified as the loop argument')
660
return coro_or_future
661
should_close = True
662
if not coroutines.iscoroutine(coro_or_future):
663
if inspect.isawaitable(coro_or_future):
664
async def _wrap_awaitable(awaitable):
665
return await awaitable
666
667
coro_or_future = _wrap_awaitable(coro_or_future)
668
should_close = False
669
else:
670
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
671
'is required')
672
673
if loop is None:
674
loop = events.get_event_loop()
675
try:
676
return loop.create_task(coro_or_future)
677
except RuntimeError:
678
if should_close:
679
coro_or_future.close()
680
raise
681
682
683
class _GatheringFuture(futures.Future):
684
"""Helper for gather().
685
686
This overrides cancel() to cancel all the children and act more
687
like Task.cancel(), which doesn't immediately mark itself as
688
cancelled.
689
"""
690
691
def __init__(self, children, *, loop):
692
assert loop is not None
693
super().__init__(loop=loop)
694
self._children = children
695
self._cancel_requested = False
696
697
def cancel(self, msg=None):
698
if self.done():
699
return False
700
ret = False
701
for child in self._children:
702
if child.cancel(msg=msg):
703
ret = True
704
if ret:
705
# If any child tasks were actually cancelled, we should
706
# propagate the cancellation request regardless of
707
# *return_exceptions* argument. See issue 32684.
708
self._cancel_requested = True
709
return ret
710
711
712
def gather(*coros_or_futures, return_exceptions=False):
713
"""Return a future aggregating results from the given coroutines/futures.
714
715
Coroutines will be wrapped in a future and scheduled in the event
716
loop. They will not necessarily be scheduled in the same order as
717
passed in.
718
719
All futures must share the same event loop. If all the tasks are
720
done successfully, the returned future's result is the list of
721
results (in the order of the original sequence, not necessarily
722
the order of results arrival). If *return_exceptions* is True,
723
exceptions in the tasks are treated the same as successful
724
results, and gathered in the result list; otherwise, the first
725
raised exception will be immediately propagated to the returned
726
future.
727
728
Cancellation: if the outer Future is cancelled, all children (that
729
have not completed yet) are also cancelled. If any child is
730
cancelled, this is treated as if it raised CancelledError --
731
the outer Future is *not* cancelled in this case. (This is to
732
prevent the cancellation of one child to cause other children to
733
be cancelled.)
734
735
If *return_exceptions* is False, cancelling gather() after it
736
has been marked done won't cancel any submitted awaitables.
737
For instance, gather can be marked done after propagating an
738
exception to the caller, therefore, calling ``gather.cancel()``
739
after catching an exception (raised by one of the awaitables) from
740
gather won't cancel any other awaitables.
741
"""
742
if not coros_or_futures:
743
loop = events.get_event_loop()
744
outer = loop.create_future()
745
outer.set_result([])
746
return outer
747
748
def _done_callback(fut):
749
nonlocal nfinished
750
nfinished += 1
751
752
if outer is None or outer.done():
753
if not fut.cancelled():
754
# Mark exception retrieved.
755
fut.exception()
756
return
757
758
if not return_exceptions:
759
if fut.cancelled():
760
# Check if 'fut' is cancelled first, as
761
# 'fut.exception()' will *raise* a CancelledError
762
# instead of returning it.
763
exc = fut._make_cancelled_error()
764
outer.set_exception(exc)
765
return
766
else:
767
exc = fut.exception()
768
if exc is not None:
769
outer.set_exception(exc)
770
return
771
772
if nfinished == nfuts:
773
# All futures are done; create a list of results
774
# and set it to the 'outer' future.
775
results = []
776
777
for fut in children:
778
if fut.cancelled():
779
# Check if 'fut' is cancelled first, as 'fut.exception()'
780
# will *raise* a CancelledError instead of returning it.
781
# Also, since we're adding the exception return value
782
# to 'results' instead of raising it, don't bother
783
# setting __context__. This also lets us preserve
784
# calling '_make_cancelled_error()' at most once.
785
res = exceptions.CancelledError(
786
'' if fut._cancel_message is None else
787
fut._cancel_message)
788
else:
789
res = fut.exception()
790
if res is None:
791
res = fut.result()
792
results.append(res)
793
794
if outer._cancel_requested:
795
# If gather is being cancelled we must propagate the
796
# cancellation regardless of *return_exceptions* argument.
797
# See issue 32684.
798
exc = fut._make_cancelled_error()
799
outer.set_exception(exc)
800
else:
801
outer.set_result(results)
802
803
arg_to_fut = {}
804
children = []
805
nfuts = 0
806
nfinished = 0
807
done_futs = []
808
loop = None
809
outer = None # bpo-46672
810
for arg in coros_or_futures:
811
if arg not in arg_to_fut:
812
fut = ensure_future(arg, loop=loop)
813
if loop is None:
814
loop = futures._get_loop(fut)
815
if fut is not arg:
816
# 'arg' was not a Future, therefore, 'fut' is a new
817
# Future created specifically for 'arg'. Since the caller
818
# can't control it, disable the "destroy pending task"
819
# warning.
820
fut._log_destroy_pending = False
821
822
nfuts += 1
823
arg_to_fut[arg] = fut
824
if fut.done():
825
done_futs.append(fut)
826
else:
827
fut.add_done_callback(_done_callback)
828
829
else:
830
# There's a duplicate Future object in coros_or_futures.
831
fut = arg_to_fut[arg]
832
833
children.append(fut)
834
835
outer = _GatheringFuture(children, loop=loop)
836
# Run done callbacks after GatheringFuture created so any post-processing
837
# can be performed at this point
838
# optimization: in the special case that *all* futures finished eagerly,
839
# this will effectively complete the gather eagerly, with the last
840
# callback setting the result (or exception) on outer before returning it
841
for fut in done_futs:
842
_done_callback(fut)
843
return outer
844
845
846
def shield(arg):
847
"""Wait for a future, shielding it from cancellation.
848
849
The statement
850
851
task = asyncio.create_task(something())
852
res = await shield(task)
853
854
is exactly equivalent to the statement
855
856
res = await something()
857
858
*except* that if the coroutine containing it is cancelled, the
859
task running in something() is not cancelled. From the POV of
860
something(), the cancellation did not happen. But its caller is
861
still cancelled, so the yield-from expression still raises
862
CancelledError. Note: If something() is cancelled by other means
863
this will still cancel shield().
864
865
If you want to completely ignore cancellation (not recommended)
866
you can combine shield() with a try/except clause, as follows:
867
868
task = asyncio.create_task(something())
869
try:
870
res = await shield(task)
871
except CancelledError:
872
res = None
873
874
Save a reference to tasks passed to this function, to avoid
875
a task disappearing mid-execution. The event loop only keeps
876
weak references to tasks. A task that isn't referenced elsewhere
877
may get garbage collected at any time, even before it's done.
878
"""
879
inner = ensure_future(arg)
880
if inner.done():
881
# Shortcut.
882
return inner
883
loop = futures._get_loop(inner)
884
outer = loop.create_future()
885
886
def _inner_done_callback(inner):
887
if outer.cancelled():
888
if not inner.cancelled():
889
# Mark inner's result as retrieved.
890
inner.exception()
891
return
892
893
if inner.cancelled():
894
outer.cancel()
895
else:
896
exc = inner.exception()
897
if exc is not None:
898
outer.set_exception(exc)
899
else:
900
outer.set_result(inner.result())
901
902
903
def _outer_done_callback(outer):
904
if not inner.done():
905
inner.remove_done_callback(_inner_done_callback)
906
907
inner.add_done_callback(_inner_done_callback)
908
outer.add_done_callback(_outer_done_callback)
909
return outer
910
911
912
def run_coroutine_threadsafe(coro, loop):
913
"""Submit a coroutine object to a given event loop.
914
915
Return a concurrent.futures.Future to access the result.
916
"""
917
if not coroutines.iscoroutine(coro):
918
raise TypeError('A coroutine object is required')
919
future = concurrent.futures.Future()
920
921
def callback():
922
try:
923
futures._chain_future(ensure_future(coro, loop=loop), future)
924
except (SystemExit, KeyboardInterrupt):
925
raise
926
except BaseException as exc:
927
if future.set_running_or_notify_cancel():
928
future.set_exception(exc)
929
raise
930
931
loop.call_soon_threadsafe(callback)
932
return future
933
934
935
def create_eager_task_factory(custom_task_constructor):
936
"""Create a function suitable for use as a task factory on an event-loop.
937
938
Example usage:
939
940
loop.set_task_factory(
941
asyncio.create_eager_task_factory(my_task_constructor))
942
943
Now, tasks created will be started immediately (rather than being first
944
scheduled to an event loop). The constructor argument can be any callable
945
that returns a Task-compatible object and has a signature compatible
946
with `Task.__init__`; it must have the `eager_start` keyword argument.
947
948
Most applications will use `Task` for `custom_task_constructor` and in
949
this case there's no need to call `create_eager_task_factory()`
950
directly. Instead the global `eager_task_factory` instance can be
951
used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
952
"""
953
954
def factory(loop, coro, *, name=None, context=None):
955
return custom_task_constructor(
956
coro, loop=loop, name=name, context=context, eager_start=True)
957
958
return factory
959
960
961
eager_task_factory = create_eager_task_factory(Task)
962
963
964
# Collectively these two sets hold references to the complete set of active
965
# tasks. Eagerly executed tasks use a faster regular set as an optimization
966
# but may graduate to a WeakSet if the task blocks on IO.
967
_scheduled_tasks = weakref.WeakSet()
968
_eager_tasks = set()
969
970
# Dictionary containing tasks that are currently active in
971
# all running event loops. {EventLoop: Task}
972
_current_tasks = {}
973
974
975
def _register_task(task):
976
"""Register an asyncio Task scheduled to run on an event loop."""
977
_scheduled_tasks.add(task)
978
979
980
def _register_eager_task(task):
981
"""Register an asyncio Task about to be eagerly executed."""
982
_eager_tasks.add(task)
983
984
985
def _enter_task(loop, task):
986
current_task = _current_tasks.get(loop)
987
if current_task is not None:
988
raise RuntimeError(f"Cannot enter into task {task!r} while another "
989
f"task {current_task!r} is being executed.")
990
_current_tasks[loop] = task
991
992
993
def _leave_task(loop, task):
994
current_task = _current_tasks.get(loop)
995
if current_task is not task:
996
raise RuntimeError(f"Leaving task {task!r} does not match "
997
f"the current task {current_task!r}.")
998
del _current_tasks[loop]
999
1000
1001
def _swap_current_task(loop, task):
1002
prev_task = _current_tasks.get(loop)
1003
if task is None:
1004
del _current_tasks[loop]
1005
else:
1006
_current_tasks[loop] = task
1007
return prev_task
1008
1009
1010
def _unregister_task(task):
1011
"""Unregister a completed, scheduled Task."""
1012
_scheduled_tasks.discard(task)
1013
1014
1015
def _unregister_eager_task(task):
1016
"""Unregister a task which finished its first eager step."""
1017
_eager_tasks.discard(task)
1018
1019
1020
_py_current_task = current_task
1021
_py_register_task = _register_task
1022
_py_register_eager_task = _register_eager_task
1023
_py_unregister_task = _unregister_task
1024
_py_unregister_eager_task = _unregister_eager_task
1025
_py_enter_task = _enter_task
1026
_py_leave_task = _leave_task
1027
_py_swap_current_task = _swap_current_task
1028
1029
1030
try:
1031
from _asyncio import (_register_task, _register_eager_task,
1032
_unregister_task, _unregister_eager_task,
1033
_enter_task, _leave_task, _swap_current_task,
1034
_scheduled_tasks, _eager_tasks, _current_tasks,
1035
current_task)
1036
except ImportError:
1037
pass
1038
else:
1039
_c_current_task = current_task
1040
_c_register_task = _register_task
1041
_c_register_eager_task = _register_eager_task
1042
_c_unregister_task = _unregister_task
1043
_c_unregister_eager_task = _unregister_eager_task
1044
_c_enter_task = _enter_task
1045
_c_leave_task = _leave_task
1046
_c_swap_current_task = _swap_current_task
1047
1048