"""
Parallel iterator built using the ``fork()`` system call
"""
class p_iter_fork:
    """
    A parallel iterator implemented using ``fork()``.

    Calling an instance with a function and a list of inputs forks one
    subprocess per input (at most ``ncpus`` at the same time) and yields
    the results as the subprocesses terminate.

    EXAMPLES::

        sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
        <sage.parallel.use_fork.p_iter_fork instance at ...>
        sage: X.ncpus
        2
        sage: X.timeout
        3.0
        sage: X.verbose
        False
    """
    def __init__(self, ncpus, timeout=0, verbose=False, reset_interfaces=True):
        """
        Create a ``fork()``-based parallel iterator.

        INPUT:

        - ``ncpus`` -- the maximal number of simultaneous
          subprocesses to spawn (an integer, at least 1)
        - ``timeout`` -- (float, default: 0) wall time in seconds until
          a subprocess is automatically killed; 0 disables the timeout
        - ``verbose`` -- (default: False) whether to print
          anything about what the iterator does (e.g., killing
          subprocesses)
        - ``reset_interfaces`` -- (default: True) whether to reset
          all pexpect interfaces

        Raises ``TypeError`` if ``ncpus`` is not integral and
        ``ValueError`` if it is smaller than 1.

        EXAMPLES::

            sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
            <sage.parallel.use_fork.p_iter_fork instance at ...>
            sage: X.ncpus
            2
            sage: X.timeout
            3.0
            sage: X.verbose
            False
        """
        self.ncpus = int(ncpus)
        if self.ncpus != ncpus:
            raise TypeError("ncpus must be an integer")
        if self.ncpus < 1:
            # Without a worker slot ``__call__`` could never fork and
            # would loop forever on non-empty input.
            raise ValueError("ncpus must be at least 1")
        self.timeout = float(timeout)
        self.verbose = verbose
        self.reset_interfaces = reset_interfaces

    def __call__(self, f, inputs):
        """
        Parallel iterator using ``fork()``.

        INPUT:

        - ``f`` -- a Python function that need not be pickleable or anything else!
        - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where
          ``args`` is a tuple and ``kwds`` is a dictionary.

        OUTPUT:

        An iterator over pairs ``(input, result)``, produced in the order
        in which the subprocesses terminate.  ``result`` is the string
        ``"NO DATA"`` (followed by ``" (timed out)"`` if the subprocess
        was killed for exceeding the timeout) when the subprocess did not
        leave a result file behind.  Anything a subprocess wrote to its
        standard output is printed by the parent before the pair is
        yielded.

        EXAMPLES::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
            sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})])))
            [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]
        """
        n = self.ncpus
        v = list(inputs)
        import os
        import signal
        import sys
        from sage.structure.sage_object import load
        from sage.misc.misc import tmp_dir, walltime
        tmp = tmp_dir()
        timeout = self.timeout

        class AlarmTimeout(Exception):
            """
            Raised by our SIGALRM handler only, so that no unrelated
            exception can be mistaken for an expired timeout.
            """

        def on_alarm(signum, frame):
            raise AlarmTimeout("SIGALRM")

        # pid -> [input, start time, suffix appended to "NO DATA"]
        workers = {}
        try:
            while v or workers:
                # Spawn subprocesses until all slots are taken.
                while v and len(workers) < n:
                    # A child must not inherit unflushed buffers, so
                    # flush before every single fork.
                    sys.stdout.flush()
                    sys.stderr.flush()
                    pid = os.fork()
                    if pid:
                        # Parent: remember the child and its start time.
                        workers[pid] = [v[0], walltime(), '']
                        del v[0]
                    else:
                        # Child: never returns, it ends in os._exit().
                        self._subprocess(f, tmp, v[0])
                if not workers:
                    continue

                # Wait for some child to terminate, but no longer than
                # the remaining life time of the oldest worker.
                previous = None
                if timeout:
                    oldest = min(w[1] for w in workers.values())
                    previous = signal.signal(signal.SIGALRM, on_alarm)
                    signal.alarm(max(int(timeout - (walltime() - oldest)), 1))
                try:
                    try:
                        pid = os.wait()[0]
                    finally:
                        if timeout:
                            self._disarm_alarm(previous)
                except AlarmTimeout:
                    # The alarm may have fired while the handler was being
                    # restored above; it is one-shot, so repeating the
                    # restore here cannot be interrupted again.
                    self._disarm_alarm(previous)
                    for wpid, w in list(workers.items()):
                        if walltime() - w[1] > timeout:
                            if self.verbose:
                                print(
                                    "Killing subprocess %s with input %s which took too long"
                                    % (wpid, w[0]) )
                                sys.stdout.flush()
                            os.kill(wpid, signal.SIGKILL)
                            # The killed child is reaped by a later
                            # os.wait() and then reported with this suffix.
                            w[-1] = ' (timed out)'
                    continue

                if pid not in workers:
                    # Some other child of this process terminated.
                    continue

                # Forget the worker *before* yielding: if the consumer
                # abandons the iterator at the yield, the cleanup below
                # must not signal a pid that has already been reaped.
                w = workers.pop(pid)

                sobj = os.path.join(tmp, '%s.sobj'%pid)
                if os.path.exists(sobj):
                    X = load(sobj, compress=False)
                    os.unlink(sobj)
                else:
                    X = "NO DATA" + w[-1]

                out = os.path.join(tmp, '%s.out'%pid)
                if os.path.exists(out):
                    with open(out) as fobj:
                        output = fobj.read()
                    os.unlink(out)
                else:
                    output = "NO OUTPUT"

                if output.strip():
                    # Pass the child's output through verbatim.
                    sys.stdout.write(output)
                    sys.stdout.flush()

                yield (w[0], X)

        except Exception as msg:
            print(msg)
            sys.stdout.flush()

        finally:
            # Kill and reap the remaining workers first, so that none of
            # them writes into the directory while it is being removed.
            if workers:
                print("Killing any remaining workers...")
                sys.stdout.flush()
                for pid in list(workers):
                    try:
                        os.kill(pid, signal.SIGKILL)
                        # Reap exactly this child, not an arbitrary one.
                        os.waitpid(pid, 0)
                    except OSError as msg:
                        if self.verbose:
                            print(msg)
                            sys.stdout.flush()

            # Remove all temporary files.
            try:
                for name in os.listdir(tmp):
                    os.unlink(os.path.join(tmp, name))
                os.rmdir(tmp)
            except OSError as msg:
                if self.verbose:
                    print(msg)
                    sys.stdout.flush()

    def _disarm_alarm(self, previous):
        """
        Cancel a pending alarm and put the SIGALRM handler ``previous``
        back in place.

        INPUT:

        - ``previous`` -- the value returned by ``signal.signal`` when the
          timeout handler was installed; if it is ``None`` the handler is
          left at ``signal.SIG_IGN``
        """
        import signal
        # Ignore the signal first, so that an alarm expiring right now
        # cannot raise in the middle of this cleanup.
        signal.signal(signal.SIGALRM, signal.SIG_IGN)
        signal.alarm(0)
        # ``SIG_DFL`` is falsy, hence the explicit comparison with None.
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)

    def _subprocess(self, f, dir, x):
        """
        Setup and run evaluation of ``f(*x[0], **x[1])``, storing the
        result in the given directory ``dir``.  This method is called by each
        forked subprocess and never returns: it always terminates the
        process with ``os._exit(0)``.

        INPUT:

        - ``f`` -- a function
        - ``dir`` -- name of a directory
        - ``x`` -- 2-tuple, with args and kwds

        EXAMPLES:

        We have only this indirect test, since a direct test would terminate the Sage session::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
        """
        import os
        import sys
        try:
            from sage.structure.sage_object import save

            # Send everything printed by f to a file named after our pid,
            # where the parent process picks it up.
            out = os.path.join(dir, '%s.out'%os.getpid())
            sys.stdout = open(out, 'w')

            # ``reload`` is a builtin on Python 2 only.
            try:
                from importlib import reload
            except ImportError:
                from __builtin__ import reload
            import sage.misc.misc
            reload(sage.misc.misc)

            if self.reset_interfaces:
                # Import explicitly rather than rely on the submodule
                # having been loaded by somebody else.
                import sage.interfaces.quit
                sage.interfaces.quit.invalidate_all()

            value = f(*x[0], **x[1])

            sobj = os.path.join(dir, '%s.sobj'%os.getpid())
            save(value, sobj, compress=False)
        except Exception as msg:
            # Printed into the .out file, hence shown by the parent.
            print(msg)
        finally:
            # Whatever happens, the child must terminate here; it must
            # never return into the parent's loop, not even when the
            # final flush fails.
            try:
                sys.stdout.flush()
            finally:
                os._exit(0)