Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagecell
Path: blob/master/tests/trusted_kernel_manager_tests.py
447 views
1
import trusted_kernel_manager
2
from misc import assert_is, assert_equal, assert_in, assert_not_in, assert_raises, assert_regexp_matches, assert_is_instance, assert_is_not_none, assert_greater, assert_len, assert_uuid, capture_output, Config
3
import random
4
import os
5
import sys
6
import ast
7
import zmq
8
from IPython.zmq.session import Session
9
from contextlib import contextmanager
10
import time
11
import re
12
import config_default as conf
13
sage = conf.sage
14
configg = Config()
15
d = configg.get_default_config("_default_config")
16
17
from IPython.testing.decorators import skip
18
19
def test_init():
20
tmkm = trusted_kernel_manager.TrustedMultiKernelManager()
21
assert_len(tmkm._kernels.keys(), 0)
22
assert_len(tmkm._comps.keys(), 0)
23
assert_len(tmkm._clients.keys(), 0)
24
assert_is(hasattr(tmkm, "context"), True)
25
26
def test_init_parameters():
27
tmkm = trusted_kernel_manager.TrustedMultiKernelManager(default_computer_config = {"a": "b"}, kernel_timeout = 3.14)
28
assert_equal(tmkm.default_computer_config, {"a": "b"})
29
assert_equal(tmkm.kernel_timeout, 3.14)
30
31
32
class TestTrustedMultiKernelManager(object):
33
executing_re = re.compile(r'executing .* -python .*receiver\.py.*/dev/null')
34
default_comp_config = {"host": "localhost",
35
"username": None,
36
"python": sage + " -python",
37
"location": os.getcwd(),
38
"log_file": 'test.log',
39
"max": 15}
40
41
def setUp(self): #called automatically before each test is run
42
self.a = trusted_kernel_manager.TrustedMultiKernelManager(default_computer_config = d)
43
44
def _populate_comps_kernels(self):
45
self.a._comps["testcomp1"] = {"host": "localhost",
46
"port": random.randrange(50000,60000),
47
"kernels": {"kone": None, "ktwo": None},
48
"max_kernels": 10,
49
"beat_interval": 3.0,
50
"first_beat": 5.0}
51
self.a._comps["testcomp2"] = {"host": "localhost",
52
"port": random.randrange(50000,60000),
53
"kernels": {"kthree": None},
54
"max_kernels": 15,
55
"beat_interval": 2.0,
56
"first_beat": 4.0}
57
self.a._kernels["kone"] = {"comp_id": "testcomp1", "ports": {"hb_port": 50001, "iopub_port": 50002, "shell_port": 50003, "stdin_port": 50004}}
58
self.a._kernels["ktwo"] = {"comp_id": "testcomp1", "ports": {"hb_port": 50005, "iopub_port": 50006, "shell_port": 50007, "stdin_port": 50008}}
59
self.a._kernels["kthree"] = {"comp_id": "testcomp2", "ports": {"hb_port": 50009, "iopub_port": 50010, "shell_port": 50011, "stdin_port": 50012}}
60
61
def tearDown(self):
62
for i in list(self.a._comps):
63
try:
64
self.a.remove_computer(i)
65
except:
66
pass
67
68
69
def test_get_kernel_ids_success(self):
70
self._populate_comps_kernels()
71
x = self.a.get_kernel_ids("testcomp1")
72
y = self.a.get_kernel_ids("testcomp2")
73
assert_len(x, 2)
74
assert_len(y, 1)
75
assert_in("kone", x)
76
assert_in("ktwo", x)
77
assert_not_in("kthree", x)
78
assert_in("kthree", y)
79
80
81
def test_get_kernel_ids_invalid_comp(self):
82
self._populate_comps_kernels()
83
x = self.a.get_kernel_ids("testcomp3")
84
assert_len(x, 0)
85
86
87
def test_get_kernel_ids_no_args(self):
88
self._populate_comps_kernels()
89
self.a._kernels = {"a": None, "b": None, "c": None}
90
x = self.a.get_kernel_ids()
91
assert_len(x, 3)
92
93
94
def test_get_hb_info_success(self):
95
self._populate_comps_kernels()
96
(b, c) = self.a.get_hb_info("kone")
97
assert_equal(b, 3.0)
98
assert_equal(c, 5.0)
99
(b, c) = self.a.get_hb_info("kthree")
100
assert_equal(b, 2.0)
101
assert_equal(c, 4.0)
102
103
104
def test_get_hb_info_invalid_kernel_id(self):
105
self._populate_comps_kernels()
106
assert_raises(KeyError, self.a.get_hb_info, "blah")
107
108
109
def test_ssh_untrusted(self):
110
client = self.a._setup_ssh_connection(self.default_comp_config["host"], self.default_comp_config["username"])
111
with capture_output() as (out, err):
112
x = self.a._ssh_untrusted(self.default_comp_config, client)
113
out = out[0]
114
assert_is_not_none(x)
115
assert_is_instance(x,int)
116
# TODO: check to make sure the int returned is a valid port
117
#assert_regexp_matches(out, self.executing_re)
118
119
def test_add_computer_success(self): # depends on _setup_ssh_connection, _ssh_untrusted
120
new_config = self.default_comp_config.copy()
121
new_config.update({'beat_interval': 0.5, 'first_beat': 1, 'kernels': {}})
122
123
with capture_output(split=True) as (out,err):
124
x = self.a.add_computer(self.default_comp_config)
125
assert_is_not_none(x)
126
assert_uuid(x)
127
assert_in(x, self.a._comps)
128
for k in new_config:
129
assert_equal(self.a._comps[x][k], new_config[k], "config value %s (%s) does not agree (should be %s)"%(k,self.a._comps[x][k], new_config[k]))
130
131
assert_in("ssh", self.a._clients[x])
132
133
#assert_regexp_matches(out[0], self.executing_re)
134
assert_regexp_matches(out[0], r'ZMQ Connection with computer [a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12} at port \d+ established')
135
136
def test_setup_ssh_connection_success(self):
137
x = self.a._setup_ssh_connection("localhost", username=None)
138
assert_in("AutoAddPolicy", str(x._policy))
139
assert_len(x.get_host_keys(), 1)
140
141
def _check_all_kernels_killed_out(self, out):
142
expected_out = {'content': {'status': 'All kernels killed!'}, 'type': 'success'}
143
outdict = ast.literal_eval(out)
144
assert_equal(outdict, expected_out)
145
146
def test_purge_kernels_no_kernels(self): # depends on add_computer
147
x = self.a.add_computer(self.default_comp_config)
148
149
with capture_output() as (out, err):
150
self.a.purge_kernels(x)
151
out = out[0]
152
153
self._check_all_kernels_killed_out(out)
154
assert_equal(self.a._comps[x]["kernels"], {})
155
assert_equal(self.a._kernels, {})
156
157
def test_purge_kernels_success(self): # depends on add_computer, new_session
158
x = self.a.add_computer(self.default_comp_config)
159
y = self.a.new_session()
160
z = self.a.new_session()
161
162
with capture_output() as (out, err):
163
self.a.purge_kernels(x)
164
out = out[0]
165
self._check_all_kernels_killed_out(out)
166
assert_equal(self.a._comps[x]["kernels"], {})
167
assert_equal(self.a._kernels, {})
168
169
def test_remove_computer_success(self): # depends on add_computer, new_session
170
x = self.a.add_computer(self.default_comp_config)
171
kern1 = self.a.new_session()
172
kern2 = self.a.new_session()
173
b = self.a.add_computer(self.default_comp_config)
174
175
# remove computer with active kernels
176
with capture_output() as (out, err):
177
self.a.remove_computer(x)
178
out = out[0]
179
self._check_all_kernels_killed_out(out)
180
assert_not_in(kern1, self.a._kernels)
181
assert_not_in(kern2, self.a._kernels)
182
assert_not_in(x, self.a._comps)
183
184
# remove computer with no kernels
185
with capture_output() as (out, err):
186
self.a.remove_computer(b)
187
out = out[0]
188
self._check_all_kernels_killed_out(out)
189
assert_not_in(b, self.a._comps)
190
191
def test_restart_kernel_success(self): # depends on add_computer, new_session
192
x = self.a.add_computer(self.default_comp_config)
193
kern1 = self.a.new_session()
194
kern2 = self.a.new_session()
195
196
with capture_output() as (out, err):
197
self.a.restart_kernel(kern2)
198
out = out[0]
199
200
y = ast.literal_eval(out)
201
assert_is_instance(y, dict)
202
assert_in("content", y)
203
assert_len(y["content"], 2)
204
assert_in("type", y)
205
assert_equal(y["type"], "success")
206
assert_in("kernel_id", y["content"])
207
assert_uuid(y["content"]["kernel_id"])
208
assert_in("connection", y["content"])
209
assert_len(y["content"]["connection"], 6)
210
for s in ("stdin_port", "hb_port", "shell_port", "iopub_port"):
211
assert_in(s, y["content"]["connection"])
212
assert_len(str(y["content"]["connection"][s]), 5)
213
assert_in("ip", y["content"]["connection"])
214
assert_equal(y["content"]["connection"]["ip"], "127.0.0.1")
215
assert_in("key", y["content"]["connection"])
216
assert_uuid(y["content"]["connection"]["key"])
217
218
def test_interrupt_kernel_success(self): # depends on add_computer, new_session
219
x = self.a.add_computer(self.default_comp_config)
220
kern1 = self.a.new_session()
221
222
reply = self.a.interrupt_kernel(kern1)
223
224
assert_equal(reply["type"], "success")
225
226
def test_new_session_success(self): # depends on add_computer
227
x = self.a.add_computer(self.default_comp_config)
228
229
with capture_output() as (out, err):
230
kern1 = self.a.new_session()
231
out = out[0]
232
233
assert_in(kern1, self.a._kernels)
234
assert_in("comp_id", self.a._kernels[kern1])
235
assert_uuid(self.a._kernels[kern1]["comp_id"])
236
assert_in("connection", self.a._kernels[kern1])
237
assert_in("executing", self.a._kernels[kern1])
238
assert_is(self.a._kernels[kern1]["executing"], False)
239
assert_in("timeout", self.a._kernels[kern1])
240
assert_greater(time.time(), self.a._kernels[kern1]["timeout"])
241
x = self.a._kernels[kern1]["comp_id"]
242
assert_in(kern1, self.a._comps[x]["kernels"])
243
assert_is_instance(self.a._sessions[kern1], Session)
244
#assert_in("CONNECTION FILE ::: ", out)
245
246
def test_end_session_success(self): # depends on add_computer, new_session
247
x = self.a.add_computer(self.default_comp_config)
248
kern1 = self.a.new_session()
249
kern2 = self.a.new_session()
250
with capture_output(split=True) as (out,err):
251
self.a.end_session(kern1)
252
253
assert_not_in(kern1, self.a._kernels.keys())
254
for v in self.a._comps.values():
255
assert_not_in(kern1, v["kernels"])
256
#assert_in("Killing Kernel ::: %s at "%kern1, out[0])
257
#assert_in("Kernel %s successfully killed."%kern1, out[1])
258
with capture_output(split=True) as (out,err):
259
self.a.end_session(kern2)
260
261
assert_not_in(kern2, self.a._kernels)
262
for v in self.a._comps.values():
263
assert_not_in(kern2, v["kernels"])
264
265
#assert_in("Killing Kernel ::: %s at "%kern2, out[0])
266
#assert_in("Kernel %s successfully killed."%kern2, out[1])
267
268
def test_find_open_computer_success(self):
269
self.a._comps["testcomp1"] = {"max_kernels": 3, "kernels": {}}
270
self.a._comps["testcomp2"] = {"max_kernels": 5, "kernels": {}}
271
272
for i in range(8):
273
y = self.a._find_open_computer()
274
assert_equal(y == "testcomp1" or y == "testcomp2", True)
275
self.a._comps[y]["max_kernels"] -= 1
276
277
try:
278
self.a._find_open_computer()
279
except IOError as e:
280
assert_equal("Could not find open computer. There are 2 computers available.", e.message)
281
282
def test_create_connected_stream(self):
283
host="localhost"
284
port = 51337
285
socket_type = zmq.SUB
286
287
with capture_output() as (out, err):
288
ret = self.a._create_connected_stream(host, port, socket_type)
289
out = out[0]
290
291
assert_is(ret.closed(), False)
292
assert_equal(ret.socket.socket_type, zmq.SUB)
293
#assert_equal(out, "Connecting to: tcp://%s:%i\n" % (host, port))
294
295
host="localhost"
296
port = 51337
297
socket_type = zmq.REQ
298
299
ret = self.a._create_connected_stream(host, port, socket_type)
300
301
assert_is(ret.closed(), False)
302
assert_equal(ret.socket.socket_type, zmq.REQ)
303
304
host="localhost"
305
port = 51337
306
socket_type = zmq.DEALER
307
308
ret = self.a._create_connected_stream(host, port, socket_type)
309
310
assert_is(ret.closed(), False)
311
assert_equal(ret.socket.socket_type, zmq.DEALER)
312
313
def test_create_iopub_stream(self): # depends on create_connected_stream
314
kernel_id = "kern1"
315
comp_id = "testcomp1"
316
self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "iopub_port": 50101}}
317
self.a._comps[comp_id] = {"host": "localhost"}
318
319
ret = self.a.create_iopub_stream(kernel_id)
320
321
assert_is(ret.closed(), False)
322
assert_equal(ret.socket.socket_type, zmq.SUB)
323
324
325
def test_create_shell_stream(self): # depends on create_connected_stream
326
kernel_id = "kern1"
327
comp_id = "testcomp1"
328
self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "shell_port": 50101}}
329
self.a._comps[comp_id] = {"host": "localhost"}
330
331
ret = self.a.create_shell_stream(kernel_id)
332
333
assert_is(ret.closed(), False)
334
assert_equal(ret.socket.socket_type, zmq.DEALER)
335
336
337
def test_create_hb_stream(self): # depends on create_connected_stream
338
kernel_id = "kern1"
339
comp_id = "testcomp1"
340
self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "hb_port": 50101}}
341
self.a._comps[comp_id] = {"host": "localhost"}
342
343
ret = self.a.create_hb_stream(kernel_id)
344
assert_is(ret.closed(), False)
345
assert_equal(ret.socket.socket_type, zmq.REQ)
346
347
348