Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagecell
Path: blob/master/contrib/vm/compute_node/root/healthcheck.py
1075 views
1
#!/usr/bin/env python3
2
3
from datetime import datetime
4
import os
5
import pwd
6
import random
7
import re
8
import shlex
9
import shutil
10
import subprocess
11
import sys
12
import time
13
14
import requests
15
16
17
# --- Probe behaviour -------------------------------------------------------
RETRIES = 3  # probe attempts before the service is restarted
PROBE_TIMEOUT_SECONDS = 15  # HTTP timeout of a single probe request
RETRY_SLEEP_SECONDS = 0.5  # pause between two failed attempts

# --- Diagnostics look-back windows -----------------------------------------
OOM_LOOKBACK_MINUTES = 5  # kernel-log window searched for OOM events
KERNELS_LOOKBACK_MINUTES = 3  # journal window searched for kernel counts

# --- Rolling context log ---------------------------------------------------
CONTEXT_KEEP_LINES = 720  # status lines retained in CONTEXT_LOG
CONTEXT_LINES_ON_FAILURE = 3  # earlier status lines replayed into EVENT_LOG

# --- Names and paths -------------------------------------------------------
SERVICE_NAME = "sagecell"  # systemd unit that is checked and restarted
CONTEXT_LOG = "/root/healthcheck-context.log"
EVENT_LOG = "/root/healthcheck.log"
# Literal placeholder text; presumably substituted with the real worker
# account name when this file is deployed — TODO confirm in deploy scripts.
WORKER_USER = "{worker}"

# Substrings that mark a kernel log line as OOM-related.
OOM_PATTERNS = (
    "Out of memory",
    "Killed process",
    "oom-kill",
    "OOM killer",
    "Memory cgroup out of memory",
    "oom_reaper",
)
# The same substrings as an escaped regex alternation for `journalctl --grep`.
OOM_GREP = "|".join(map(re.escape, OOM_PATTERNS))
39
40
41
def timestamp():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S.%f}"
43
44
45
def log_message(message):
    """Append *message* to EVENT_LOG as one line prefixed with a timestamp."""
    with open(EVENT_LOG, "a", encoding="utf-8") as event_log:
        print(timestamp(), message, file=event_log)
48
49
50
def log_status_line(line):
    """Append *line* verbatim (no timestamp added) to EVENT_LOG."""
    with open(EVENT_LOG, "a", encoding="utf-8") as event_log:
        event_log.writelines((line, "\n"))
53
54
55
def update_context_log(line):
    """Append *line* to the rolling context log and return what preceded it.

    The log is rewritten keeping only its newest CONTEXT_KEEP_LINES lines.
    The return value is the last CONTEXT_LINES_ON_FAILURE lines that were in
    the log *before* *line* was added (fewer if the log is short or absent),
    so the caller can replay them should the health check fail.
    """
    try:
        with open(CONTEXT_LOG, encoding="utf-8") as log_file:
            history = log_file.read().splitlines()
    except FileNotFoundError:
        history = []
    tail = history[-CONTEXT_LINES_ON_FAILURE:]
    history.append(line)
    kept = history[-CONTEXT_KEEP_LINES:]
    with open(CONTEXT_LOG, "w", encoding="utf-8") as log_file:
        log_file.write("\n".join(kept) + "\n")
    return tail
65
66
67
def try_output(command, timeout=60):
    """Run *command* and return its standard output as text.

    The command line is split with ``shlex.split`` and executed directly
    (no shell). Any failure is treated as "no output" and yields ``""``:
    a missing or non-executable program, a non-zero exit status, or the
    command still running after *timeout* seconds (it is then killed), so a
    hung helper can never stall the health check. Bytes that cannot be
    decoded are replaced instead of raising. stderr is discarded.

    Args:
        command: command line to run, e.g. ``"iostat -dxy 1 1"``.
        timeout: seconds to wait for the command; ``None`` waits forever.

    Returns:
        The command's stdout, or ``""`` on any of the failures above.
    """
    try:
        return subprocess.check_output(
            shlex.split(command),
            text=True,
            errors="replace",
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError etc.
        return ""
77
78
79
def human_memory_from_kib(value_kib):
    """Format a size given in KiB as a short human-readable string.

    The value is scaled by powers of 1024 into B, K, M, G, T or P. B and K
    are shown as whole numbers (``"   1K"``), larger units with one decimal
    (``" 1.4G"``); numbers are right-aligned to width 4 and wider values are
    not truncated. The unit is chosen from the value *as it will be
    displayed* (after rounding), so e.g. 1023.97 MiB becomes ``" 1.0G"``
    instead of overflowing to ``"1024.0M"``. Anything of 1024 TiB or more is
    shown in P.
    """
    units = ("B", "K", "M", "G", "T", "P")
    value = float(value_kib) * 1024
    for unit in units[:-1]:
        shown = round(value) if unit in ("B", "K") else round(value, 1)
        if shown < 1024:
            break
        value /= 1024
    else:
        # No smaller unit fit: value has been scaled all the way to P.
        unit = units[-1]
    if unit in ("B", "K"):
        return f"{int(round(value)):4d}{unit}"
    return f"{value:4.1f}{unit}"
88
89
90
def memory_topic():
    """Return the ``M:`` topic: RAM in use, i.e. MemTotal minus MemAvailable.

    Each /proc/meminfo entry is read as the first number after its colon;
    the values are assumed to be KiB (the unit suffix is ignored).
    """
    with open("/proc/meminfo", encoding="utf-8") as meminfo_file:
        fields = {
            key: int(rest.split()[0])
            for key, rest in (entry.split(":", 1) for entry in meminfo_file)
        }
    in_use_kib = fields["MemTotal"] - fields["MemAvailable"]
    return "M:" + human_memory_from_kib(in_use_kib)
98
99
100
def oom_topic():
101
try:
102
worker_uid = str(pwd.getpwnam(WORKER_USER).pw_uid)
103
except KeyError:
104
return "OOM:?/?"
105
worker_patterns = (
106
WORKER_USER,
107
f" uid {worker_uid}",
108
f" uid={worker_uid}",
109
f" UID: {worker_uid}",
110
f" user {worker_uid}",
111
)
112
output = try_output(
113
f"journalctl -k --since '{OOM_LOOKBACK_MINUTES} minutes ago' --no-pager -o cat --grep '{OOM_GREP}'"
114
)
115
total = 0
116
worker_events = 0
117
for line in output.splitlines():
118
if not any(pattern in line for pattern in OOM_PATTERNS):
119
continue
120
total += 1
121
if any(pattern in line for pattern in worker_patterns):
122
worker_events += 1
123
return f"OOM:{total}/{worker_events}"
124
125
126
def format_float(value, width, decimals):
    """Render *value* as fixed-point text of *width* with *decimals* digits.

    ``None`` (an unavailable metric) is rendered as *width* question marks.
    """
    return "?" * width if value is None else format(value, f"{width}.{decimals}f")
130
131
132
def format_integer(value, width):
    """Round *value* to an int and right-align it in *width* characters.

    ``None`` (an unavailable metric) is rendered as *width* question marks.
    """
    if value is not None:
        return format(int(round(value)), f">{width}d")
    return "?" * width
136
137
138
def kernels_topic():
    """Return the ``K:<tracked>/<preforked>`` topic from recent journal lines.

    For each counter, the newest matching journal line of the last
    KERNELS_LOOKBACK_MINUTES minutes is searched for the number; a counter
    with no match is shown as ``??``.
    """
    probes = (
        ("tracking [0-9]+ kernels", r"tracking ([0-9]+) kernels"),
        ("[0-9]+ preforked kernels left", r"([0-9]+) preforked kernels left"),
    )
    counts = []
    for grep_pattern, capture_pattern in probes:
        journal = try_output(
            f"journalctl --since '{KERNELS_LOOKBACK_MINUTES} minutes ago' "
            f"--no-pager -o cat --grep '{grep_pattern}' -n 1"
        )
        found = re.search(capture_pattern, journal)
        counts.append(format_integer(int(found.group(1)), 2) if found else "??")
    return "K:" + "/".join(counts)
152
153
154
def disk_topics():
155
if shutil.which("iostat") is None:
156
return ["iostat not installed"]
157
output = try_output("iostat -dxy 1 1")
158
lines = output.splitlines()
159
header = None
160
devices = []
161
for raw_line in lines:
162
line = raw_line.strip()
163
if not line:
164
continue
165
columns = line.split()
166
if columns[0] == "Device":
167
header = columns
168
continue
169
if header is None:
170
continue
171
if columns[0].startswith(("loop", "ram", "sr")):
172
continue
173
if len(columns) != len(header):
174
continue
175
row = dict(zip(header, columns))
176
devices.append(row)
177
if not devices:
178
return ["error calling iostat"]
179
180
def parse_metric(row, name):
181
try:
182
return float(row[name])
183
except (KeyError, ValueError):
184
return None
185
186
topics = []
187
for row in sorted(devices, key=lambda device: device["Device"]):
188
util = parse_metric(row, "%util")
189
aqu = parse_metric(row, "aqu-sz")
190
r_await = parse_metric(row, "r_await")
191
w_await = parse_metric(row, "w_await")
192
topics.append(
193
"{}:{}% {}q {}r {}w".format(
194
row["Device"],
195
format_integer(util, 3),
196
format_float(aqu, 3, 1),
197
format_integer(r_await, 2),
198
format_integer(w_await, 2),
199
)
200
)
201
return topics
202
203
204
def status_line():
    """Build a compact, single-line snapshot of this node's health.

    Example::

        2026-04-01 03:39:25.925167 K: 2/ 9 L:1.06 0.74 0.84 M: 1.4G OOM:0/0 sda:  1% 0.1q  6r 13w sdb:  3% 0.3q  2r  2w

    Fields, in order:
        timestamp  current local time
        K          tracked kernels / preforked kernels left
        L          1, 5 and 15 minute load averages
        M          used RAM (MemTotal - MemAvailable)
        OOM        recent OOM events / recent worker-related OOM events
        <disk>     per-disk util%, queue size, read-await ms, write-await ms
    """
    one_min, five_min, fifteen_min = os.getloadavg()
    load_topic = "L:" + " ".join(
        format_float(average, 4, 2) for average in (one_min, five_min, fifteen_min)
    )
    parts = [timestamp(), kernels_topic(), load_topic, memory_topic(), oom_topic()]
    parts += disk_topics()
    return " ".join(parts)
232
233
234
def run_probe(base_url):
235
a = random.randint(-2**31, 2**31)
236
b = random.randint(-2**31, 2**31)
237
# The handling of temporary files in Sage 9.7 does not allow SageMathCell to
238
# function properly if there are no regular requests producing temporary
239
# files. To fight it, we'll generate one during health checks. See
240
# https://groups.google.com/g/sage-devel/c/jpwUb8OCVVc/m/R4r5bnOkBQAJ
241
code = "show(plot(sin)); print({} + {})".format(a, b)
242
try:
243
response = requests.post(
244
base_url + "/service",
245
data={"code": code, "accepted_tos": "true"},
246
timeout=PROBE_TIMEOUT_SECONDS,
247
)
248
response.raise_for_status()
249
reply = response.json()
250
# Every few hours we have a request that comes back as executed, but the
251
# stdout is not in the dictionary. It seems that the compute message
252
# never actually gets sent to the kernel and it appears the problem is
253
# in the zmq connection between the webserver and the kernel.
254
#
255
# Also sometimes reply is unsuccessful, yet the server keeps running
256
# and other requests are serviced. Since a restart breaks all active
257
# interacts, better not to restart the server that "mostly works" and
258
# instead we'll just accumulate statistics on these random errors to
259
# help resolve them.
260
if (
261
reply["success"]
262
and "stdout" in reply
263
and int(reply["stdout"].strip()) == a + b
264
):
265
return True, None
266
return False, str(reply)
267
except Exception as exc: # pylint: disable=broad-except
268
return False, str(exc)
269
270
271
def _safe_status_line():
    """Return status_line(), or a timestamped error note if building it fails.

    The status line is diagnostics only. An unexpected error while gathering
    it (e.g. unreadable /proc data or surprising command output) must not
    prevent the probe or a restart that may follow it.
    """
    try:
        return status_line()
    except Exception as exc:  # pylint: disable=broad-except
        return f"{timestamp()} status unavailable: {exc!r}"


def main():
    """Probe SERVICE_NAME up to RETRIES times and restart it if all fail.

    Usage: ``healthcheck.py <base_url>``.

    A status line is appended to CONTEXT_LOG on every run. On the first
    failed probe, the status lines recorded by preceding runs are replayed
    into EVENT_LOG. Each failure then adds a fresh status line and the
    failure reason. After RETRIES failed attempts the service is restarted
    with ``systemctl restart``.

    Returns:
        Process exit status: 0 if the service is inactive (check skipped)
        or a probe succeeded; 2 on wrong usage; 1 once every attempt failed,
        whether or not the restart then succeeded.
    """
    if len(sys.argv) != 2:
        print("usage: healthcheck.py <base_url>", file=sys.stderr)
        return 2
    base_url = sys.argv[1]

    # Record a snapshot on every run so a later failure has recent context.
    previous_context = update_context_log(_safe_status_line())

    if subprocess.call(
        ["systemctl", "--quiet", "is-active", SERVICE_NAME],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    ) != 0:
        print(f"{timestamp()} Service is not active, skipping check")
        return 0

    remaining_attempts = RETRIES
    first_failure = True
    while remaining_attempts:
        remaining_attempts -= 1
        ok, reason = run_probe(base_url)
        if ok:
            return 0

        current_line = _safe_status_line()
        if first_failure:
            # Replay the earlier snapshots once, ahead of the first failure.
            for context_line in previous_context:
                log_status_line(context_line)
            first_failure = False
        log_status_line(current_line)
        log_message(f"healthcheck failed, {remaining_attempts} attempts left: {reason}")

        if remaining_attempts:
            time.sleep(RETRY_SLEEP_SECONDS)

    log_message(f"restarting {SERVICE_NAME}")
    try:
        subprocess.check_call(["systemctl", "restart", SERVICE_NAME])
        log_message("restart succeeded")
    except (FileNotFoundError, subprocess.CalledProcessError):
        log_status_line(_safe_status_line())
        log_message("restart failed")
    return 1
313
314
315
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
317
318