Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Doc/includes/mp_pool.py
12 views
1
import multiprocessing
2
import time
3
import random
4
import sys
5
6
#
# Functions used by test code
#
9
10
def calculate(func, args):
    """Call ``func(*args)`` and return a one-line report of the outcome.

    The report names the current process (as given by
    ``multiprocessing.current_process().name``), the function's
    ``__name__``, the argument tuple and the computed result.
    """
    outcome = func(*args)
    worker_name = multiprocessing.current_process().name
    template = '%s says that %s%s = %s'
    return template % (worker_name, func.__name__, args, outcome)
16
17
def calculatestar(args):
    """Unpack *args* and forward the pieces to ``calculate``.

    Lets a single packed item be handed to ``calculate`` by APIs that
    pass exactly one argument per task (used below with ``pool.imap``
    and ``pool.map``).
    """
    report = calculate(*args)
    return report
19
20
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    pause = random.random() * 0.5
    time.sleep(pause)
    product = a * b
    return product
23
24
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    pause = random.random() * 0.5
    time.sleep(pause)
    total = a + b
    return total
27
28
def f(x):
    """Return ``1.0 / (x - 5.0)``.

    The denominator is zero when ``x`` equals 5, so that input raises
    ZeroDivisionError.
    """
    denominator = x - 5.0
    return 1.0 / denominator
30
31
def pow3(x):
    """Return *x* raised to the third power."""
    return pow(x, 3)
33
34
def noop(x):
    """Accept one argument, ignore it, and return None."""
    return None
36
37
#
# Test code
#
40
41
def test():
    """Exercise a ``multiprocessing.Pool`` and print the results.

    Runs, in order:

    * the same task list through ``apply_async``, ``imap``,
      ``imap_unordered`` and ``map``;
    * error-propagation checks, where ``f(5)`` raises ZeroDivisionError in
      a worker and the exception must surface in the caller;
    * timeout polling of ``ApplyResult.get()`` and ``IMapIterator.next()``.

    Raises:
        AssertionError: if an expected ZeroDivisionError is not raised, or
            if the ``imap`` iterator ends before all ten results are seen.
        ZeroDivisionError: if ``imap`` reports a division error for an
            input other than 5.
    """
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        # Submit all three batches up front, before any result is consumed.
        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, range(10)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        # Step through imap() by hand: results arrive in input order, so
        # only the item at index 5 (f(5)) may raise ZeroDivisionError.
        it = pool.imap(f, range(10))
        for i in range(10):
            try:
                next(it)
            except ZeroDivisionError:
                if i != 5:
                    # A division error for any other input is a real failure.
                    raise
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        # Explicit check rather than ``assert`` so it still runs under -O.
        if i != 9:
            raise AssertionError(
                'imap() iterator ended early at index %d' % i)
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while True:
            sys.stdout.flush()
            try:
                # Poll with a short timeout; print a dot per missed deadline.
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while True:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()
149
150
151
if __name__ == '__main__':
    # Run the demo only when executed as a script, not when imported.
    # freeze_support() is called first, before any pool is created.
    multiprocessing.freeze_support()
    test()
154
155