Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
official-stockfish
GitHub Repository: official-stockfish/Stockfish
Path: blob/master/tests/testing.py
376 views
1
import collections
import concurrent.futures
import fnmatch
import io
import os
import pathlib
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import traceback
from contextlib import redirect_stdout
from functools import wraps
from typing import List, Optional

import requests
18
19
# ANSI escape sequences used to style the runner's terminal output.
RESET_COLOR = "\033[0m"
WHITE_BOLD = "\033[1m"
GRAY_COLOR = "\033[2m"  # SGR 2 (dim/faint); used for captured test output
RED_COLOR = "\033[31m"
GREEN_COLOR = "\033[32m"
CYAN_COLOR = "\033[36m"

# Per-call time limit, in seconds, applied to the blocking Stockfish matchers.
MAX_TIMEOUT = 5 * 60

# Directory that contains this file, with the directory path resolved.
PATH = pathlib.Path(__file__).parent.resolve()
29
30
31
class Valgrind:
    """Factory for the argument lists that invoke valgrind."""

    @staticmethod
    def get_valgrind_command():
        """Return a new argv prefix for valgrind with full leak checking.

        The prefix asks valgrind to exit with status 42 when it reports
        errors and to treat every leak kind as an error.
        """
        leak_flags = ["--errors-for-leak-kinds=all", "--leak-check=full"]
        return ["valgrind", "--error-exitcode=42", *leak_flags]

    @staticmethod
    def get_valgrind_thread_command():
        """Return a new argv prefix for valgrind with ``--fair-sched=try``."""
        argv = ["valgrind"]
        argv.extend(("--error-exitcode=42", "--fair-sched=try"))
        return argv
44
45
46
class TSAN:
    """Creates and removes the ThreadSanitizer suppression setup."""

    @staticmethod
    def set_tsan_option():
        """Write ``tsan.supp`` into the current directory and export TSAN_OPTIONS.

        The file lists the race suppressions for the transposition-table
        functions below; TSAN_OPTIONS is set to reference it by relative path.
        """
        suppressions = """
race:Stockfish::TTEntry::read
race:Stockfish::TTEntry::save
race:Stockfish::TranspositionTable::probe
race:Stockfish::TranspositionTable::hashfull
"""
        with open("tsan.supp", "w") as supp_file:
            supp_file.write(suppressions)

        os.environ["TSAN_OPTIONS"] = "suppressions=./tsan.supp"

    @staticmethod
    def unset_tsan_option():
        """Drop TSAN_OPTIONS from the environment and delete ``tsan.supp``.

        Raises:
            FileNotFoundError: if ``tsan.supp`` is not present in the current
                working directory.
        """
        os.environ.pop("TSAN_OPTIONS", None)
        os.remove("tsan.supp")
65
66
67
class EPD:
    """Creates and deletes the temporary ``bench_tmp.epd`` file next to this module."""

    @staticmethod
    def create_bench_epd():
        """Write four fixed positions to ``bench_tmp.epd`` inside ``PATH``."""
        epd_path = os.path.join(PATH, "bench_tmp.epd")
        positions = """
Rn6/1rbq1bk1/2p2n1p/2Bp1p2/3Pp1pP/1N2P1P1/2Q1NPB1/6K1 w - - 2 26
rnbqkb1r/ppp1pp2/5n1p/3p2p1/P2PP3/5P2/1PP3PP/RNBQKBNR w KQkq - 0 3
3qnrk1/4bp1p/1p2p1pP/p2bN3/1P1P1B2/P2BQ3/5PP1/4R1K1 w - - 9 28
r4rk1/1b2ppbp/pq4pn/2pp1PB1/1p2P3/1P1P1NN1/1PP3PP/R2Q1RK1 w - - 0 13
"""
        with open(epd_path, "w") as epd_file:
            epd_file.write(positions)

    @staticmethod
    def delete_bench_epd():
        """Remove ``bench_tmp.epd`` from ``PATH``; raises if it does not exist."""
        epd_path = os.path.join(PATH, "bench_tmp.epd")
        os.remove(epd_path)
83
84
85
class Syzygy:
    """Locates and fetches the ``syzygy`` data directory used by the tests."""

    @staticmethod
    def get_syzygy_path():
        """Return the absolute path of ``syzygy`` relative to the current directory.

        NOTE(review): this resolves against the current working directory,
        whereas ``download_syzygy`` installs the data under ``PATH`` (the
        directory of this file). Confirm that callers run with a matching cwd.
        """
        return os.path.abspath("syzygy")

    @staticmethod
    def download_syzygy():
        """Download the pinned python-chess tarball into ``PATH/syzygy``.

        Does nothing when ``PATH/syzygy`` already exists. Otherwise the
        tarball is streamed into a temporary directory, unpacked there, and
        its top-level directory is moved to ``PATH/syzygy``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            requests.RequestException: on connection problems or timeouts.
        """
        target_dir = os.path.join(PATH, "syzygy")
        if os.path.isdir(target_dir):
            return

        url = "https://api.github.com/repos/niklasf/python-chess/tarball/9b9aa13f9f36d08aadfabff872882f4ab1494e95"
        # Name of the single top-level directory inside the tarball.
        archive_root = "niklasf-python-chess-9b9aa13"
        # Seconds allowed for connecting and for each read, so a stalled
        # connection fails instead of hanging the test run forever.
        request_timeout = 60

        with tempfile.TemporaryDirectory() as tmpdirname:
            tarball_path = os.path.join(tmpdirname, f"{archive_root}.tar.gz")

            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(url, stream=True, timeout=request_timeout) as response:
                # Without this, an error page (e.g. a rate-limit response)
                # would be saved and only fail later as a corrupt archive.
                response.raise_for_status()
                with open(tarball_path, "wb") as tarball:
                    for chunk in response.iter_content(chunk_size=8192):
                        tarball.write(chunk)

            # NOTE(review): extractall trusts member paths. The archive is
            # pinned to a commit hash; consider an extraction filter on Python
            # versions that support one.
            with tarfile.open(tarball_path, "r:gz") as tar:
                tar.extractall(tmpdirname)

            shutil.move(os.path.join(tmpdirname, archive_root), target_dir)
110
111
112
class OrderedClassMembers(type):
113
@classmethod
114
def __prepare__(self, name, bases):
115
return collections.OrderedDict()
116
117
def __new__(self, name, bases, classdict):
118
classdict["__ordered__"] = [
119
key for key in classdict.keys() if key not in ("__module__", "__qualname__")
120
]
121
return type.__new__(self, name, bases, classdict)
122
123
124
class TimeoutException(Exception):
    """Raised when a call exceeds its allotted execution time.

    Attributes are documented here rather than inline:
        message: human-readable description of what timed out.
        timeout: the limit, in seconds, that was exceeded.
    """

    def __init__(self, message: str, timeout: float):
        # Pass both values on so ``args`` stays ``(message, timeout)`` and the
        # exception can still be copied or pickled via ``cls(*args)``.
        super().__init__(message, timeout)
        self.message = message
        self.timeout = timeout

    def __str__(self) -> str:
        # Without this, str() would render the args tuple, e.g.
        # "('Function x timed out ...', 300)", in error reports.
        return str(self.message)
128
129
130
def timeout_decorator(timeout: float):
    """Build a decorator that limits how long each call may take.

    The wrapped function runs in a worker thread. If it has not finished
    within ``timeout`` seconds, ``TimeoutException`` is raised in the caller.

    Args:
        timeout: maximum number of seconds to wait for the call's result.

    Returns:
        A decorator. The decorated function returns whatever the original
        returns and re-raises whatever the original raises.

    Note:
        A timed-out call cannot be interrupted. Its worker thread keeps
        running in the background until the function returns by itself.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # The executor is deliberately not used as a context manager:
            # leaving a ``with`` block calls shutdown(wait=True), which would
            # hold back the timeout exception until the stuck call completes.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                raise TimeoutException(
                    f"Function {func.__name__} timed out after {timeout} seconds",
                    timeout,
                ) from None
            finally:
                # Release the executor without waiting for a still-running task.
                executor.shutdown(wait=False)

        return wrapper

    return decorator
148
149
150
class MiniTestFramework:
151
def __init__(self):
152
self.passed_test_suites = 0
153
self.failed_test_suites = 0
154
self.passed_tests = 0
155
self.failed_tests = 0
156
self.stop_on_failure = True
157
158
def has_failed(self) -> bool:
159
return self.failed_test_suites > 0
160
161
def run(self, classes: List[type]) -> bool:
162
self.start_time = time.time()
163
164
for test_class in classes:
165
with tempfile.TemporaryDirectory() as tmpdirname:
166
original_cwd = os.getcwd()
167
os.chdir(tmpdirname)
168
169
try:
170
if self.__run(test_class):
171
self.failed_test_suites += 1
172
else:
173
self.passed_test_suites += 1
174
except Exception as e:
175
self.failed_test_suites += 1
176
print(f"\n{RED_COLOR}Error: {e}{RESET_COLOR}")
177
finally:
178
os.chdir(original_cwd)
179
180
self.__print_summary(round(time.time() - self.start_time, 2))
181
return self.has_failed()
182
183
def __run(self, test_class) -> bool:
184
test_instance = test_class()
185
test_name = test_instance.__class__.__name__
186
test_methods = [m for m in test_instance.__ordered__ if m.startswith("test_")]
187
188
print(f"\nTest Suite: {test_name}")
189
190
if hasattr(test_instance, "beforeAll"):
191
test_instance.beforeAll()
192
193
fails = 0
194
195
for method in test_methods:
196
fails += self.__run_test_method(test_instance, method)
197
198
if hasattr(test_instance, "afterAll"):
199
test_instance.afterAll()
200
201
self.failed_tests += fails
202
203
return fails > 0
204
205
def __run_test_method(self, test_instance, method: str) -> int:
206
print(f" Running {method}... \r", end="", flush=True)
207
208
buffer = io.StringIO()
209
fails = 0
210
211
try:
212
t0 = time.time()
213
214
with redirect_stdout(buffer):
215
if hasattr(test_instance, "beforeEach"):
216
test_instance.beforeEach()
217
218
getattr(test_instance, method)()
219
220
if hasattr(test_instance, "afterEach"):
221
test_instance.afterEach()
222
223
duration = time.time() - t0
224
225
self.print_success(f" {method} ({duration * 1000:.2f}ms)")
226
self.passed_tests += 1
227
except Exception as e:
228
if isinstance(e, TimeoutException):
229
self.print_failure(
230
f" {method} (hit execution limit of {e.timeout} seconds)"
231
)
232
233
if isinstance(e, AssertionError):
234
self.__handle_assertion_error(t0, method)
235
236
if self.stop_on_failure:
237
self.__print_buffer_output(buffer)
238
raise e
239
240
fails += 1
241
finally:
242
self.__print_buffer_output(buffer)
243
244
return fails
245
246
def __handle_assertion_error(self, start_time, method: str):
247
duration = time.time() - start_time
248
self.print_failure(f" {method} ({duration * 1000:.2f}ms)")
249
traceback_output = "".join(traceback.format_tb(sys.exc_info()[2]))
250
251
colored_traceback = "\n".join(
252
f" {CYAN_COLOR}{line}{RESET_COLOR}"
253
for line in traceback_output.splitlines()
254
)
255
256
print(colored_traceback)
257
258
def __print_buffer_output(self, buffer: io.StringIO):
259
output = buffer.getvalue()
260
if output:
261
indented_output = "\n".join(f" {line}" for line in output.splitlines())
262
print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}")
263
print(f"{GRAY_COLOR}{indented_output}{RESET_COLOR}")
264
print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}")
265
266
def __print_summary(self, duration: float):
267
print(f"\n{WHITE_BOLD}Test Summary{RESET_COLOR}\n")
268
print(
269
f" Test Suites: {GREEN_COLOR}{self.passed_test_suites} passed{RESET_COLOR}, {RED_COLOR}{self.failed_test_suites} failed{RESET_COLOR}, {self.passed_test_suites + self.failed_test_suites} total"
270
)
271
print(
272
f" Tests: {GREEN_COLOR}{self.passed_tests} passed{RESET_COLOR}, {RED_COLOR}{self.failed_tests} failed{RESET_COLOR}, {self.passed_tests + self.failed_tests} total"
273
)
274
print(f" Time: {duration}s\n")
275
276
def print_failure(self, add: str):
277
print(f" {RED_COLOR}{RESET_COLOR}{add}", flush=True)
278
279
def print_success(self, add: str):
280
print(f" {GREEN_COLOR}{RESET_COLOR}{add}", flush=True)
281
282
283
class Stockfish:
    """Wrapper around a Stockfish executable started as a child process.

    In interactive mode (``cli=False``) the engine is started with pipes and
    driven via ``send_command`` and the output matchers. In CLI mode
    (``cli=True``) the command is run to completion once and the finished
    process is stored in ``self.process``.
    """

    def __init__(
        self,
        prefix: List[str],
        path: str,
        args: Optional[List[str]] = None,
        cli: bool = False,
    ):
        """Store the launch configuration and start the process.

        Args:
            prefix: arguments placed before the executable (may be empty).
            path: path of the executable.
            args: arguments placed after the executable; defaults to none.
            cli: run once to completion instead of opening pipes.
        """
        self.path = path
        self.process = None
        # A fresh list per instance avoids sharing one mutable default.
        self.args = [] if args is None else args
        self.cli = cli
        self.prefix = prefix
        self.output = []

        self.start()

    def _check_process_alive(self):
        """Raise RuntimeError if there is no process or it has exited.

        The output collected so far is printed first to aid debugging.
        """
        if not self.process or self.process.poll() is not None:
            print("\n".join(self.output))
            raise RuntimeError("Stockfish process has terminated")

    def start(self):
        """Launch the executable according to the configured mode."""
        command = self.prefix + [self.path] + self.args

        if self.cli:
            self.process = subprocess.run(
                command,
                capture_output=True,
                text=True,
            )

            # A failing run is reported but not raised; callers inspect
            # ``self.process.returncode`` themselves.
            if self.process.returncode != 0:
                print(self.process.stdout)
                print(self.process.stderr)
                print(f"Process failed with return code {self.process.returncode}")

            return

        self.process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # stderr is merged into stdout
            text=True,
            bufsize=1,  # line buffered
        )

    def setoption(self, name: str, value: str):
        """Send ``setoption name <name> value <value>`` to the engine."""
        self.send_command(f"setoption name {name} value {value}")

    def send_command(self, command: str):
        """Write ``command`` plus a newline to the engine's stdin and flush.

        Raises:
            RuntimeError: if the process was never started or has exited.
        """
        if not self.process:
            raise RuntimeError("Stockfish process is not started")

        self._check_process_alive()

        self.process.stdin.write(command + "\n")
        self.process.stdin.flush()

    def _wait_for(self, predicate):
        """Consume output lines until ``predicate(line)`` is truthy."""
        for line in self.readline():
            if predicate(line):
                return

    @timeout_decorator(MAX_TIMEOUT)
    def equals(self, expected_output: str):
        """Block until an output line equals ``expected_output``."""
        self._wait_for(lambda line: line == expected_output)

    @timeout_decorator(MAX_TIMEOUT)
    def expect(self, expected_output: str):
        """Block until an output line matches the ``fnmatch`` pattern."""
        self._wait_for(lambda line: fnmatch.fnmatch(line, expected_output))

    @timeout_decorator(MAX_TIMEOUT)
    def contains(self, expected_output: str):
        """Block until an output line contains ``expected_output``."""
        self._wait_for(lambda line: expected_output in line)

    @timeout_decorator(MAX_TIMEOUT)
    def starts_with(self, expected_output: str):
        """Block until an output line starts with ``expected_output``."""
        self._wait_for(lambda line: line.startswith(expected_output))

    @timeout_decorator(MAX_TIMEOUT)
    def check_output(self, callback):
        """Feed output lines to ``callback`` until it returns a value equal to True.

        Raises:
            ValueError: if ``callback`` is falsy.
        """
        if not callback:
            raise ValueError("Callback function is required")

        # The comparison is kept as ``== True`` on purpose: truthy values that
        # do not compare equal to True (e.g. non-empty strings) do not stop
        # the wait.
        self._wait_for(lambda line: callback(line) == True)  # noqa: E712

    def readline(self):
        """Yield stripped output lines, recording each in ``self.output``.

        Raises:
            RuntimeError: if the process was never started, or once the
                output stream is exhausted and the process has exited.
        """
        if not self.process:
            raise RuntimeError("Stockfish process is not started")

        while True:
            raw = self.process.stdout.readline()

            # An empty string means end of stream. Liveness is checked only
            # then, so lines the engine wrote just before exiting are still
            # delivered (and included in the diagnostic dump) instead of
            # being discarded.
            if not raw:
                self._check_process_alive()

            line = raw.strip()
            self.output.append(line)

            yield line

    def clear_output(self):
        """Forget all recorded output lines."""
        self.output = []

    def get_output(self) -> List[str]:
        """Return the list of output lines recorded so far."""
        return self.output

    def quit(self):
        """Send the ``quit`` command to the engine."""
        self.send_command("quit")

    def close(self):
        """Close the pipes and wait for the process to finish.

        Returns:
            The process's exit code, or 0 when no process exists.
        """
        if not self.process:
            return 0

        # In CLI mode the process already finished and has no pipes to close.
        if self.cli:
            return self.process.returncode

        self.process.stdin.close()
        self.process.stdout.close()
        return self.process.wait()
401
402