Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagelib
Path: blob/master/sage/parallel/use_fork.py
4045 views
1
"""
2
Parallel iterator built using the ``fork()`` system call
3
"""
4
5
################################################################################
6
# Copyright (C) 2010 William Stein <[email protected]>
7
#
8
# Distributed under the terms of (any version of) the GNU
9
# General Public License (GPL). The full text of the GPL is available at:
10
#
11
# http://www.gnu.org/licenses/
12
################################################################################
13
14
class p_iter_fork:
    """
    A parallel iterator implemented using ``fork()``.

    EXAMPLES::

        sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
        <sage.parallel.use_fork.p_iter_fork instance at ...>
        sage: X.ncpus
        2
        sage: X.timeout
        3.0
        sage: X.verbose
        False
    """
    def __init__(self, ncpus, timeout=0, verbose=False, reset_interfaces=True):
        """
        Create a ``fork()``-based parallel iterator.

        INPUT:

        - ``ncpus`` -- the maximal number of simultaneous
          subprocesses to spawn (must be an integer)

        - ``timeout`` -- (float, default: 0) wall time in seconds until
          a subprocess is automatically killed; 0 disables the timeout

        - ``verbose`` -- (default: False) whether to print
          anything about what the iterator does (e.g., killing
          subprocesses)

        - ``reset_interfaces`` -- (default: True) whether to reset
          all pexpect interfaces in each subprocess

        Raises ``TypeError`` if ``ncpus`` is not integral.

        EXAMPLES::

            sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
            <sage.parallel.use_fork.p_iter_fork instance at ...>
            sage: X.ncpus
            2
            sage: X.timeout
            3.0
            sage: X.verbose
            False
        """
        self.ncpus = int(ncpus)
        if self.ncpus != ncpus:  # check that there wasn't a roundoff
            raise TypeError("ncpus must be an integer")
        self.timeout = float(timeout)  # require a float
        self.verbose = verbose
        self.reset_interfaces = reset_interfaces

    def __call__(self, f, inputs):
        """
        Parallel iterator using ``fork()``.

        INPUT:

        - ``f`` -- a Python function that need not be pickleable or anything else!

        - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where
          ``args`` is a tuple and ``kwds`` is a dictionary.

        OUTPUT:

        An iterator over pairs ``(input, result)``, produced in the order in
        which the subprocesses finish (not necessarily the order of
        ``inputs``). ``result`` is ``f(*args, **kwds)``, or a string starting
        with ``"NO DATA"`` if the subprocess left no result (for instance
        because ``f`` raised an exception, or ``" (timed out)"`` is appended
        when the subprocess was killed for exceeding the timeout).

        Raises ``ValueError`` if ``inputs`` is non-empty but ``ncpus`` is
        less than 1 (no subprocess could ever be spawned).

        EXAMPLES::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
            sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})])))
            [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]
        """
        n = self.ncpus
        # Inputs still to be dispatched. The list is reversed so that pop()
        # hands them out in their original order in O(1) per item.
        pending = list(inputs)
        pending.reverse()
        if pending and n < 1:
            # No worker could ever be spawned, so the scheduling loop below
            # would spin forever.
            raise ValueError("ncpus must be at least 1")

        import os, sys, signal
        from sage.structure.sage_object import load
        from sage.misc.misc import tmp_dir, walltime
        tmpdir = tmp_dir()
        timeout = self.timeout
        # Subprocesses shouldn't inherit unflushed buffers (cf. #11778):
        sys.stdout.flush()
        sys.stderr.flush()

        def mysig(a, b):
            # SIGALRM handler: interrupts the blocking os.wait() on timeout.
            raise RuntimeError("SIGALRM")

        # Maps the pid of each live (not yet reported) worker to
        # [input, start walltime, message]; message is appended to
        # "NO DATA" when the worker leaves no result behind.
        workers = {}
        try:
            while pending or workers:
                # Spawn up to n subprocesses.
                while pending and len(workers) < n:
                    item = pending.pop()
                    pid = os.fork()
                    # fork() returns the (nonzero) pid of the child in the
                    # parent and 0 in the child.
                    if pid:
                        # This is the parent master process.
                        workers[pid] = [item, walltime(), '']
                    else:
                        # This is the subprocess; _subprocess never returns.
                        self._subprocess(f, tmpdir, item)

                if not workers:
                    continue

                # Now wait for one subprocess to finish and report the result.
                # However, wait at most until the oldest process times out.
                if timeout:
                    oldest = min(W[1] for W in workers.values())
                    old_handler = signal.signal(signal.SIGALRM, mysig)
                    if old_handler is None:
                        # The previous handler was not installed from Python
                        # and cannot be reinstated from here; fall back to
                        # ignoring SIGALRM as before.
                        old_handler = signal.SIG_IGN
                    signal.alarm(max(int(timeout - (walltime() - oldest)), 1))
                try:
                    try:
                        pid = os.wait()[0]
                    finally:
                        if timeout:
                            # Cancel any pending alarm and put back the
                            # caller's SIGALRM handler.
                            signal.alarm(0)
                            signal.signal(signal.SIGALRM, old_handler)
                except RuntimeError:
                    # Kill workers that are too old. They remain in workers
                    # and are reaped by a later os.wait(), which reports them
                    # as "NO DATA (timed out)".
                    for wpid, W in workers.items():
                        if walltime() - W[1] > timeout:
                            if self.verbose:
                                print(
                                    "Killing subprocess %s with input %s which took too long"
                                    % (wpid, W[0]))
                                sys.stdout.flush()
                            os.kill(wpid, 9)
                            W[-1] = ' (timed out)'
                    continue

                # If the computation was interrupted the pid
                # might not be in the workers list, in which
                # case we skip this.
                if pid not in workers:
                    continue

                # The worker has been reaped: drop it *before* yielding, so
                # the cleanup below never kills or waits for it if the
                # consumer stops iterating early.
                item, _start, message = workers.pop(pid)

                # Collect data from the process that terminated.
                sobj = os.path.join(tmpdir, '%s.sobj' % pid)
                if os.path.exists(sobj):
                    X = load(sobj, compress=False)
                    os.unlink(sobj)
                else:
                    X = "NO DATA" + message

                out = os.path.join(tmpdir, '%s.out' % pid)
                if os.path.exists(out):
                    with open(out) as fh:
                        output = fh.read()
                    os.unlink(out)
                else:
                    output = "NO OUTPUT"

                if output.strip():
                    sys.stdout.write(output)
                    sys.stdout.flush()

                yield (item, X)

        except Exception as msg:
            print(msg)
            sys.stdout.flush()

        finally:
            # Send "kill -9" signal to workers that are left, and reap each
            # of them so none lingers as a zombie process.
            if workers:
                print("Killing any remaining workers...")
                sys.stdout.flush()
                for pid in list(workers):
                    try:
                        os.kill(pid, 9)
                        os.waitpid(pid, 0)
                    except OSError as msg:
                        if self.verbose:
                            print(msg)
                            sys.stdout.flush()

            # Clean up all temporary files. This happens after the workers
            # are dead so that none of them can create new files meanwhile.
            try:
                for name in os.listdir(tmpdir):
                    os.unlink(os.path.join(tmpdir, name))
                os.rmdir(tmpdir)
            except OSError as msg:
                if self.verbose:
                    print(msg)
                    sys.stdout.flush()

    def _subprocess(self, f, dir, x):
        """
        Setup and run evaluation of ``f(*x[0], **x[1])``, storing the
        result in the given directory ``dir``. This method is called by each
        forked subprocess and never returns: it always ends with
        ``os._exit(0)``.

        The subprocess's stdout is redirected to ``<dir>/<pid>.out`` and the
        result is saved to ``<dir>/<pid>.sobj``. If anything raises, the
        exception message is printed (into the ``.out`` file) and no
        ``.sobj`` file is written.

        INPUT:

        - ``f`` -- a function

        - ``dir`` -- name of a directory

        - ``x`` -- 2-tuple, with args and kwds

        EXAMPLES:

        We have only this indirect test, since a direct test would terminate
        the Sage session::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
        """
        import os, sys
        from sage.structure.sage_object import save

        try:
            # Make it so all stdout is sent to a file so it can
            # be displayed by the parent.
            out = os.path.join(dir, '%s.out' % os.getpid())
            sys.stdout = open(out, 'w')

            # Run some commands to tell Sage that its
            # pid has changed (forcing a reload of
            # misc).
            import sage.misc.misc
            reload(sage.misc.misc)

            # The pexpect interfaces (and objects defined in them) are
            # not valid in the child.
            if self.reset_interfaces:
                import sage.interfaces.quit
                sage.interfaces.quit.invalidate_all()

            # Now evaluate the function f.
            value = f(*x[0], **x[1])

            # And save the result to disk.
            sobj = os.path.join(dir, '%s.sobj' % os.getpid())
            save(value, sobj, compress=False)

        except Exception as msg:
            # Important to print this, so it is seen by the caller.
            print(msg)
        finally:
            sys.stdout.flush()
            os._exit(0)