Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagelib
Path: blob/master/sage/parallel/decorate.py
4054 views
1
r"""
2
Decorate interface for parallel computation
3
"""
4
5
import types
6
7
from sage.rings.all import Integer
8
9
from reference import parallel_iter as p_iter_reference
10
from use_fork import p_iter_fork
11
import multiprocessing_sage
12
13
def normalize_input(a):
    r"""
    Normalize ``a`` into an ``(args, kwds)`` pair.

    The rules, checked in this order, are:

    - a dictionary ``a`` becomes ``((), a)``, i.e. keyword arguments only;

    - a value that is not a tuple becomes ``((a,), {})``, i.e. a single
      positional argument;

    - a 2-tuple whose first entry is a tuple and whose second entry is a
      dictionary is taken to already be an ``(args, kwds)`` pair and is
      returned unchanged (the very same object);

    - any other tuple ``a`` becomes ``(a, {})``, i.e. its entries are the
      positional arguments.

    INPUT:

    - ``a`` -- object

    OUTPUT:

    - ``args`` -- tuple

    - ``kwds`` -- dictionary

    EXAMPLES::

        sage: sage.parallel.decorate.normalize_input( (2, {3:4}) )
        ((2, {3: 4}), {})
        sage: sage.parallel.decorate.normalize_input( (2,3) )
        ((2, 3), {})
        sage: sage.parallel.decorate.normalize_input( {3:4} )
        ((), {3: 4})
        sage: sage.parallel.decorate.normalize_input( 5 )
        ((5,), {})
    """
    if isinstance(a, dict):
        return (), a
    if not isinstance(a, tuple):
        return (a,), {}
    already_normalized = (len(a) == 2
                          and isinstance(a[0], tuple)
                          and isinstance(a[1], dict))
    if already_normalized:
        return a
    return a, {}
50
51
52
class Parallel:
    r"""
    Create a ``parallel``-decorated function.

    This is the object created by :func:`parallel`.  Its ``p_iter``
    attribute is the parallel iterator used by the
    :class:`ParallelFunction` objects it creates.
    """
    def __init__(self, p_iter='fork', ncpus=None, **kwds):
        """
        INPUT:

        - ``p_iter`` -- (default: ``'fork'``) one of the strings
          ``'fork'``, ``'multiprocessing'`` or ``'reference'``; or an
          integer, meaning ``'fork'`` with that many cpus; or any other
          non-string object, which is used as the parallel iterator itself

        - ``ncpus`` -- (default: ``None``) maximal number of subprocesses;
          if ``None``, it is computed automatically

        - ``kwds`` -- extra options (such as ``timeout``) passed on to the
          ``'fork'`` iterator; they are not used by the other iterators

        EXAMPLES::

            sage: P = sage.parallel.decorate.Parallel(); P
            <sage.parallel.decorate.Parallel instance at 0x...>

        An unknown iterator name is rejected::

            sage: sage.parallel.decorate.Parallel('nonsense')
            Traceback (most recent call last):
            ...
            ValueError: unknown iterator 'nonsense'
        """
        # An integer first argument is shorthand for forking with that
        # many cpus; this is what makes ``@parallel(2)`` work.  It
        # overrides any explicitly given ncpus.
        if isinstance(p_iter, (int, long, Integer)):
            p_iter, ncpus = 'fork', p_iter

        if ncpus is None:
            from ncpus import ncpus as compute_ncpus
            ncpus = compute_ncpus()

        # The default p_iter is the 'fork' implementation.
        if p_iter == 'fork':
            self.p_iter = p_iter_fork(ncpus, **kwds)
        elif p_iter == 'multiprocessing':
            self.p_iter = multiprocessing_sage.pyprocessing(ncpus)
        elif p_iter == 'reference':
            self.p_iter = p_iter_reference
        elif isinstance(p_iter, str):
            raise ValueError("unknown iterator '%s'" % p_iter)
        else:
            # Any non-string object is taken to be a user-supplied
            # parallel iterator and used as is.
            self.p_iter = p_iter

    def __call__(self, f):
        r"""
        Create a callable object that wraps ``f`` and that when called
        with a list of inputs returns an iterator over pairs ``(x,
        f(x))`` in possibly random order. Here ``x`` is replaced by
        its normalized form ``(args, kwds)`` using
        :func:`normalize_input`.

        INPUT:

        - ``f`` -- Python callable object or function

        OUTPUT:

        - Decorated version of ``f``, a :class:`ParallelFunction`

        EXAMPLES::

            sage: from sage.parallel.decorate import Parallel
            sage: p = Parallel()
            sage: f = x^2-1
            sage: p(f)
            <sage.parallel.decorate.ParallelFunction object at ...>

            sage: P = sage.parallel.decorate.Parallel()
            sage: def g(n,m): return n+m
            sage: h = P(g) # indirect doctest
            sage: list(h([(2,3)]))
            [(((2, 3), {}), 5)]
        """
        return ParallelFunction(self, f)
119
120
class ParallelFunction(object):
    """
    Class which parallelizes a function or class method.

    This is typically accessed indirectly through
    :meth:`Parallel.__call__`.
    """
    def __init__(self, parallel, func):
        """
        .. note::

            This is typically accessed indirectly through
            :meth:`Parallel.__call__`.

        INPUT:

        - ``parallel`` -- a :class:`Parallel` object which controls
          how the parallel execution will be done.

        - ``func`` -- Python callable object or function
        """
        self.parallel = parallel
        self.func = func

    def __call__(self, *args, **kwds):
        """
        Call the wrapped function, either serially or in parallel.

        If the first positional argument is a list or a generator, each
        of its items is normalized with :func:`normalize_input`, and the
        wrapped function together with that sequence of ``(args, kwds)``
        pairs is handed to the parallel iterator ``self.parallel.p_iter``,
        whose result is returned.  In that case all other positional and
        keyword arguments are ignored.

        Otherwise the wrapped function is simply called with ``args``
        and ``kwds`` and its value returned.

        EXAMPLES::

            sage: from sage.parallel.decorate import Parallel
            sage: p = Parallel()
            sage: def f(x):
            ...       return x*x
            sage: pf = p(f); pf
            <sage.parallel.decorate.ParallelFunction object at ...>
            sage: pf(2)
            4
            sage: sorted(pf([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
        """
        if len(args) > 0 and isinstance(args[0], (list,
                                                  types.GeneratorType)):
            # Only args[0] is used on this path; the remaining positional
            # arguments and kwds are not forwarded.
            return self.parallel.p_iter(self.func, (normalize_input(a)
                                                    for a in args[0]))
        else:
            return self.func(*args, **kwds)

    def __get__(self, instance, owner):
        """
        Implement part of the descriptor protocol for
        :class:`ParallelFunction` objects.

        .. note::

            This is the key to fixing Trac #11461.

        EXAMPLES:

        We verify that the decorated functions work correctly on
        methods, classmethods, and staticmethods, for both the
        parallel and non-parallel versions::

            sage: class Foo(object):
            ...       @parallel(2)
            ...       def square(self, n):
            ...           return n*n
            ...       @parallel(2)
            ...       @classmethod
            ...       def square_classmethod(cls, n):
            ...           return n*n
            ...       @parallel(2)
            ...       @staticmethod
            ...       def square_staticmethod(n):
            ...           return n*n
            sage: a = Foo()
            sage: a.square(3)
            9
            sage: sorted(a.square([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
            sage: a.square_classmethod(3)
            9
            sage: sorted(a.square_classmethod([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
            sage: Foo.square_classmethod(3)
            9
            sage: sorted(Foo.square_classmethod([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
            sage: a.square_staticmethod(3)
            9
            sage: sorted(a.square_staticmethod([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
            sage: Foo.square_staticmethod(3)
            9
            sage: sorted(Foo.square_staticmethod([2,3]))
            [(((2,), {}), 4), (((3,), {}), 9)]
        """
        try:
            # If this ParallelFunction object is accessed as an
            # attribute of a class or instance, the underlying function
            # should be "accessed" in the same way (bound method,
            # classmethod, staticmethod, ...).
            new_func = self.func.__get__(instance, owner)
        except AttributeError:
            # This will happen if a non-function attribute is
            # decorated. For example, an expression that's an
            # attribute of a class.
            new_func = self.func
        return ParallelFunction(self.parallel, new_func)

    def _sage_argspec_(self):
        """
        Returns the argument specification for this object, which is
        just the argument specification for the underlying function.
        See :mod:`sage.misc.sageinspect` for more information on
        this convention.

        EXAMPLES::

            sage: from sage.parallel.decorate import Parallel
            sage: p = Parallel(2)
            sage: def f(x, y):
            ...       return x + y
            sage: from sage.misc.sageinspect import sage_getargspec
            sage: sage_getargspec(p(f))
            ArgSpec(args=['x', 'y'], varargs=None, keywords=None, defaults=None)
        """
        from sage.misc.sageinspect import sage_getargspec
        return sage_getargspec(self.func)

    def _sage_src_(self):
        """
        Returns the source code for this object, which is just the
        source code for the underlying function. See
        :mod:`sage.misc.sageinspect` for more information on this
        convention.

        EXAMPLES::

            sage: from sage.parallel.decorate import Parallel
            sage: p = Parallel(2)
            sage: def f(x, y):
            ...       return x + y
            sage: from sage.misc.sageinspect import sage_getsource
            sage: 'return x + y' in sage_getsource(p(f))
            True
        """
        from sage.misc.sageinspect import sage_getsource
        return sage_getsource(self.func)

    def _sage_doc_(self):
        """
        Returns the docstring for this object, which is just the
        docstring for the underlying function. See
        :mod:`sage.misc.sageinspect` for more information on this
        convention.

        EXAMPLES::

            sage: from sage.parallel.decorate import Parallel
            sage: p = Parallel(2)
            sage: def f(x, y):
            ...       '''Test docstring'''
            ...       return x + y
            sage: from sage.misc.sageinspect import sage_getdoc
            sage: sage_getdoc(p(f))
            'Test docstring\n'
        """
        from sage.misc.sageinspect import sage_getdoc
        return sage_getdoc(self.func)
287
288
289
def parallel(p_iter='fork', ncpus=None, **kwds):
    r"""
    This is a decorator that gives a function a parallel interface,
    allowing it to be called with a list of inputs, whose values will
    be computed in parallel.

    .. warning::

         The parallel subprocesses will not have access to data
         created in pexpect interfaces. This behavior with respect to
         pexpect interfaces is very important to keep in mind when
         setting up certain computations. It's the one big limitation
         of this decorator.

    INPUT:

    - ``p_iter`` -- parallel iterator object, integer or string:

      - ``'fork'`` -- (default) use a new forked subprocess for each input
      - ``'multiprocessing'`` -- use multiprocessing library
      - ``'reference'`` -- use a fake serial reference implementation
      - an integer -- shorthand for ``'fork'`` with ``ncpus`` set to it
      - any other non-string object is used as the parallel iterator
        itself; a plain Python function, ``classmethod`` or
        ``staticmethod``, however, is treated as the function to decorate
        (this is what makes ``@parallel`` work without arguments)

    - ``ncpus`` -- integer, maximal number of subprocesses to use at the same time
    - ``timeout`` -- number of seconds until each subprocess is killed (only supported
      by 'fork'; zero means not at all)

    .. warning::

         If you use anything but ``'fork'`` above, then a whole new
         subprocess is spawned, so none of your local state (variables,
         certain functions, etc.) is available.


    EXAMPLES:

    We create a simple decoration for a simple function. The number
    of cpus (or cores, or hardware threads) is automatically detected::

        sage: @parallel
        ...   def f(n): return n*n
        sage: f(10)
        100
        sage: sorted(list(f([1,2,3])))
        [(((1,), {}), 1), (((2,), {}), 4), (((3,), {}), 9)]

    We use exactly two cpus::

        sage: @parallel(2)
        ...   def f(n): return n*n


    We create a decorator that uses three subprocesses, and times out
    individual processes after 10 seconds::

        sage: @parallel(ncpus=3, timeout=10)
        ...   def fac(n): return factor(2^n-1)
        sage: for X, Y in sorted(list(fac([101,119,151,197,209]))): print X,Y
        ((101,), {}) 7432339208719 * 341117531003194129
        ((119,), {}) 127 * 239 * 20231 * 131071 * 62983048367 * 131105292137
        ((151,), {}) 18121 * 55871 * 165799 * 2332951 * 7289088383388253664437433
        ((197,), {}) 7487 * 26828803997912886929710867041891989490486893845712448833
        ((209,), {}) 23 * 89 * 524287 * 94803416684681 * 1512348937147247 * 5346950541323960232319657

        sage: @parallel('multiprocessing')
        ...   def f(N): return N^2
        sage: v = list(f([1,2,4])); v.sort(); v
        [(((1,), {}), 1), (((2,), {}), 4), (((4,), {}), 16)]
        sage: @parallel('reference')
        ...   def f(N): return N^2
        sage: v = list(f([1,2,4])); v.sort(); v
        [(((1,), {}), 1), (((2,), {}), 4), (((4,), {}), 16)]

    For functions that take multiple arguments, enclose the arguments in tuples
    when calling the parallel function::

        sage: @parallel
        ...   def f(a,b): return a*b
        sage: for X, Y in sorted(list(f([(2,3),(3,5),(5,7)]))): print X, Y
        ((2, 3), {}) 6
        ((3, 5), {}) 15
        ((5, 7), {}) 35

    For functions that take a single tuple as an argument, enclose it in an
    additional tuple at call time, to distinguish it as the first argument,
    as opposed to a tuple of arguments::

        sage: @parallel
        ...   def firstEntry(aTuple): return aTuple[0]
        sage: for X, Y in sorted(list(firstEntry([((1,2,3,4),),((5,6,7,8),)]))): print X, Y
        (((1, 2, 3, 4),), {}) 1
        (((5, 6, 7, 8),), {}) 5

    The parallel decorator also works with methods, classmethods, and
    staticmethods. Be sure to apply the parallel decorator after ("above")
    either the ``classmethod`` or ``staticmethod`` decorators::

        sage: class Foo(object):
        ...       @parallel(2)
        ...       def square(self, n):
        ...           return n*n
        ...       @parallel(2)
        ...       @classmethod
        ...       def square_classmethod(cls, n):
        ...           return n*n
        sage: a = Foo()
        sage: a.square(3)
        9
        sage: sorted(a.square([2,3]))
        [(((2,), {}), 4), (((3,), {}), 9)]
        sage: Foo.square_classmethod(3)
        9
        sage: sorted(Foo.square_classmethod([2,3]))
        [(((2,), {}), 4), (((3,), {}), 9)]
        sage: a.square_classmethod(3)
        9

    This also works when ``@parallel`` is used without arguments::

        sage: class Bar(object):
        ...       @parallel
        ...       @staticmethod
        ...       def cube(n):
        ...           return n*n*n
        sage: Bar.cube(2)
        8
        sage: sorted(Bar.cube([2,3]))
        [(((2,), {}), 8), (((3,), {}), 27)]

    .. warning::

         Currently, parallel methods do not work with the
         multiprocessing implementation.
    """
    # Used bare as ``@parallel``, the decorated object itself arrives as
    # ``p_iter``.  Besides plain functions this can be a classmethod or
    # staticmethod object (``@parallel`` stacked above ``@classmethod`` /
    # ``@staticmethod``); neither makes sense as a parallel iterator, so
    # decorate it with the default settings as well.  ``types`` is
    # imported at module level.
    if isinstance(p_iter, (types.FunctionType, classmethod, staticmethod)):
        return Parallel()(p_iter)
    return Parallel(p_iter, ncpus, **kwds)
412
413
414
415
416
###################################################################
417
# The @fork decorator -- evaluate a function with no side effects
418
# in memory, so the only side effects (if any) are on disk.
419
#
420
# We have both a function and a class below, so that the decorator
421
# can be used with or without options:
422
#
423
# @fork
424
# def f(...): ...
425
# and
426
# @fork(...options...):
427
# def f(...): ...
428
###################################################################
429
430
class Fork:
    """
    A ``fork`` decorator class.

    Instances are created by :func:`fork`; calling one on a function
    returns a wrapper that evaluates the function through a single-cpu
    ``'fork'`` :class:`Parallel` iterator.
    """
    def __init__(self, timeout=0, verbose=False):
        """
        INPUT:

        - ``timeout`` -- (default: 0) kill the subprocess after it has run this
          many seconds (wall time), or if ``timeout`` is zero, do not kill it.
        - ``verbose`` -- (default: ``False``) whether to print anything about
          what the decorator does (e.g., killing the subprocess)

        EXAMPLES::

            sage: sage.parallel.decorate.Fork()
            <sage.parallel.decorate.Fork instance at 0x...>
            sage: sage.parallel.decorate.Fork(timeout=3)
            <sage.parallel.decorate.Fork instance at 0x...>
        """
        self.timeout = timeout
        self.verbose = verbose

    def __call__(self, f):
        """
        INPUT:

        - ``f`` -- a function

        OUTPUT:

        - A decorated function, carrying the name and docstring of ``f``
          where ``f`` provides them.

        EXAMPLES::

            sage: F = sage.parallel.decorate.Fork(timeout=3)
            sage: def g(n,m): return n+m
            sage: h = F(g) # indirect doctest
            sage: h(2,3)
            5
            sage: h.__name__
            'g'
        """
        import functools
        # A one-cpu 'fork' iterator runs each call in a subprocess with
        # the configured timeout and verbosity.
        P = Parallel(p_iter='fork', ncpus=1, timeout=self.timeout,
                     verbose=self.verbose)
        g = P(f)
        def h(*args, **kwds):
            # Feed the single (args, kwds) input through the parallel
            # iterator and keep only the value from the first
            # ((args, kwds), value) pair it yields.
            return list(g([(args, kwds)]))[0][1]
        # Make the wrapper look like ``f`` for introspection.  Python 2's
        # update_wrapper raises AttributeError when ``f`` lacks one of
        # the copied attributes (e.g. ``__name__`` on a callable
        # object); in that case keep the plain wrapper.
        try:
            functools.update_wrapper(h, f)
        except AttributeError:
            pass
        return h
477
478
def fork(f=None, timeout=0, verbose=False):
    """
    Decorate a function so that when called it runs in a forked
    subprocess. This means that it won't have any in-memory
    side effects on the parent Sage process. The pexpect interfaces
    are all reset.

    INPUT:

    - ``f`` -- (default: ``None``) the function to decorate; if ``None``,
      a :class:`Fork` decorator is returned instead, so that ``fork`` can
      also be used with options, as in ``@fork(timeout=1)``
    - ``timeout`` -- (default: 0) if positive, kill the subprocess after
      this many seconds (wall time)
    - ``verbose`` -- (default: ``False``) whether to print anything
      about what the decorator does (e.g., killing the subprocess)

    .. warning::

        The forked subprocess will not have access to data created
        in pexpect interfaces. This behavior with respect to pexpect
        interfaces is very important to keep in mind when setting up
        certain computations. It's the one big limitation of this
        decorator.

    EXAMPLES:

    We create a function and run it with the ``fork`` decorator. Note
    that it does not have a side effect. Despite trying to change
    the global variable ``a`` below in ``g``, the variable ``a`` does not
    get changed::

        sage: a = 5
        sage: @fork
        ...   def g(n, m):
        ...       global a
        ...       a = 10
        ...       return factorial(n).ndigits() + m
        sage: g(5, m=5)
        8
        sage: a
        5

    We use ``fork`` to make sure that the function terminates after one
    second, no matter what::

        sage: @fork(timeout=1, verbose=True)
        ...   def g(n, m): return factorial(n).ndigits() + m
        sage: g(5, m=5)
        8
        sage: g(10^7, m=5)
        Killing subprocess ... with input ((10000000,), {'m': 5}) which took too long
        'NO DATA (timed out)'

    We illustrate that the state of the pexpect interface is not altered by
    forked functions (they get their own new pexpect interfaces!)::

        sage: gp.eval('a = 5')
        '5'
        sage: @fork()
        ...   def g():
        ...       gp.eval('a = 10')
        ...       return gp.eval('a')
        sage: g()
        '10'
        sage: gp.eval('a')
        '5'

    We illustrate that the forked function has its own pexpect
    interface::

        sage: gp.eval('a = 15')
        '15'
        sage: @fork()
        ...   def g(): return gp.eval('a')
        sage: g()
        'a'

    We illustrate that segfaulting subprocesses are no trouble at all::

        sage: cython('def f(): print <char*>0')
        sage: @fork
        ...   def g(): f()
        sage: g()
        'NO DATA'
    """
    F = Fork(timeout=timeout, verbose=verbose)
    # Compare against None explicitly: a callable that happens to be
    # falsy (e.g. one defining ``__len__`` returning 0) must still be
    # decorated rather than silently turned into a decorator factory.
    return F if f is None else F(f)
564
565