Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Tools/ccbench/ccbench.py
12 views
1
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3
from __future__ import division
4
from __future__ import print_function
5
6
"""
ccbench, a Python concurrency benchmark.
"""
9
10
import time
11
import os
12
import sys
13
import itertools
14
import threading
15
import subprocess
16
import socket
17
from optparse import OptionParser, SUPPRESS_HELP
18
import platform
19
20
# Compatibility shims so the same source runs on Python 2 and Python 3.
try:
    xrange
except NameError:
    # No xrange builtin available: fall back to range.
    xrange = range

# Use itertools.imap when the running interpreter provides it; otherwise
# leave the builtin map() untouched.
if hasattr(itertools, "imap"):
    map = itertools.imap
30
31
32
# Minimum duration (seconds) handed to each throughput timed loop.
THROUGHPUT_DURATION = 2.0

# Pause (seconds) between two pings of the latency client, and the overall
# pinging duration used to derive the number of pings.
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Length each bandwidth packet is padded to, and the base duration (seconds)
# from which the bandwidth client/background-loop run times are derived.
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
39
40
41
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind the helpers to local names once; the nested functions below then
    # reach them through closure lookups.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        def gen_x():
            # Endless stream of 4-tuples, one per k = 1, 2, 3, ...
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(a, b):
            # Combine two 4-tuples (2x2 matrix product, row-major layout).
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = gen_x()
            while True:
                digit = extract(state, 3)
                # Consume more terms until the next digit is settled.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
78
79
def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pattern = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # The benchmark input is the beginning of this very source file.
    with open(__file__, "r") as source:
        sample = source.read(2000)
    return pattern.findall, (sample, )
88
89
def task_sort():
    """list sorting (C)"""
    def list_sort(l):
        # Sort a reversed copy, so the caller's list is never modified and
        # every call starts from the same descending input.
        flipped = l[::-1]
        flipped.sort()

    ascending = list(range(1000))
    return list_sort, (ascending, )
96
97
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    # The payload is the first 5000 bytes of this source file, three times.
    with open(__file__, "rb") as stream:
        head = stream.read(5000)
    payload = head * 3

    def compress(s):
        # Round trip: compress at level 5, then decompress; result discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (payload, )
106
107
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    # The payload is the first 3000 bytes of this source file, twice.
    with open(__file__, "rb") as stream:
        head = stream.read(3000)
    payload = head * 2

    def compress(s):
        # Only the work matters for the benchmark; the output is discarded.
        bz2.compress(s)

    return compress, (payload, )
116
117
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    # The payload is the first 5000 bytes of this source file, thirty times.
    with open(__file__, "rb") as stream:
        head = stream.read(5000)
    payload = head * 30

    def compute(s):
        # Only the work matters for the benchmark; the digest is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (payload, )
126
127
128
throughput_tasks = [task_pidigits, task_regex]

# Expose the optional modules as module-level names, set to None when the
# import fails, so the selection below can test for availability.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
throughput_tasks.append(
    task_compress_bz2 if bz2 is not None
    else task_hashing if hashlib is not None
    else task_compress_zlib
)

# The latency tests share the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
147
148
149
class TimedLoop:
    """Callable that runs ``func(*args)`` in batches until a deadline.

    Calling the instance returns ``(iterations, duration)``: the number of
    calls counted and the time elapsed since ``start_time`` when the last
    counted batch finished.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        # `end_event` is a shared list used as a flag: non-empty means that
        # some loop has already finished.
        batch = 20
        completed = 0
        elapsed = 0.0
        now = time.time
        pause = time.sleep
        call = self.func
        params = self.args
        batch_start = start_time
        while True:
            for _ in range(batch):
                call(*params)
            batch_end = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch
            elapsed = batch_end - start_time
            if elapsed >= min_duration:
                # Signal every other loop sharing this flag to stop.
                end_event.append(None)
                return completed, elapsed
            if batch_end - batch_start < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            batch_start = batch_end
185
186
187
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in timed loops on *nthreads* threads.

    Returns a list holding one ``(iterations, duration)`` pair per loop, as
    produced by TimedLoop.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    timed = TimedLoop(func, args)
    stop_flag = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(timed(start_time, THROUGHPUT_DURATION,
                             stop_flag, do_yield=False))
        return results

    go = False
    checked_in = []
    checked_in_cond = threading.Condition()
    gate = threading.Condition()

    def worker():
        # Report readiness, then block until the main thread opens the gate.
        with checked_in_cond:
            checked_in.append(None)
            checked_in_cond.notify()
        with gate:
            while not go:
                gate.wait()
        results.append(timed(start_time, THROUGHPUT_DURATION,
                             stop_flag, do_yield=True))

    workers = [threading.Thread(target=worker) for _ in range(nthreads)]
    for w in workers:
        w.daemon = True
        w.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with checked_in_cond:
        while len(checked_in) < nthreads:
            checked_in_cond.wait()
    with gate:
        start_time = time.time()
        go = True
        gate.notify(nthreads)
    for w in workers:
        w.join()

    return results
239
240
def run_throughput_tests(max_threads):
    """Print the throughput of every task for 1 up to *max_threads* threads."""
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            total_iterations = sum(r[0] for r in results)
            longest = max(r[1] for r in results)
            speed = total_iterations / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                # Later runs are shown relative to the first run.
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
260
261
262
# Marker the latency client sends after its last ping.
LAT_END = "END"
263
264
def _sendto(sock, s, addr):
265
sock.sendto(s.encode('ascii'), addr)
266
267
def _recv(sock, n):
268
return sock.recv(n).decode('ascii')
269
270
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to *addr*, then the LAT_END marker.

    One initial ping is sent, followed by *nb_pings* pings spaced
    *interval* seconds apart.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def send_ping():
            # Each ping carries the repr() of its own send time.
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready.
        send_ping()
        # We give the parent a bit of time to notice.
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            send_ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
287
288
def run_latency_client(**kwargs):
    """Start this script in a child process as a latency client.

    The keyword arguments are passed as their repr() through the hidden
    ``--latclient`` option. Returns the ``subprocess.Popen`` object.
    """
    cmd_line = [
        sys.executable, '-E', os.path.abspath(__file__),
        '--latclient', repr(kwargs),
    ]
    return subprocess.Popen(cmd_line)
293
294
def run_latency_test(func, args, nthreads):
    """Measure ping latency while *nthreads* threads run ``func(*args)``.

    A child process sends timestamped UDP pings to a local socket while the
    background threads (if any) run timed loops of the task.

    Returns a list of ``(send_time, recv_time)`` pairs, where ``recv_time``
    is the time at which the chunk containing the ping was received. The
    very first message received only serves as a "client is ready" signal
    and is not part of the results.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # try/finally guarantees the socket is closed even when something below
    # raises.
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then block until the main thread sets
                # `started` (and `start_time`) below.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            recv_time = _time()
            chunks.append((recv_time, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # NOTE(review): this used to be eval(line). The text comes
                # from a UDP socket, so it is parsed with float() instead,
                # which accepts the repr() of a float the client sends and
                # cannot execute code.
                results.append((float(line), recv_time))

    return results
374
375
def run_latency_tests(max_threads):
    """Print ping latency statistics for 0 up to *max_threads* CPU threads.

    For every task in `latency_tasks`, prints the average latency and its
    standard deviation, in milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if n == 0:
                # No ping sample was collected: report it instead of
                # dividing by zero below.
                print("CPU threads=%d: no latency samples received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()
394
395
396
# Marker the bandwidth client sends as its final packet.
BW_END = "END"
397
398
def bandwidth_client(addr, packet_size, duration):
    """Exchange UDP packets with *addr* for ``duration * 2.0`` seconds.

    Every packet is ``"<local address repr>#<message>\\n"`` right-justified
    to *packet_size* characters. After each packet the client waits for a
    reply of the same size. The last packet carries BW_END.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    now = time.time
    pause = time.sleep

    def send_packet(msg):
        text = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, text.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    pause(1.0)
    try:
        deadline = now() + duration * 2.0
        seq = 0
        while now() < deadline:
            send_packet(str(seq))
            reply = _recv(sock, packet_size)
            assert len(reply) == packet_size
            seq += 1
        send_packet(BW_END)
    finally:
        sock.close()
420
421
def run_bandwidth_client(**kwargs):
    """Start this script in a child process as a bandwidth client.

    The keyword arguments are passed as their repr() through the hidden
    ``--bwclient`` option. Returns the ``subprocess.Popen`` object.
    """
    cmd_line = [
        sys.executable, '-E', os.path.abspath(__file__),
        '--bwclient', repr(kwargs),
    ]
    return subprocess.Popen(cmd_line)
426
427
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo rate while *nthreads* threads run ``func(*args)``.

    A child process sends packets to a local socket; each packet is echoed
    back to the address embedded in it. The exchange stops when a background
    loop finishes or when the client sends BW_END.

    Returns the number of packets per second, computed from the packets
    received after the first timed one.
    """
    # Local import: only needed to parse the client's address safely.
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then block until the main thread sets
                # `started` (and `start_time`) below.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        try:
            _time = time.time
            # Receiving the first packet also waits for the client to be up.
            s = _recv(sock, packet_size)
            # NOTE(review): this used to be eval(). The text comes from a UDP
            # socket, so the address tuple is parsed with ast.literal_eval,
            # which only accepts Python literals. The packet is left-padded
            # with spaces, hence the strip().
            remote_addr = ast.literal_eval(s.partition('#')[0].strip())

            with start_cond:
                start_time = _time()
                started = True
                start_cond.notify(nthreads)

            n = 0
            first_time = None
            while not end_event and BW_END not in s:
                _sendto(sock, s, remote_addr)
                s = _recv(sock, packet_size)
                if first_time is None:
                    first_time = _time()
                n += 1
            end_time = _time()

            end_event.append(None)
            for t in threads:
                t.join()
        finally:
            # Always stop the client, and wait for it so that the child
            # process is reaped instead of being left behind.
            process.kill()
            process.wait()

    # The first received packet only sets `first_time`, hence n - 1.
    return (n - 1) / (end_time - first_time)
501
502
def run_bandwidth_tests(max_threads):
    """Print the packet rate for 0 up to *max_threads* background threads."""
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                # Later runs are shown relative to the first run.
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
521
522
523
def main():
    """Parse the command line and run the selected benchmarks.

    With none of -t/-l/-b given, all three benchmark families are run. The
    hidden --latclient/--bwclient options make the process act as the ping
    or bandwidth client spawned by the tests, then return.
    """
    # Local import: only needed to parse the hidden client options.
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value "
                           "(Python 3.8 and older)")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value "
                           "(Python 3.2 and newer)")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # NOTE(review): the client options carry the repr() of a kwargs dict.
    # They used to be eval()'ed; ast.literal_eval parses the same literals
    # without being able to execute code.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True

    # Compare against None so that an explicit 0 is not silently ignored.
    if options.check_interval is not None:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("sys.setcheckinterval() is not available "
                         "on this Python version")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval is not None:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("sys.setswitchinterval() is not available "
                         "on this Python version")
        try:
            sys.setswitchinterval(options.switch_interval)
        except ValueError as exc:
            # Report a rejected value as a usage error, not a traceback.
            parser.error("invalid switch interval: %s" % exc)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
604
605
# Script entry point: run the benchmark driver only when executed directly.
if __name__ == "__main__":
    main()
607
608