r"""
Utilities
"""

import errno
import os
import platform
import traceback
from typing import Optional


class RemoteException(Exception):
    """
    Raised if an exception occurred in one of the child processes.
    """

    tb: str

    def __init__(self, tb: str):
        """
        Initialize the exception.

        INPUT:

        - ``tb`` -- the traceback of the exception.
        """
        self.tb = tb

    def __str__(self):
        """
        Return a string representation of the exception.
        """
        return self.tb


class RemoteExceptionWrapper:
    """
    Used by child processes to capture exceptions thrown during execution and
    report them to the main process, including the correct traceback.
    """

    exc: BaseException
    tb: str

    def __init__(self, exc: BaseException):
        """
        Initialize the exception wrapper.

        INPUT:

        - ``exc`` -- the exception to wrap.
        """
        # We cannot pickle the traceback, thus convert it to a string.
        # Later on unpickling, we set the original traceback as the cause of
        # the exception.  This approach is taken from
        # https://bugs.python.org/issue13831
        tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
        tb = "".join(tb)
        self.exc = exc
        self.tb = f'\n"""\n{tb}"""'

    @staticmethod
    def _rebuild_exc(exc: BaseException, tb: str):
        """
        Reconstruct the exception, setting a :class:`RemoteException` built
        from the traceback as its cause.
        """
        exc.__cause__ = RemoteException(tb)
        return exc

    def __reduce__(self):
        """
        TESTS::

            sage: import pickle
            sage: from sage_docbuild.utils import RemoteExceptionWrapper
            sage: pickle.dumps(RemoteExceptionWrapper(ZeroDivisionError()), 0).decode()
            ...RemoteExceptionWrapper...ZeroDivisionError...
        """
        return RemoteExceptionWrapper._rebuild_exc, (self.exc, self.tb)
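

# A minimal sketch of the round trip this enables (comment-only, for
# illustration): because a ``multiprocessing`` queue pickles its items,
# sending a ``RemoteExceptionWrapper`` through one invokes ``__reduce__``,
# and unpickling calls ``_rebuild_exc`` to recover the original exception
# with a ``RemoteException`` carrying the formatted traceback as its cause::
#
#     import pickle
#
#     try:
#         1 / 0
#     except ZeroDivisionError as exc:
#         wrapped = RemoteExceptionWrapper(exc)
#
#     rebuilt = pickle.loads(pickle.dumps(wrapped))
#     assert isinstance(rebuilt, ZeroDivisionError)
#     assert isinstance(rebuilt.__cause__, RemoteException)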


class WorkerDiedException(RuntimeError):
    """Raised if a worker process dies unexpectedly."""

    original_exception: Optional[BaseException]

    def __init__(
        self, message: Optional[str], original_exception: Optional[BaseException] = None
    ):
        super().__init__(message)
        self.original_exception = original_exception
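

# Hedged illustration of the two ways ``build_many`` below uses this class
# (not additional API): a crashed worker process produces
# ``WorkerDiedException("worker for 4 died with non-zero exit code -9")``,
# while an exception raised by the target function is transported as
# ``WorkerDiedException("", original_exception=exc)`` so the original
# exception can be re-raised in the main process.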


def build_many(target, args, processes=None):
    """
    Map a list of arguments in ``args`` to a single-argument target function
    ``target`` in parallel using ``multiprocessing.cpu_count()`` (or
    ``processes`` if given) simultaneous processes.

    This is a simplified version of ``multiprocessing.Pool.map`` from the
    Python standard library which avoids a couple of its pitfalls.  In
    particular, it can abort (with a :class:`RuntimeError`) without hanging
    if one of the worker processes unexpectedly dies.  It also has semantics
    equivalent to ``maxtasksperchild=1``; that is, one process is started per
    argument.  As such, this is inefficient for processing large numbers of
    fast tasks, but appropriate for running longer tasks (such as doc builds)
    which may also require significant cleanup.

    It also avoids starting new processes from a pthread, which results in at
    least one known issue:

    * When PARI is built with multi-threading support, forking a Sage
      process from a thread leaves the main PARI interface instance broken
      (see :issue:`26608#comment:38`).

    In the future this may be replaced by a generalized version of the more
    robust parallel processing implementation from ``sage.doctest.forker``.

    EXAMPLES::

        sage: from sage_docbuild.utils import build_many
        sage: def target(N):
        ....:     import time
        ....:     time.sleep(float(0.1))
        ....:     print('Processed task %s' % N)
        sage: _ = build_many(target, range(8), processes=8)
        Processed task ...
        Processed task ...
        Processed task ...
        Processed task ...
        Processed task ...
        Processed task ...
        Processed task ...
        Processed task ...

    This version can also return a result, and thus can be used as a
    replacement for ``multiprocessing.Pool.map`` (i.e. it still blocks until
    the result is ready)::

        sage: def square(N):
        ....:     return N * N
        sage: build_many(square, range(100))
        [0, 1, 4, 9, ..., 9604, 9801]

    If the target function raises an exception in any of the workers,
    ``build_many`` raises that exception and all other results are discarded.
    Any in-progress tasks may still be allowed to complete gracefully before
    the exception is raised::

        sage: def target(N):
        ....:     import time, os, signal
        ....:     if N == 4:
        ....:         # Task 4 is a poison pill
        ....:         1 / 0
        ....:     else:
        ....:         time.sleep(float(0.5))
        ....:         print('Processed task %s' % N)

    Note: In practice this test might still show output from the other worker
    processes before the poison pill is executed.  It may also display the
    traceback from the failing process on stderr.  However, due to how the
    doctest runner works, the doctest will only expect the final exception::

        sage: build_many(target, range(8), processes=8)
        Traceback (most recent call last):
        ...
            raise ZeroDivisionError("rational division by zero")
        ZeroDivisionError: rational division by zero
        ...
            raise worker_exc.original_exception
        ZeroDivisionError: rational division by zero

    Similarly, if one of the worker processes dies unexpectedly or otherwise
    exits non-zero (e.g. killed by a signal), any in-progress tasks will be
    completed gracefully, but then a :class:`RuntimeError` is raised and
    pending tasks are not started::

        sage: def target(N):
        ....:     import time, os, signal
        ....:     if N == 4:
        ....:         # Task 4 is a poison pill
        ....:         os.kill(os.getpid(), signal.SIGKILL)
        ....:     else:
        ....:         time.sleep(float(0.5))
        ....:         print('Processed task %s' % N)
        sage: build_many(target, range(8), processes=8)
        Traceback (most recent call last):
        ...
        WorkerDiedException: worker for 4 died with non-zero exit code -9
    """
    from multiprocessing import Process, Queue, cpu_count, set_start_method

    # On OS X, Python 3.8 defaults to using 'spawn' instead of 'fork' in
    # multiprocessing, and Sage docbuilding doesn't work with 'spawn'.
    # See trac #27754.
    if platform.system() == "Darwin":
        set_start_method("fork", force=True)
    from queue import Empty

    if processes is None:
        processes = cpu_count()

    workers = [None] * processes
    tasks = enumerate(args)
    results = []
    result_queue = Queue()

    # Utility functions #
    def run_worker(target, queue, idx, task):
        # Run the target on a single task and report the outcome to the main
        # process via the result queue: (idx, result) on success, or
        # (None, wrapped exception) on failure.
        try:
            result = target(task)
        except BaseException as exc:
            queue.put((None, RemoteExceptionWrapper(exc)))
        else:
            queue.put((idx, result))

    def bring_out_yer_dead(w, task, exitcode):
        """
        Handle a dead / completed worker.  Raises WorkerDiedException if it
        returned with a non-zero exit code.
        """

        if w is None or exitcode is None:
            # I'm not dead yet! (or I haven't even been born yet)
            return (w, task)

        # Hack: If we wait()ed on this worker manually we have to tell it
        # it's dead:
        if w._popen.returncode is None:
            w._popen.returncode = exitcode

        if exitcode != 0:
            raise WorkerDiedException(
                f"worker for {task[1]} died with non-zero exit code {w.exitcode}"
            )

        # Get result from the queue; depending on ordering this may not be
        # *the* result for this worker, but for each completed worker there
        # should be *a* result, so let's get it
        try:
            result = result_queue.get_nowait()
            if result[0] is None:
                # Indicates that an exception occurred in the target function
                exception = result[1]
                raise WorkerDiedException("", original_exception=exception)
            else:
                results.append(result)
        except Empty:
            # Generally shouldn't happen but could in case of a race condition;
            # don't worry, we'll collect any remaining results at the end.
            pass

        # Helps multiprocessing with some internal bookkeeping
        w.join()

        return None

    def wait_for_one():
        """Wait for a single process and return its pid and exit code."""
        try:
            pid, sts = os.wait()
        except OSError as exc:
            # No more processes to wait on if ECHILD
            if exc.errno != errno.ECHILD:
                raise
            else:
                return None, None

        if os.WIFSIGNALED(sts):
            exitcode = -os.WTERMSIG(sts)
        else:
            exitcode = os.WEXITSTATUS(sts)

        return pid, exitcode
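
    # For example, a worker killed by SIGKILL reports os.WIFSIGNALED(sts) as
    # True with os.WTERMSIG(sts) == 9, so wait_for_one() returns exitcode -9,
    # matching the "died with non-zero exit code -9" message in the doctest
    # above; a normal exit reports os.WEXITSTATUS(sts) (0 on success).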

    def reap_workers(waited_pid=None, waited_exitcode=None):
        """
        This is the main worker handling loop.

        Checks if workers have completed their tasks and spawns new workers if
        there are more tasks on the queue.  Returns ``False`` if there is more
        work to be done or ``True`` if the work is complete.

        Raises a ``WorkerDiedException`` if a worker exits unexpectedly.
        """

        all_done = True

        for idx, w in enumerate(workers):
            if w is not None:
                w, task = w
                if w.pid == waited_pid:
                    exitcode = waited_exitcode
                else:
                    exitcode = w.exitcode

                w = bring_out_yer_dead(w, task, exitcode)

            # Worker w is dead/not started, so start a new worker
            # in its place with the next document from the queue
            if w is None:
                try:
                    task = next(tasks)
                except StopIteration:
                    pass
                else:
                    w = Process(target=run_worker, args=((target, result_queue) + task))
                    w.start()
                    # Pair the new worker with the task it's performing (mostly
                    # for debugging purposes)
                    w = (w, task)

            workers[idx] = w

            if w is not None:
                all_done = False

        # If all workers are dead and there are no more items to
        # process in the queue then we are done
        return all_done
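
    # Each slot of ``workers`` is either None (no live worker) or a pair
    # ``(process, (task_index, arg))``; ``tasks = enumerate(args)`` supplies
    # those pairs, and the task index is what lets the results be re-sorted
    # into their original order at the end.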

    # Main loop #

    waited_pid = None  # Set along with waited_exitcode by calls to wait_for_one()
    waited_exitcode = None
    worker_exc = None  # Set to a WorkerDiedException if one occurs

    try:
        while True:
            # Check the status of each worker and break out of the loop if
            # all work is done.
            # We'll check each worker process against the returned
            # pid back at the top of the `while True` loop.  We also
            # check any other processes that may have exited in the
            # meantime
            try:
                if reap_workers(waited_pid, waited_exitcode):
                    break
            except WorkerDiedException as exc:
                worker_exc = exc
                break

            waited_pid, waited_exitcode = wait_for_one()
    finally:
        try:
            remaining_workers = [w for w in workers if w is not None]
            for w, _ in remaining_workers:
                # Give any remaining workers a chance to shut down gracefully
                try:
                    w.terminate()
                except OSError as exc:
                    if exc.errno != errno.ESRCH:
                        # Otherwise it was already dead so this was expected
                        raise
            for w, _ in remaining_workers:
                w.join()
        finally:
            if worker_exc is not None:
                # Re-raise the WorkerDiedException set by bring_out_yer_dead
                # if a worker died unexpectedly, or the original exception if
                # it's wrapping one
                if worker_exc.original_exception is not None:
                    raise worker_exc.original_exception
                else:
                    raise worker_exc

    # All workers should be shut down by now and should have completed without
    # error.  No new items will be added to the result queue, so we can get all
    # the remaining results, if any.
    while True:
        try:
            results.append(result_queue.get_nowait())
        except Empty:
            break

    # Return the results sorted according to their original task order
    return [r[1] for r in sorted(results, key=lambda r: r[0])]
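

# A minimal usage sketch (the ``build_doc`` helper below is hypothetical, for
# illustration only): ``build_many`` blocks until all tasks finish, returns
# results in task order, and re-raises the first worker failure, so callers
# typically wrap it in ``try``/``except``::
#
#     from sage_docbuild.utils import build_many
#
#     def build_doc(name):
#         ...  # build one document; may raise, or the process may be killed
#
#     try:
#         results = build_many(build_doc, ["reference", "tutorial"])
#     except RuntimeError:
#         # Raised if a worker died unexpectedly (WorkerDiedException is a
#         # RuntimeError); exceptions raised by ``build_doc`` itself are
#         # re-raised as-is, with a RemoteException traceback as __cause__.
#         raise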