Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagesmc
Path: blob/master/src/sage/parallel/use_fork.py
8814 views
1
"""
2
Parallel iterator built using the ``fork()`` system call
3
"""
4
5
################################################################################
6
# Copyright (C) 2010 William Stein <wstein@gmail.com>
7
#
8
# Distributed under the terms of (any version of) the GNU
9
# General Public License (GPL). The full text of the GPL is available at:
10
#
11
# http://www.gnu.org/licenses/
12
################################################################################
13
14
from sage.ext.c_lib import AlarmInterrupt
15
from sage.misc.misc import alarm, cancel_alarm
16
17
class p_iter_fork:
    """
    A parallel iterator implemented using ``fork()``.

    Calling an instance with a function and a list of inputs forks one
    subprocess per input (at most ``ncpus`` at a time), evaluates the
    function in each subprocess, and yields the results in the parent
    as the subprocesses terminate.

    EXAMPLES::

        sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
        <sage.parallel.use_fork.p_iter_fork instance at ...>
        sage: X.ncpus
        2
        sage: X.timeout
        3.0
        sage: X.verbose
        False
    """
    def __init__(self, ncpus, timeout=0, verbose=False, reset_interfaces=True):
        """
        Create a ``fork()``-based parallel iterator.

        INPUT:

        - ``ncpus`` -- the maximal number of simultaneous
          subprocesses to spawn; must be a positive integer
        - ``timeout`` -- (float, default: 0) wall time in seconds until
          a subprocess is automatically killed; 0 disables the timeout
        - ``verbose`` -- (default: False) whether to print
          anything about what the iterator does (e.g., killing
          subprocesses)
        - ``reset_interfaces`` -- (default: True) whether to reset
          all pexpect interfaces

        Raises ``TypeError`` if ``ncpus`` is not integral and
        ``ValueError`` if it is not positive.

        EXAMPLES::

            sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
            <sage.parallel.use_fork.p_iter_fork instance at ...>
            sage: X.ncpus
            2
            sage: X.timeout
            3.0
            sage: X.verbose
            False
        """
        self.ncpus = int(ncpus)
        if self.ncpus != ncpus:  # check that there wasn't a roundoff
            raise TypeError("ncpus must be an integer")
        if self.ncpus < 1:
            # With no worker slots, __call__ could never spawn a
            # subprocess and would spin forever on non-empty input.
            raise ValueError("ncpus must be positive")
        self.timeout = float(timeout)  # require a float
        self.verbose = verbose
        self.reset_interfaces = reset_interfaces

    def __call__(self, f, inputs):
        """
        Parallel iterator using ``fork()``.

        INPUT:

        - ``f`` -- a Python function that need not be pickleable or anything else!

        - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where
          ``args`` is a tuple and ``kwds`` is a dictionary.

        OUTPUT:

        A generator yielding pairs ``(x, value)``, one per terminated
        subprocess and in order of termination, where ``x`` is the
        element of ``inputs`` that was evaluated and ``value`` is the
        result loaded from the subprocess.  If the subprocess left no
        result file, ``value`` is the string ``"NO DATA"`` (followed by
        ``" (timed out)"`` when the subprocess was killed for exceeding
        the timeout).  Anything the subprocess printed is echoed to
        standard output before its pair is yielded.

        If an exception occurs in the parent (for example while
        unpickling a result), it is printed and the iteration stops.
        In every case the temporary directory is removed and workers
        that are still running are killed.

        EXAMPLES::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
            sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})])))
            [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]

        TESTS:

        The output of functions decorated with :func:`parallel` is read
        as a pickle by the parent process. We intentionally break the
        unpickling and demonstrate that this failure is handled
        gracefully (an exception is displayed and an empty list is
        returned)::

            sage: Polygen = parallel(polygen)
            sage: list(Polygen([QQ]))
            [(((Rational Field,), {}), x)]
            sage: from sage.structure.sage_object import unpickle_override, register_unpickle_override
            sage: register_unpickle_override('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint', Integer)
            sage: L = list(Polygen([QQ]))
            ('__init__() takes at most 2 positional arguments (4 given)', <type 'sage.rings.integer.Integer'>, (Univariate Polynomial Ring in x over Rational Field, [0, 1], False, True))
            sage: L
            []

        Fix the unpickling::

            sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')]
            sage: list(Polygen([QQ,QQ]))
            [(((Rational Field,), {}), x), (((Rational Field,), {}), x)]
        """
        n = self.ncpus
        v = list(inputs)
        import os, sys, signal
        from sage.structure.sage_object import load
        from sage.misc.all import tmp_dir, walltime
        tmpdir = tmp_dir()
        timeout = self.timeout

        # Maps pid -> [input, start walltime, message suffix].  The
        # suffix is appended to "NO DATA" when no result file exists.
        workers = {}
        try:
            while v or workers:
                # Spawn up to n subprocesses
                while v and len(workers) < n:
                    # Subprocesses shouldn't inherit unflushed buffers (cf. #11778):
                    sys.stdout.flush()
                    sys.stderr.flush()

                    pid = os.fork()
                    # The way fork works is that pid returns the
                    # nonzero pid of the subprocess for the master
                    # process and returns 0 for the subprocess.
                    if pid:
                        # This is the parent master process.
                        workers[pid] = [v.pop(0), walltime(), '']
                    else:
                        # This is the subprocess; _subprocess never returns.
                        self._subprocess(f, tmpdir, v[0])

                if workers:
                    # Now wait for one subprocess to finish and report the result.
                    # However, wait at most the time since the oldest process started.
                    if timeout:
                        oldest = min(info[1] for info in workers.values())
                        alarm(max(timeout - (walltime() - oldest), 0.1))

                    try:
                        pid = os.wait()[0]
                        cancel_alarm()
                        w = workers.pop(pid)
                    except AlarmInterrupt:
                        cancel_alarm()
                        # Kill workers that are too old.  They stay in
                        # ``workers`` so that a later os.wait() reaps them
                        # and reports them as "NO DATA (timed out)".
                        for pid, info in workers.items():
                            if walltime() - info[1] > timeout:
                                if self.verbose:
                                    print(
                                        "Killing subprocess %s with input %s which took too long"
                                        % (pid, info[0]))
                                os.kill(pid, signal.SIGKILL)
                                info[-1] = ' (timed out)'
                    except KeyError:
                        # Some other process exited, not our problem...
                        pass
                    else:
                        # collect data from process that successfully terminated
                        sobj = os.path.join(tmpdir, '%s.sobj' % pid)
                        if os.path.exists(sobj):
                            value = load(sobj, compress=False)
                            os.unlink(sobj)
                        else:
                            value = "NO DATA" + w[-1]  # the message field
                        out = os.path.join(tmpdir, '%s.out' % pid)
                        if os.path.exists(out):
                            with open(out) as outfile:
                                output = outfile.read()
                            os.unlink(out)
                        else:
                            output = "NO OUTPUT"

                        if output.strip():
                            # Echo verbatim, without adding a newline.
                            sys.stdout.write(output)

                        yield (w[0], value)

        except Exception as msg:
            # Deliberately best-effort: report the problem and end the
            # iteration instead of propagating (see TESTS above).
            print(msg)

        finally:
            # Clean up all temporary files.
            try:
                for name in os.listdir(tmpdir):
                    os.unlink(os.path.join(tmpdir, name))
                os.rmdir(tmpdir)
            except OSError as msg:
                if self.verbose:
                    print(msg)

            # Send "kill -9" signal to workers that are left.
            if workers:
                if self.verbose:
                    print("Killing any remaining workers...")
                    sys.stdout.flush()
                for pid in list(workers):
                    try:
                        os.kill(pid, signal.SIGKILL)
                        os.waitpid(pid, 0)
                    except OSError as msg:
                        if self.verbose:
                            print(msg)

    def _subprocess(self, f, dir, x):
        """
        Setup and run evaluation of ``f(*x[0], **x[1])``, storing the
        result in the given directory ``dir``.  This method is called by each
        forked subprocess.

        Standard output is redirected to ``<pid>.out`` in ``dir`` and the
        value returned by ``f`` is saved to ``<pid>.sobj`` in ``dir``.
        Exceptions are printed (into the redirected output) rather than
        propagated.  This method never returns: it always terminates
        the calling process with ``os._exit(0)``.

        INPUT:

        - ``f`` -- a function
        - ``dir`` -- name of a directory
        - ``x`` -- 2-tuple, with args and kwds

        EXAMPLES:

        We have only this indirect test, since a direct test would terminate the Sage session::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
        """
        import os, sys
        from sage.structure.sage_object import save

        try:
            # Make it so all stdout is sent to a file so it can
            # be displayed.
            out = os.path.join(dir, '%s.out' % os.getpid())
            sys.stdout = open(out, 'w')

            # Run some commands to tell Sage that its
            # pid has changed (forcing a reload of
            # misc).
            import sage.misc.misc
            reload(sage.misc.misc)  # Python 2 builtin

            # The pexpect interfaces (and objects defined in them) are
            # not valid.
            if self.reset_interfaces:
                # Import explicitly instead of relying on some other
                # module having already loaded sage.interfaces.quit.
                from sage.interfaces.quit import invalidate_all
                invalidate_all()

            # Now evaluate the function f.
            value = f(*x[0], **x[1])

            # And save the result to disk.
            sobj = os.path.join(dir, '%s.sobj' % os.getpid())
            save(value, sobj, compress=False)

        except Exception as msg:
            # Important to print this, so it is seen by the caller.
            print(msg)
        finally:
            # The child must never return into the parent's code, so
            # exit even if flushing the redirected stdout fails.
            try:
                sys.stdout.flush()
            finally:
                os._exit(0)
264
265