Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
yt-project
GitHub Repository: yt-project/yt
Path: blob/main/tests/nose_runner.py
928 views
1
import multiprocessing
2
import os
3
import sys
4
5
import nose
6
import numpy
7
import yaml
8
9
from yt.config import ytcfg
10
from yt.utilities.answer_testing.framework import AnswerTesting
11
12
numpy.set_printoptions(threshold=5, edgeitems=1, precision=4)
13
14
15
class NoseWorker(multiprocessing.Process):
16
def __init__(self, task_queue, result_queue):
17
multiprocessing.Process.__init__(self)
18
self.task_queue = task_queue
19
self.result_queue = result_queue
20
21
def run(self):
22
proc_name = self.name
23
while True:
24
next_task = self.task_queue.get()
25
if next_task is None:
26
print(f"{proc_name}: Exiting")
27
self.task_queue.task_done()
28
break
29
print(f"{proc_name}: {next_task}")
30
result = next_task()
31
self.task_queue.task_done()
32
self.result_queue.put(result)
33
if next_task.exclusive:
34
print(f"{proc_name}: Exiting (exclusive)")
35
break
36
return
37
38
39
class NoseTask:
40
def __init__(self, job):
41
argv, exclusive = job
42
self.argv = argv
43
self.name = argv[0]
44
self.exclusive = exclusive
45
46
def __call__(self):
47
test_dir = ytcfg.get("yt", "test_data_dir")
48
answers_dir = os.path.join(test_dir, "answers")
49
if "--with-answer-testing" in self.argv and not os.path.isdir(
50
os.path.join(answers_dir, self.name)
51
):
52
nose.run(
53
argv=self.argv + ["--answer-store"],
54
addplugins=[AnswerTesting()],
55
exit=False,
56
)
57
if os.path.isfile(f"{self.name}.xml"):
58
os.remove(f"{self.name}.xml")
59
nose.run(argv=self.argv, addplugins=[AnswerTesting()], exit=False)
60
return ""
61
62
def __str__(self):
63
return f"WILL DO self.name = {self.name}"
64
65
66
def generate_tasks_input():
67
pyver = f"py{sys.version_info.major}{sys.version_info.minor}"
68
test_dir = ytcfg.get("yt", "test_data_dir")
69
answers_dir = os.path.join(test_dir, "answers")
70
tests = yaml.load(open("tests/tests.yaml"), Loader=yaml.FullLoader)
71
72
base_argv = ["-s", "--nologcapture", "--with-xunit"]
73
74
base_answer_argv = [
75
f"--local-dir={answers_dir}",
76
"--with-answer-testing",
77
"--answer-big-data",
78
"--local",
79
]
80
81
args = []
82
83
for test in list(tests["other_tests"].keys()):
84
args.append(([test] + base_argv + tests["other_tests"][test], True))
85
for answer in list(tests["answer_tests"].keys()):
86
if tests["answer_tests"][answer] is None:
87
continue
88
argv = [f"{pyver}_{answer}"]
89
argv += base_argv + base_answer_argv
90
argv.append(f"--answer-name={argv[0]}")
91
argv += tests["answer_tests"][answer]
92
args.append((argv, False))
93
94
exclude_answers = []
95
answer_tests = tests["answer_tests"]
96
for key in answer_tests:
97
for t in answer_tests[key]:
98
exclude_answers.append(t.replace(".py:", ".").replace("/", "."))
99
exclude_answers = [f"--exclude-test={ex}" for ex in exclude_answers]
100
101
args = [
102
(
103
(item + [f"--xunit-file={item[0]}.xml"], exclusive)
104
if item[0] != "unittests"
105
else (item + ["--xunit-file=unittests.xml"] + exclude_answers, exclusive)
106
)
107
for item, exclusive in args
108
]
109
return args
110
111
112
if __name__ == "__main__":
113
# multiprocessing.log_to_stderr(logging.DEBUG)
114
tasks = multiprocessing.JoinableQueue()
115
results = multiprocessing.Queue()
116
117
num_consumers = int(os.environ.get("NUM_WORKERS", 6))
118
consumers = [NoseWorker(tasks, results) for i in range(num_consumers)]
119
for w in consumers:
120
w.start()
121
122
num_jobs = 0
123
for job in generate_tasks_input():
124
if job[1]:
125
num_consumers -= 1 # take into account exclusive jobs
126
tasks.put(NoseTask(job))
127
num_jobs += 1
128
129
for _i in range(num_consumers):
130
tasks.put(None)
131
132
tasks.join()
133
134
while num_jobs:
135
result = results.get()
136
num_jobs -= 1
137
138