Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/concurrent/futures/_base.py
12 views
1
# Copyright 2009 Brian Quinlan. All Rights Reserved.
2
# Licensed to PSF under a Contributor Agreement.
3
4
__author__ = 'Brian Quinlan ([email protected])'
5
6
import collections
7
import logging
8
import threading
9
import time
10
import types
11
12
FIRST_COMPLETED = 'FIRST_COMPLETED'
13
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
14
ALL_COMPLETED = 'ALL_COMPLETED'
15
_AS_COMPLETED = '_AS_COMPLETED'
16
17
# Possible future states (for internal use by the futures package).
18
PENDING = 'PENDING'
19
RUNNING = 'RUNNING'
20
# The future was cancelled by the user...
21
CANCELLED = 'CANCELLED'
22
# ...and _Waiter.add_cancelled() was called by a worker.
23
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
24
FINISHED = 'FINISHED'
25
26
_FUTURE_STATES = [
27
PENDING,
28
RUNNING,
29
CANCELLED,
30
CANCELLED_AND_NOTIFIED,
31
FINISHED
32
]
33
34
_STATE_TO_DESCRIPTION_MAP = {
35
PENDING: "pending",
36
RUNNING: "running",
37
CANCELLED: "cancelled",
38
CANCELLED_AND_NOTIFIED: "cancelled",
39
FINISHED: "finished"
40
}
41
42
# Logger for internal use by the futures package.
43
LOGGER = logging.getLogger("concurrent.futures")
44
45
class Error(Exception):
46
"""Base class for all future-related exceptions."""
47
pass
48
49
class CancelledError(Error):
50
"""The Future was cancelled."""
51
pass
52
53
TimeoutError = TimeoutError # make local alias for the standard exception
54
55
class InvalidStateError(Error):
56
"""The operation is not allowed in this state."""
57
pass
58
59
class _Waiter(object):
60
"""Provides the event that wait() and as_completed() block on."""
61
def __init__(self):
62
self.event = threading.Event()
63
self.finished_futures = []
64
65
def add_result(self, future):
66
self.finished_futures.append(future)
67
68
def add_exception(self, future):
69
self.finished_futures.append(future)
70
71
def add_cancelled(self, future):
72
self.finished_futures.append(future)
73
74
class _AsCompletedWaiter(_Waiter):
75
"""Used by as_completed()."""
76
77
def __init__(self):
78
super(_AsCompletedWaiter, self).__init__()
79
self.lock = threading.Lock()
80
81
def add_result(self, future):
82
with self.lock:
83
super(_AsCompletedWaiter, self).add_result(future)
84
self.event.set()
85
86
def add_exception(self, future):
87
with self.lock:
88
super(_AsCompletedWaiter, self).add_exception(future)
89
self.event.set()
90
91
def add_cancelled(self, future):
92
with self.lock:
93
super(_AsCompletedWaiter, self).add_cancelled(future)
94
self.event.set()
95
96
class _FirstCompletedWaiter(_Waiter):
97
"""Used by wait(return_when=FIRST_COMPLETED)."""
98
99
def add_result(self, future):
100
super().add_result(future)
101
self.event.set()
102
103
def add_exception(self, future):
104
super().add_exception(future)
105
self.event.set()
106
107
def add_cancelled(self, future):
108
super().add_cancelled(future)
109
self.event.set()
110
111
class _AllCompletedWaiter(_Waiter):
112
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
113
114
def __init__(self, num_pending_calls, stop_on_exception):
115
self.num_pending_calls = num_pending_calls
116
self.stop_on_exception = stop_on_exception
117
self.lock = threading.Lock()
118
super().__init__()
119
120
def _decrement_pending_calls(self):
121
with self.lock:
122
self.num_pending_calls -= 1
123
if not self.num_pending_calls:
124
self.event.set()
125
126
def add_result(self, future):
127
super().add_result(future)
128
self._decrement_pending_calls()
129
130
def add_exception(self, future):
131
super().add_exception(future)
132
if self.stop_on_exception:
133
self.event.set()
134
else:
135
self._decrement_pending_calls()
136
137
def add_cancelled(self, future):
138
super().add_cancelled(future)
139
self._decrement_pending_calls()
140
141
class _AcquireFutures(object):
142
"""A context manager that does an ordered acquire of Future conditions."""
143
144
def __init__(self, futures):
145
self.futures = sorted(futures, key=id)
146
147
def __enter__(self):
148
for future in self.futures:
149
future._condition.acquire()
150
151
def __exit__(self, *args):
152
for future in self.futures:
153
future._condition.release()
154
155
def _create_and_install_waiters(fs, return_when):
156
if return_when == _AS_COMPLETED:
157
waiter = _AsCompletedWaiter()
158
elif return_when == FIRST_COMPLETED:
159
waiter = _FirstCompletedWaiter()
160
else:
161
pending_count = sum(
162
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
163
164
if return_when == FIRST_EXCEPTION:
165
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
166
elif return_when == ALL_COMPLETED:
167
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
168
else:
169
raise ValueError("Invalid return condition: %r" % return_when)
170
171
for f in fs:
172
f._waiters.append(waiter)
173
174
return waiter
175
176
177
def _yield_finished_futures(fs, waiter, ref_collect):
178
"""
179
Iterate on the list *fs*, yielding finished futures one by one in
180
reverse order.
181
Before yielding a future, *waiter* is removed from its waiters
182
and the future is removed from each set in the collection of sets
183
*ref_collect*.
184
185
The aim of this function is to avoid keeping stale references after
186
the future is yielded and before the iterator resumes.
187
"""
188
while fs:
189
f = fs[-1]
190
for futures_set in ref_collect:
191
futures_set.remove(f)
192
with f._condition:
193
f._waiters.remove(waiter)
194
del f
195
# Careful not to keep a reference to the popped value
196
yield fs.pop()
197
198
199
def as_completed(fs, timeout=None):
200
"""An iterator over the given futures that yields each as it completes.
201
202
Args:
203
fs: The sequence of Futures (possibly created by different Executors) to
204
iterate over.
205
timeout: The maximum number of seconds to wait. If None, then there
206
is no limit on the wait time.
207
208
Returns:
209
An iterator that yields the given Futures as they complete (finished or
210
cancelled). If any given Futures are duplicated, they will be returned
211
once.
212
213
Raises:
214
TimeoutError: If the entire result iterator could not be generated
215
before the given timeout.
216
"""
217
if timeout is not None:
218
end_time = timeout + time.monotonic()
219
220
fs = set(fs)
221
total_futures = len(fs)
222
with _AcquireFutures(fs):
223
finished = set(
224
f for f in fs
225
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
226
pending = fs - finished
227
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
228
finished = list(finished)
229
try:
230
yield from _yield_finished_futures(finished, waiter,
231
ref_collect=(fs,))
232
233
while pending:
234
if timeout is None:
235
wait_timeout = None
236
else:
237
wait_timeout = end_time - time.monotonic()
238
if wait_timeout < 0:
239
raise TimeoutError(
240
'%d (of %d) futures unfinished' % (
241
len(pending), total_futures))
242
243
waiter.event.wait(wait_timeout)
244
245
with waiter.lock:
246
finished = waiter.finished_futures
247
waiter.finished_futures = []
248
waiter.event.clear()
249
250
# reverse to keep finishing order
251
finished.reverse()
252
yield from _yield_finished_futures(finished, waiter,
253
ref_collect=(fs, pending))
254
255
finally:
256
# Remove waiter from unfinished futures
257
for f in fs:
258
with f._condition:
259
f._waiters.remove(waiter)
260
261
DoneAndNotDoneFutures = collections.namedtuple(
262
'DoneAndNotDoneFutures', 'done not_done')
263
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
264
"""Wait for the futures in the given sequence to complete.
265
266
Args:
267
fs: The sequence of Futures (possibly created by different Executors) to
268
wait upon.
269
timeout: The maximum number of seconds to wait. If None, then there
270
is no limit on the wait time.
271
return_when: Indicates when this function should return. The options
272
are:
273
274
FIRST_COMPLETED - Return when any future finishes or is
275
cancelled.
276
FIRST_EXCEPTION - Return when any future finishes by raising an
277
exception. If no future raises an exception
278
then it is equivalent to ALL_COMPLETED.
279
ALL_COMPLETED - Return when all futures finish or are cancelled.
280
281
Returns:
282
A named 2-tuple of sets. The first set, named 'done', contains the
283
futures that completed (is finished or cancelled) before the wait
284
completed. The second set, named 'not_done', contains uncompleted
285
futures. Duplicate futures given to *fs* are removed and will be
286
returned only once.
287
"""
288
fs = set(fs)
289
with _AcquireFutures(fs):
290
done = {f for f in fs
291
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
292
not_done = fs - done
293
if (return_when == FIRST_COMPLETED) and done:
294
return DoneAndNotDoneFutures(done, not_done)
295
elif (return_when == FIRST_EXCEPTION) and done:
296
if any(f for f in done
297
if not f.cancelled() and f.exception() is not None):
298
return DoneAndNotDoneFutures(done, not_done)
299
300
if len(done) == len(fs):
301
return DoneAndNotDoneFutures(done, not_done)
302
303
waiter = _create_and_install_waiters(fs, return_when)
304
305
waiter.event.wait(timeout)
306
for f in fs:
307
with f._condition:
308
f._waiters.remove(waiter)
309
310
done.update(waiter.finished_futures)
311
return DoneAndNotDoneFutures(done, fs - done)
312
313
314
def _result_or_cancel(fut, timeout=None):
315
try:
316
try:
317
return fut.result(timeout)
318
finally:
319
fut.cancel()
320
finally:
321
# Break a reference cycle with the exception in self._exception
322
del fut
323
324
325
class Future(object):
326
"""Represents the result of an asynchronous computation."""
327
328
def __init__(self):
329
"""Initializes the future. Should not be called by clients."""
330
self._condition = threading.Condition()
331
self._state = PENDING
332
self._result = None
333
self._exception = None
334
self._waiters = []
335
self._done_callbacks = []
336
337
def _invoke_callbacks(self):
338
for callback in self._done_callbacks:
339
try:
340
callback(self)
341
except Exception:
342
LOGGER.exception('exception calling callback for %r', self)
343
344
def __repr__(self):
345
with self._condition:
346
if self._state == FINISHED:
347
if self._exception:
348
return '<%s at %#x state=%s raised %s>' % (
349
self.__class__.__name__,
350
id(self),
351
_STATE_TO_DESCRIPTION_MAP[self._state],
352
self._exception.__class__.__name__)
353
else:
354
return '<%s at %#x state=%s returned %s>' % (
355
self.__class__.__name__,
356
id(self),
357
_STATE_TO_DESCRIPTION_MAP[self._state],
358
self._result.__class__.__name__)
359
return '<%s at %#x state=%s>' % (
360
self.__class__.__name__,
361
id(self),
362
_STATE_TO_DESCRIPTION_MAP[self._state])
363
364
def cancel(self):
365
"""Cancel the future if possible.
366
367
Returns True if the future was cancelled, False otherwise. A future
368
cannot be cancelled if it is running or has already completed.
369
"""
370
with self._condition:
371
if self._state in [RUNNING, FINISHED]:
372
return False
373
374
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
375
return True
376
377
self._state = CANCELLED
378
self._condition.notify_all()
379
380
self._invoke_callbacks()
381
return True
382
383
def cancelled(self):
384
"""Return True if the future was cancelled."""
385
with self._condition:
386
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
387
388
def running(self):
389
"""Return True if the future is currently executing."""
390
with self._condition:
391
return self._state == RUNNING
392
393
def done(self):
394
"""Return True if the future was cancelled or finished executing."""
395
with self._condition:
396
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
397
398
def __get_result(self):
399
if self._exception:
400
try:
401
raise self._exception
402
finally:
403
# Break a reference cycle with the exception in self._exception
404
self = None
405
else:
406
return self._result
407
408
def add_done_callback(self, fn):
409
"""Attaches a callable that will be called when the future finishes.
410
411
Args:
412
fn: A callable that will be called with this future as its only
413
argument when the future completes or is cancelled. The callable
414
will always be called by a thread in the same process in which
415
it was added. If the future has already completed or been
416
cancelled then the callable will be called immediately. These
417
callables are called in the order that they were added.
418
"""
419
with self._condition:
420
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
421
self._done_callbacks.append(fn)
422
return
423
try:
424
fn(self)
425
except Exception:
426
LOGGER.exception('exception calling callback for %r', self)
427
428
def result(self, timeout=None):
429
"""Return the result of the call that the future represents.
430
431
Args:
432
timeout: The number of seconds to wait for the result if the future
433
isn't done. If None, then there is no limit on the wait time.
434
435
Returns:
436
The result of the call that the future represents.
437
438
Raises:
439
CancelledError: If the future was cancelled.
440
TimeoutError: If the future didn't finish executing before the given
441
timeout.
442
Exception: If the call raised then that exception will be raised.
443
"""
444
try:
445
with self._condition:
446
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
447
raise CancelledError()
448
elif self._state == FINISHED:
449
return self.__get_result()
450
451
self._condition.wait(timeout)
452
453
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
454
raise CancelledError()
455
elif self._state == FINISHED:
456
return self.__get_result()
457
else:
458
raise TimeoutError()
459
finally:
460
# Break a reference cycle with the exception in self._exception
461
self = None
462
463
def exception(self, timeout=None):
464
"""Return the exception raised by the call that the future represents.
465
466
Args:
467
timeout: The number of seconds to wait for the exception if the
468
future isn't done. If None, then there is no limit on the wait
469
time.
470
471
Returns:
472
The exception raised by the call that the future represents or None
473
if the call completed without raising.
474
475
Raises:
476
CancelledError: If the future was cancelled.
477
TimeoutError: If the future didn't finish executing before the given
478
timeout.
479
"""
480
481
with self._condition:
482
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
483
raise CancelledError()
484
elif self._state == FINISHED:
485
return self._exception
486
487
self._condition.wait(timeout)
488
489
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
490
raise CancelledError()
491
elif self._state == FINISHED:
492
return self._exception
493
else:
494
raise TimeoutError()
495
496
# The following methods should only be used by Executors and in tests.
497
def set_running_or_notify_cancel(self):
498
"""Mark the future as running or process any cancel notifications.
499
500
Should only be used by Executor implementations and unit tests.
501
502
If the future has been cancelled (cancel() was called and returned
503
True) then any threads waiting on the future completing (though calls
504
to as_completed() or wait()) are notified and False is returned.
505
506
If the future was not cancelled then it is put in the running state
507
(future calls to running() will return True) and True is returned.
508
509
This method should be called by Executor implementations before
510
executing the work associated with this future. If this method returns
511
False then the work should not be executed.
512
513
Returns:
514
False if the Future was cancelled, True otherwise.
515
516
Raises:
517
RuntimeError: if this method was already called or if set_result()
518
or set_exception() was called.
519
"""
520
with self._condition:
521
if self._state == CANCELLED:
522
self._state = CANCELLED_AND_NOTIFIED
523
for waiter in self._waiters:
524
waiter.add_cancelled(self)
525
# self._condition.notify_all() is not necessary because
526
# self.cancel() triggers a notification.
527
return False
528
elif self._state == PENDING:
529
self._state = RUNNING
530
return True
531
else:
532
LOGGER.critical('Future %s in unexpected state: %s',
533
id(self),
534
self._state)
535
raise RuntimeError('Future in unexpected state')
536
537
def set_result(self, result):
538
"""Sets the return value of work associated with the future.
539
540
Should only be used by Executor implementations and unit tests.
541
"""
542
with self._condition:
543
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
544
raise InvalidStateError('{}: {!r}'.format(self._state, self))
545
self._result = result
546
self._state = FINISHED
547
for waiter in self._waiters:
548
waiter.add_result(self)
549
self._condition.notify_all()
550
self._invoke_callbacks()
551
552
def set_exception(self, exception):
553
"""Sets the result of the future as being the given exception.
554
555
Should only be used by Executor implementations and unit tests.
556
"""
557
with self._condition:
558
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
559
raise InvalidStateError('{}: {!r}'.format(self._state, self))
560
self._exception = exception
561
self._state = FINISHED
562
for waiter in self._waiters:
563
waiter.add_exception(self)
564
self._condition.notify_all()
565
self._invoke_callbacks()
566
567
__class_getitem__ = classmethod(types.GenericAlias)
568
569
class Executor(object):
570
"""This is an abstract base class for concrete asynchronous executors."""
571
572
def submit(self, fn, /, *args, **kwargs):
573
"""Submits a callable to be executed with the given arguments.
574
575
Schedules the callable to be executed as fn(*args, **kwargs) and returns
576
a Future instance representing the execution of the callable.
577
578
Returns:
579
A Future representing the given call.
580
"""
581
raise NotImplementedError()
582
583
def map(self, fn, *iterables, timeout=None, chunksize=1):
584
"""Returns an iterator equivalent to map(fn, iter).
585
586
Args:
587
fn: A callable that will take as many arguments as there are
588
passed iterables.
589
timeout: The maximum number of seconds to wait. If None, then there
590
is no limit on the wait time.
591
chunksize: The size of the chunks the iterable will be broken into
592
before being passed to a child process. This argument is only
593
used by ProcessPoolExecutor; it is ignored by
594
ThreadPoolExecutor.
595
596
Returns:
597
An iterator equivalent to: map(func, *iterables) but the calls may
598
be evaluated out-of-order.
599
600
Raises:
601
TimeoutError: If the entire result iterator could not be generated
602
before the given timeout.
603
Exception: If fn(*args) raises for any values.
604
"""
605
if timeout is not None:
606
end_time = timeout + time.monotonic()
607
608
fs = [self.submit(fn, *args) for args in zip(*iterables)]
609
610
# Yield must be hidden in closure so that the futures are submitted
611
# before the first iterator value is required.
612
def result_iterator():
613
try:
614
# reverse to keep finishing order
615
fs.reverse()
616
while fs:
617
# Careful not to keep a reference to the popped future
618
if timeout is None:
619
yield _result_or_cancel(fs.pop())
620
else:
621
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
622
finally:
623
for future in fs:
624
future.cancel()
625
return result_iterator()
626
627
def shutdown(self, wait=True, *, cancel_futures=False):
628
"""Clean-up the resources associated with the Executor.
629
630
It is safe to call this method several times. Otherwise, no other
631
methods can be called after this one.
632
633
Args:
634
wait: If True then shutdown will not return until all running
635
futures have finished executing and the resources used by the
636
executor have been reclaimed.
637
cancel_futures: If True then shutdown will cancel all pending
638
futures. Futures that are completed or running will not be
639
cancelled.
640
"""
641
pass
642
643
def __enter__(self):
644
return self
645
646
def __exit__(self, exc_type, exc_val, exc_tb):
647
self.shutdown(wait=True)
648
return False
649
650
651
class BrokenExecutor(RuntimeError):
652
"""
653
Raised when a executor has become non-functional after a severe failure.
654
"""
655
656