Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sage
Path: blob/develop/src/sage_setup/run_parallel.py
4052 views
1
########################################################################
2
##
3
## Parallel gcc execution
4
##
5
## This code is responsible for making distutils dispatch the calls to
6
## build_ext in parallel. Since distutils doesn't seem to do this by
7
## default, we create our own extension builder and override the
8
## appropriate methods. Unfortunately, in distutils, the logic of
9
## deciding whether an extension needs to be recompiled and actually
10
## making the call to gcc to recompile the extension are in the same
11
## function. As a result, we can't just override one function and have
12
## everything magically work. Instead, we split this work between two
13
## functions. This works fine for our application, but it means that
14
## we can't use this modification to make the other parts of Sage that
15
## build with distutils call gcc in parallel.
16
##
17
########################################################################
18
19
import os
20
import sys
21
import time
22
23
# Failure policy read by process_command_results(): when False, the build
# exits with status 1 at the first failed command; when True, every failed
# command is reported before exiting with status 1.
keep_going = False
24
25
26
def run_command(cmd):
    """
    Echo a shell command and then execute it.

    INPUT:

    - ``cmd`` -- a string; the command line to run

    OUTPUT: the value returned by ``os.system(cmd)``.  The command is
    printed to standard output (and flushed) before it is started, so
    the echo always appears ahead of the command's own output.
    """
    print(cmd, flush=True)
    return os.system(cmd)
38
39
40
def apply_func_progress(p):
    """
    Write a progress string, then call a function on a value.

    INPUT:

    - ``p`` -- a sequence ``(function, value, string)``

    OUTPUT: the result of ``function(value)``.  The string (typically a
    progress indicator) is written to standard output and flushed
    before the function is called.

    This is a module-level function because an anonymous function
    cannot be pickled for use in
    ``execute_list_of_commands_in_parallel`` below.
    """
    out = sys.stdout
    out.write(p[2])
    out.flush()
    func, value = p[0], p[1]
    return func(value)
53
54
55
def execute_list_of_commands_in_parallel(command_list, nthreads):
    """
    Execute the given list of commands, possibly in parallel, using
    ``nthreads`` threads.  Terminates ``setup.py`` with an exit code
    of 1 if an error occurs in any subcommand.

    INPUT:

    - ``command_list`` -- a list of commands, each given as a pair
      ``(function, argument)`` (a tuple or a list) of a function to
      call and its argument; the list itself is not modified

    - ``nthreads`` -- integer; number of worker processes handed to
      ``multiprocessing.Pool``

    OUTPUT: none; the collected return values of the commands are
    passed to ``process_command_results``.

    WARNING: commands are run roughly in order, but of course successive
    commands may be run at the same time.
    """
    from multiprocessing import Pool

    # Attach a progress indicator string such as "[ 3/12] " to every
    # command.  Each entry is rebuilt as a fresh tuple: this accepts list
    # pairs as well (``list + tuple`` would raise TypeError) and leaves the
    # caller's list untouched.
    N = len(command_list)
    progress_fmt = "[{:%i}/{}] " % len(str(N))
    tasks = [tuple(cmd) + (progress_fmt.format(i, N),)
             for i, cmd in enumerate(command_list, start=1)]

    # map_async handles KeyboardInterrupt correctly if an argument is
    # given to get().  Plain map() and apply_async() do not work
    # correctly, see Issue #16113.
    pool = Pool(nthreads)
    try:
        result = pool.map_async(apply_func_progress, tasks, 1).get(99999)
    except BaseException:
        # Do not leave worker processes behind on interrupt/timeout/error.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        # join() is valid after either close() or terminate().
        pool.join()
    process_command_results(result)
88
89
90
def process_command_results(result_values):
    """
    Inspect the return values of executed commands and exit on failure.

    INPUT:

    - ``result_values`` -- iterable of command return values; any
      truthy value is treated as a failure status

    OUTPUT: none.  An error message is printed for each failing value.
    If the module-level flag ``keep_going`` is false, ``sys.exit(1)`` is
    called at the first failure; otherwise all values are examined and
    ``sys.exit(1)`` is called afterwards if at least one failed.
    """
    any_failed = False
    for status in result_values:
        if not status:
            continue
        print("Error running command, failed with status %s." % status)
        if not keep_going:
            sys.exit(1)
        any_failed = True
    if any_failed:
        sys.exit(1)
100
101
102
def execute_list_of_commands(command_list):
    """
    Run a list of commands and report how long they took.

    INPUT:

    - ``command_list`` -- a list of strings or pairs

    OUTPUT:

    For each entry in command_list, we attempt to run the command.
    If it is a string, we call ``os.system()``. If it is a pair [f, v],
    we call f(v).

    If the environment variable :envvar:`SAGE_NUM_THREADS` is set, use
    that many threads (never more than there are commands, and at
    least one); otherwise use a single thread.  A ``ValueError`` is
    raised if the variable is set to something that is not an integer.
    """
    t = time.time()
    # Determine the number of threads from the environment variable
    # SAGE_NUM_THREADS, which is set automatically by sage-env
    try:
        nthreads = int(os.environ['SAGE_NUM_THREADS'])
    except KeyError:
        nthreads = 1

    # Normalize every entry to a (function, argument) tuple.  Tuples are
    # required because a progress string is later appended with
    # ``entry + (progress,)``, which raises TypeError for a list.
    command_list = [(run_command, x) if isinstance(x, str) else tuple(x)
                    for x in command_list]

    # No need for more threads than there are commands, but at least one
    nthreads = max(1, min(len(command_list), nthreads))

    def plural(n, noun):
        # "1 command" / "3 commands"
        if n == 1:
            return "1 %s" % noun
        return "%i %ss" % (n, noun)

    print("Executing %s (using %s)" % (plural(len(command_list),"command"), plural(nthreads,"thread")))
    execute_list_of_commands_in_parallel(command_list, nthreads)
    print("Time to execute %s: %.2f seconds." % (plural(len(command_list),"command"), time.time() - t))
140
141