Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/concurrent/futures/process.py
12 views
1
# Copyright 2009 Brian Quinlan. All Rights Reserved.
2
# Licensed to PSF under a Contributor Agreement.
3
4
"""Implements ProcessPoolExecutor.
5
6
The following diagram and text describe the data-flow through the system:
7
8
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
26
27
Executor.submit() called:
28
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29
- adds the id of the _WorkItem to the "Work Ids" queue
30
31
Local worker thread:
32
- reads work ids from the "Work Ids" queue and looks up the corresponding
33
WorkItem from the "Work Items" dict: if the work item has been cancelled then
34
it is simply removed from the dict, otherwise it is repackaged as a
35
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38
- reads _ResultItems from "Result Q", updates the future stored in the
39
"Work Items" dict and deletes the dict entry
40
41
Process #1..n:
42
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43
_ResultItems in "Result Q"
44
"""
45
46
# Module author metadata.
# NOTE(review): the address below reads "[email protected]", which looks like a
# web-page email-obfuscation artifact rather than a real address -- confirm
# against the upstream source.
__author__ = 'Brian Quinlan ([email protected])'
47
48
import os
49
from concurrent.futures import _base
50
import queue
51
import multiprocessing as mp
52
# This import is required to load the multiprocessing.connection submodule
53
# so that it can be accessed later as `mp.connection`
54
import multiprocessing.connection
55
from multiprocessing.queues import Queue
56
import threading
57
import weakref
58
from functools import partial
59
import itertools
60
import sys
61
from traceback import format_exception
62
63
64
_threads_wakeups = weakref.WeakKeyDictionary()
65
_global_shutdown = False
66
67
68
class _ThreadWakeup:
    """One-way pipe used to interrupt a thread that waits on its read end.

    wakeup() writes an empty message, which makes the read end readable;
    clear() drains any such messages again. After close() every method is a
    no-op.
    """

    def __init__(self):
        self._closed = False
        # Unidirectional pipe: messages written to _writer arrive on _reader.
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; repeated calls do nothing."""
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        """Make the read end readable by sending an empty message."""
        if self._closed:
            return
        self._writer.send_bytes(b"")

    def clear(self):
        """Discard every wakeup message currently waiting in the pipe."""
        if self._closed:
            return
        while self._reader.poll():
            self._reader.recv_bytes()
87
88
89
def _python_exit():
    """Wake up every executor manager thread and wait for each to finish.

    The module-level ``_global_shutdown`` flag is raised first so the
    manager threads observe that the interpreter is shutting down.
    """
    global _global_shutdown
    _global_shutdown = True
    # Take a snapshot of the weak mapping before iterating over it.
    snapshot = list(_threads_wakeups.items())
    # NOTE: these wakeup() calls are not protected by
    # ProcessPoolExecutor._shutdown_lock.
    for _thread, wakeup in snapshot:
        wakeup.wakeup()
    for thread, _wakeup in snapshot:
        thread.join()


# Have ``_python_exit`` run just before the interpreter joins all non-daemon
# threads. ``threading._register_atexit`` is used instead of
# ``atexit.register()`` for compatibility with subinterpreters, which no
# longer support daemon threads (see bpo-39812 for context).
threading._register_atexit(_python_exit)
104
105
# Controls how many more calls than processes will be queued in the call queue.
106
# A smaller number will mean that processes spend more time idle waiting for
107
# work while a larger number will make Future.cancel() succeed less frequently
108
# (Futures in the call queue cannot be cancelled).
109
EXTRA_QUEUED_CALLS = 1
110
111
112
# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
113
# It can wait on, at most, 63 objects. There is an overhead of two objects:
114
# - the result queue reader
115
# - the thread wakeup reader
116
_MAX_WINDOWS_WORKERS = 63 - 2
117
118
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-rendered traceback text.

    Instances are attached as ``__cause__`` of an exception so that the
    traceback text captured elsewhere is printed along with it.
    """

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb
125
126
class _ExceptionWithTraceback:
    """Picklable wrapper pairing an exception with its rendered traceback.

    The traceback is turned into text up front because traceback objects
    cannot be pickled. When unpickled, the wrapper is replaced by the
    original exception with the text attached (see ``__reduce__``).
    """

    def __init__(self, exc, tb):
        rendered = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Drop the traceback object: its frames reference every object in
        # the exception's scope and would otherwise keep them alive.
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % rendered

    def __reduce__(self):
        # Unpickling calls _rebuild_exc(exc, tb) rather than rebuilding
        # this wrapper.
        return _rebuild_exc, (self.exc, self.tb)
136
137
def _rebuild_exc(exc, tb):
    """Attach traceback text *tb* to *exc* as its cause and return *exc*."""
    remote = _RemoteTraceback(tb)
    exc.__cause__ = remote
    return exc
140
141
class _WorkItem:
    """A submitted call together with the Future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
147
148
class _ResultItem:
    """Outcome of one call, reported by a worker process.

    ``exit_pid`` is set (to the worker's PID) when the worker is about to
    exit after this result, e.g. because it reached its task limit.
    """

    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
        self.exit_pid = exit_pid
154
155
class _CallItem:
    """A call sent to a worker process through the call queue."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn, self.args, self.kwargs = fn, args, kwargs
161
162
163
class _SafeQueue(Queue):
    """Queue that fails the matching future when a _CallItem cannot be fed.

    When the queue's feeder reports an error for a _CallItem, the error is
    set on the corresponding pending work item's future and the executor
    manager thread is woken up. Errors for other objects fall back to the
    base Queue behaviour.
    """

    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return
        # Attach the formatted traceback as the cause, in the same format
        # used for exceptions coming back from worker processes.
        rendered = ''.join(format_exception(type(e), e, e.__traceback__))
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(rendered))
        work_item = self.pending_work_items.pop(obj.work_id, None)
        with self.shutdown_lock:
            self.thread_wakeup.wakeup()
        # work_item is None if another process terminated; in that case the
        # executor manager thread fails all work items with BrokenProcessPool.
        if work_item is not None:
            work_item.future.set_exception(e)
186
187
188
def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* zip()ed argument tuples.

    The iterables are zipped together (stopping at the shortest) and the
    resulting items are grouped lazily; the last chunk may be shorter.
    """
    zipped = zip(*iterables)
    chunk = tuple(itertools.islice(zipped, chunksize))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(zipped, chunksize))
196
197
198
def _process_chunk(fn, chunk):
    """Call *fn* with each argument tuple in *chunk*; return the results.

    This is the function executed by the worker processes for every chunk
    produced by ProcessPoolExecutor.map(). Results keep the chunk's order.
    """
    return list(itertools.starmap(fn, chunk))
208
209
210
def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Put a _ResultItem for *work_id* on *result_queue*.

    If building or putting that item raises, a second _ResultItem carrying
    the wrapped error is sent instead, so the parent still learns the
    outcome of *work_id*.
    """
    try:
        item = _ResultItem(work_id, result=result, exception=exception,
                           exit_pid=exit_pid)
        result_queue.put(item)
    except BaseException as e:
        wrapped = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped,
                                     exit_pid=exit_pid))
220
221
222
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Run calls taken from *call_queue* and report results on *result_queue*.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A ``None`` item asks the worker to stop;
            it then puts its own PID on *result_queue* and returns.
        result_queue: A queue of _ResultItems that will be written to by
            the worker (the executor passes a ctx.SimpleQueue).
        initializer: A callable initializer, or None.
        initargs: A tuple of args for the initializer.
        max_tasks: Maximum number of calls to execute before exiting, or
            None for no limit. The result of the last call carries this
            process's PID in ``exit_pid``.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # Nothing is sent back: the parent notices that this process
            # stopped and marks the pool broken.
            return

    tasks_done = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Shutdown sentinel: wake up the queue management thread.
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            tasks_done += 1
            if tasks_done >= max_tasks:
                # Last call for this worker: tag its result so the parent
                # knows this process is about to exit.
                exit_pid = os.getpid()

        try:
            outcome = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            wrapped = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id,
                             exception=wrapped, exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id,
                             result=outcome, exit_pid=exit_pid)
            del outcome

        # Release the call item right away so that resources it references
        # (open files, shared memory, ...) are not held any longer than needed.
        del call_item

        if exit_pid is not None:
            return
274
275
276
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread. Each iteration of its main loop
    (run()) moves pending work items into the call queue, waits for a result,
    a wakeup signal or a worker death, and then completes the matching
    future, marks the pool broken, or shuts everything down.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. The manager keeps only a weakref to the executor,
            plus references to the internal objects it needs to introspect
            the state of the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        # The wakeup and the lock are bound as default arguments, so the
        # callback body never needs to reference ``self``.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A dict mapping worker PIDs to the ctx.Process instances used as
        # workers (shared with the executor).
        self.processes = executor._processes

        # A ctx.Queue (a _SafeQueue) that will be filled with _CallItems
        # derived from _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        # This is the same dict object as executor._pending_work_items (and
        # the one held by the _SafeQueue), so it must only be mutated in
        # place, never rebound.
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            # Top up the call queue before blocking.
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                # NOTE(review): process_result_item() also accepts a bare int
                # (a worker PID), but an int has no ``exit_pid`` attribute, so
                # the next line would raise AttributeError for it. Workers
                # only put their PID on the result queue after receiving the
                # None sentinel from shutdown_workers(), which runs inside
                # join_executor_internals() right before this thread returns,
                # so that path looks unreachable -- confirm before relying on it.
                process_exited = result_item.exit_pid is not None
                if process_exited:
                    # The worker reached its task limit and is exiting:
                    # reap it.
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        # May spawn a replacement for the worker that exited.
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        # A worker finished a task and is available again.
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _CallItems built from the pending _WorkItems,
        # taking work ids in queue order, until the call queue is full or no
        # work ids remain. Work items whose future was cancelled are dropped
        # from pending_work_items instead of being queued.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal sent. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        #
        # Returns (result_item, is_broken, cause):
        # - a result was read: (item, False, None);
        # - reading the result raised: (None, True, formatted exception lines);
        # - only a wakeup arrived: (None, False, None);
        # - only worker sentinels are ready, i.e. a worker died:
        #   (None, True, None).
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        # Pending wakeup signals are discarded in every case.
        with self.shutdown_lock:
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received a result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                # Note: this only returns from process_result_item(); run()
                # carries on with the same result_item.
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument (a list of formatted exception lines, or None) can be used
        # to display more information on the error that led the executor
        # into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and remove them from
                # pending_work_items so that it only keeps futures that are
                # currently running. The dict is mutated in place rather
                # than replaced: the executor and its _SafeQueue share this
                # same dict object, and rebinding the attribute would leave
                # them with a stale copy. A running item later popped by
                # _SafeQueue._on_queue_feeder_error would then never leave
                # this thread's copy, keeping run() from ever exiting.
                # Iterate over a snapshot because _on_queue_feeder_error may
                # pop entries from another thread.
                for work_id, work_item in list(self.pending_work_items.items()):
                    if work_item.future.cancel():
                        self.pending_work_items.pop(work_id, None)
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated. While the call queue is full, this loop keeps
        # retrying for as long as some child is still alive.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        # Stop the workers and release the call queue, the wakeup pipe and
        # the worker processes.
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())
560
561
562
_system_limits_checked = False
_system_limited = None


def _probe_system_limits():
    """Return why process pools are unsupported here, or None if they work."""
    try:
        # Imported only to check that it is available.
        import multiprocessing.synchronize
    except ImportError:
        return (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return None
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return None
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return None
    return ("system provides too few semaphores (%d"
            " available, 256 necessary)" % nsems_max)


def _check_system_limits():
    """Raise NotImplementedError if this platform cannot run a process pool.

    Process pools need ``multiprocessing.synchronize`` and, where
    ``os.sysconf`` reports it, at least 256 POSIX semaphores
    (``SC_SEM_NSEMS_MAX``). The verdict is computed once and cached in the
    module globals ``_system_limits_checked`` / ``_system_limited``, so
    later calls either return immediately or re-raise the cached error.
    Previously only a failing verdict short-circuited; a successful check
    was redone on every call.

    Raises:
        NotImplementedError: with a message describing the limitation.
    """
    global _system_limits_checked, _system_limited
    if not _system_limits_checked:
        # An unexpected error from the probe (e.g. OSError from os.sysconf)
        # propagates and leaves the verdict uncached, so the next call
        # probes again.
        _system_limited = _probe_system_limits()
        _system_limits_checked = True
    if _system_limited:
        raise NotImplementedError(_system_limited)
596
597
598
def _chain_from_iterable_of_lists(iterable):
    """Flatten an iterable of lists, like itertools.chain.from_iterable.

    Every item of *iterable* must be a list; the lists are emptied while
    being consumed so that no reference to an already-yielded object is
    kept around.
    """
    for chunk in iterable:
        # Reverse once so that popping from the end yields items in their
        # original order while dropping the list's reference to each one.
        chunk.reverse()
        while chunk:
            yield chunk.pop()
608
609
610
class BrokenProcessPool(_base.BrokenExecutor):
    """Raised when a worker of a ProcessPoolExecutor terminated abruptly.

    Futures that were running or pending at that moment fail with this
    error, and the executor refuses further submissions.
    """
615
616
617
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    Work is handed to the workers by an _ExecutorManagerThread, which is
    started lazily by the first submit().
    """

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors
                (capped at _MAX_WINDOWS_WORKERS on Windows).
            mp_context: A multiprocessing context to launch the workers created
                using the multiprocessing.get_context('start method') API. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker process will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.

        Raises:
            ValueError: if max_workers <= 0 (or, on Windows, larger than
                _MAX_WINDOWS_WORKERS), if max_tasks_per_child <= 0, or if
                max_tasks_per_child is combined with the 'fork' start method.
            TypeError: if initializer is not callable or max_tasks_per_child
                is not an integer.
            NotImplementedError: if this platform cannot support process
                pools (raised by _check_system_limits()).
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
                self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Counts workers that finished a task and are waiting for more work;
        # _adjust_process_count() consumes it before spawning new workers.
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        # Start the manager thread on first use (no-op afterwards) and
        # register its wakeup channel for _python_exit().
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # shutdown(wait=False) resets self._processes to None while the
        # executor manager thread may still be running and calling this
        # method, e.g. after a worker exited because it reached
        # max_tasks_per_child. Without this guard, len(None) below would
        # raise TypeError in the manager thread and kill it, leaving the
        # remaining futures uncompleted.
        if self._processes is None:
            return

        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # Spawn workers up to _max_workers (used for the 'fork' start method,
        # before the manager thread exists).
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        # Start one worker process and record it under its PID.
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
            ValueError: If chunksize is less than 1.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        # With wait=False the manager thread may still be running; it keeps
        # its own references to these objects, and _adjust_process_count()
        # tolerates self._processes being None.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
844
845