Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Roblox
GitHub Repository: Roblox/luau
Path: blob/master/tools/fuzz/fuzzer-postprocess.py
2725 views
1
#!/usr/bin/python3
2
# This file is part of the Luau programming language and is licensed under MIT License; see LICENSE.txt for details
3
4
import argparse
import multiprocessing
import os
import shutil
import subprocess
import sys
from typing import Optional

import jinja2
11
12
13
def is_crash(reproducer_name: str) -> bool:
    """Tell whether a reproducer file name carries a crash or out-of-memory prefix.

    Returns True when the name begins with "crash-" or "oom-", False otherwise.
    """
    return reproducer_name.startswith(("crash-", "oom-"))
15
16
17
class CrashReport:
    """Read-only view of the files stored for one processed reproducer.

    All files are looked up under ``<args.output_directory>/<crash_id>``.
    """

    def __init__(self, args, crash_id):
        """Remember the parsed arguments and derive the per-crash directory.

        args must provide ``output_directory``; ``artifact_root`` is read
        later by artifact_link().
        """
        self.id = crash_id
        self.args = args
        self.crash_root = os.path.join(args.output_directory, crash_id)

    def trace(self) -> Optional[str]:
        """Return the contents of ``trace.txt``, or None when there is no such file."""
        trace_path = os.path.join(self.crash_root, "trace.txt")

        # Open directly rather than testing os.path.exists() first: the file
        # could disappear between the check and the open.
        try:
            with open(trace_path, "r") as trace_file:
                return trace_file.read()
        except (FileNotFoundError, NotADirectoryError):
            return None

    def modules(self) -> str:
        """Return the contents of ``modules.txt``.

        Raises:
            OSError: if the file is missing or unreadable (unlike trace(),
                no fallback value is provided).
        """
        with open(os.path.join(self.crash_root, "modules.txt"), "r") as modules_file:
            return modules_file.read()

    def artifact_link(self) -> str:
        """Return the URL of the minimized reproducer under ``args.artifact_root``."""
        return f"{self.args.artifact_root}/{self.id}/minimized_reproducer"
38
39
40
class MetaValue:
    """A named metadata entry for the report, with an optional link."""

    def __init__(self, name, value):
        # The link starts out unset; it can be assigned after construction.
        self.link = None
        self.name, self.value = name, value
45
46
47
def minimize_crash(args, reproducer, workdir):
    """Try to shrink a crashing reproducer by running the fuzzer in minimize mode.

    Runs ``args.executable`` with ``-minimize_crash=1`` for at most
    ``args.minimize_for`` seconds, asking it to write its result to
    ``<workdir>/minimized_reproducer``. Reproducers whose name is not
    recognised by is_crash() are left alone. Progress is reported on stdout;
    the child's own output is shown only when ``args.verbose`` is set.

    Returns None in every case; a non-zero exit code from the child is
    reported and otherwise ignored.
    """
    reproducer_name = os.path.basename(reproducer)
    if not is_crash(reproducer_name):
        # Not actually a crash, so no minimization is actually possible.
        return

    print(
        f"Minimizing reproducer {reproducer_name} for {args.minimize_for} seconds.")

    # The child runs with cwd=workdir, so every path handed to it must be
    # absolute. A relative artifact path would be resolved by the child
    # against workdir (workdir/workdir/...), not against our own cwd.
    reproducer_absolute = os.path.abspath(reproducer)
    artifact = os.path.abspath(os.path.join(workdir, "minimized_reproducer"))

    if args.verbose:
        child_stdout, child_stderr = sys.stdout, sys.stderr
    else:
        child_stdout = child_stderr = subprocess.DEVNULL

    command = [
        args.executable,
        "-detect_leaks=0",
        "-minimize_crash=1",
        f"-exact_artifact_path={artifact}",
        f"-max_total_time={args.minimize_for}",
        reproducer_absolute,
    ]
    minimize_result = subprocess.run(
        command, cwd=workdir, stdout=child_stdout, stderr=child_stderr)

    if minimize_result.returncode != 0:
        print(
            f"Minimize process exited with code {minimize_result.returncode}; minimization failed.")
        return

    if os.path.exists(artifact):
        print(
            f"Minimized {reproducer_name} from {os.path.getsize(reproducer)} bytes to {os.path.getsize(artifact)}.")
69
70
71
def process_crash(args, reproducer):
    """Build the output directory for one reproducer and return its CrashReport.

    Steps, all under ``<args.output_directory>/<basename of reproducer>``:

    1. Discard any existing directory and recreate it.
    2. Copy the reproducer to ``original_reproducer`` and ``minimized_reproducer``.
    3. Call minimize_crash(), which may overwrite ``minimized_reproducer``.
    4. For names accepted by is_crash(): run ``args.executable`` on the
       minimized reproducer and store its combined stdout/stderr in
       ``trace.txt``; run ``args.prototest`` on it and store the output from
       the first occurrence of "Module" onwards in ``modules.txt``.

    Returns:
        A CrashReport for the reproducer.
    """
    crash_id = os.path.basename(reproducer)
    crash_output = os.path.join(args.output_directory, crash_id)
    print(f"Processing reproducer {crash_id}.")

    print(f"Output will be stored in {crash_output}.")
    if os.path.exists(crash_output):
        print(f"Contents of {crash_output} will be discarded.")
        shutil.rmtree(crash_output, ignore_errors=True)

    os.makedirs(crash_output)
    minimized_path = os.path.join(crash_output, "minimized_reproducer")
    shutil.copyfile(reproducer, os.path.join(crash_output, "original_reproducer"))
    # Seed the minimized copy with the original so the later steps always have
    # a file to run, even when minimization is skipped or fails.
    shutil.copyfile(reproducer, minimized_path)

    minimize_crash(args, reproducer, crash_output)

    if is_crash(crash_id):
        trace_result = subprocess.run(
            [args.executable, minimized_path],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

        with open(os.path.join(crash_output, "trace.txt"), "w") as trace_file:
            trace_file.write(trace_result.stdout)

        modules_result = subprocess.run(
            [args.prototest, minimized_path, "-detect_leaks=0", "-verbosity=0"],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        modules_text = modules_result.stdout

        # Drop everything before the first "Module". str.find() is used rather
        # than str.index() so that output without the marker (e.g. prototest
        # itself failed) does not raise ValueError and abort the whole run.
        module_index_of = modules_text.find("Module")
        if module_index_of != -1:
            modules_text = modules_text[module_index_of:]
        else:
            print(
                f"No module listing found in prototest output for {crash_id}; storing the full output.")

        with open(os.path.join(crash_output, "modules.txt"), "w") as modules_file:
            modules_file.write(modules_text)

    return CrashReport(args, crash_id)
107
108
109
def process_crashes(args):
    """Run process_crash() over every entry of ``args.source_directory``.

    Entries are handled in sorted name order by a multiprocessing pool of
    ``args.workers`` processes. Returns the list of process_crash() results
    in that same order.
    """
    source = args.source_directory
    entry_names = sorted(os.listdir(source))
    with multiprocessing.Pool(args.workers) as pool:
        work_items = [(args, os.path.join(source, entry)) for entry in entry_names]
        reports = pool.starmap(process_crash, work_items)
        print(f"Processed {len(reports)} crashes.")
        return reports
116
117
118
def generate_report(crashes, meta):
    """Render the ``index.html`` template into ``fuzz-report.html``.

    The template is loaded through a jinja2 PackageLoader for
    "fuzzer-postprocess" and receives ``crashes`` and ``meta`` as its
    variables. The report is written to the current working directory.
    """
    loader = jinja2.PackageLoader("fuzzer-postprocess")
    escaping = jinja2.select_autoescape()
    env = jinja2.Environment(loader=loader, autoescape=escaping)

    template = env.get_template("index.html")
    with open("fuzz-report.html", "w") as report_file:
        rendered = template.render(crashes=crashes, meta=meta)
        report_file.write(rendered)
130
131
132
def _split_name_value(pair):
    """Split "name=value" at the first "="; return None when there is no "="."""
    name, separator, value = pair.partition("=")
    if not separator:
        return None
    return name, value


def __main__():
    """Parse the command line, process every reproducer and write the report.

    Metadata given via --meta.values / --meta.urls must have the form
    name=value (or name=url); an argument without "=" is rejected with a
    usage error instead of failing with an IndexError.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--source_directory", required=True)
    parser.add_argument("--output_directory", required=True)
    parser.add_argument("--executable", required=True)
    parser.add_argument("--prototest", required=True)
    parser.add_argument("--minimize_for", required=True)
    parser.add_argument("--artifact_root", required=True)
    parser.add_argument("--verbose", "-v", action="store_true")
    parser.add_argument("--workers", action="store", type=int, default=4)
    meta_group = parser.add_argument_group(
        "metadata", description="Report metadata to attach.")
    meta_group.add_argument("--meta.values", nargs="*",
                            help="Any metadata to attach, in the form name=value. Multiple values may be specified.", dest="metadata_values", default=[])
    meta_group.add_argument("--meta.urls", nargs="*",
                            help="URLs to attach to metadata, in the form name=url. Multiple values may be specified. A value must also be specified with --meta.values.", dest="metadata_urls", default=[])
    args = parser.parse_args()

    meta_values = dict()
    for pair in args.metadata_values:
        parsed = _split_name_value(pair)
        if parsed is None:
            # parser.error() prints the usage text and exits with status 2.
            parser.error(f"--meta.values expects name=value, got {pair!r}")
        name, value = parsed

        meta_values[name] = MetaValue(name, value)

    for pair in args.metadata_urls:
        parsed = _split_name_value(pair)
        if parsed is None:
            parser.error(f"--meta.urls expects name=url, got {pair!r}")
        name, url = parsed

        if name in meta_values:
            meta_values[name].link = url
        else:
            # A URL without a matching value is reported and otherwise ignored.
            print(f"Metadata {name} has URL {url} but no value specified.")

    # The report receives the metadata as a list ordered by name.
    meta_values = sorted(meta_values.values(), key=lambda x: x.name)

    crashes = process_crashes(args)
    generate_report(crashes, meta_values)
172
173
174
# Run the tool only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    __main__()
176
177