Path: blob/master/tests/trusted_kernel_manager_tests.py
447 views
import trusted_kernel_manager1from misc import assert_is, assert_equal, assert_in, assert_not_in, assert_raises, assert_regexp_matches, assert_is_instance, assert_is_not_none, assert_greater, assert_len, assert_uuid, capture_output, Config2import random3import os4import sys5import ast6import zmq7from IPython.zmq.session import Session8from contextlib import contextmanager9import time10import re11import config_default as conf12sage = conf.sage13configg = Config()14d = configg.get_default_config("_default_config")1516from IPython.testing.decorators import skip1718def test_init():19tmkm = trusted_kernel_manager.TrustedMultiKernelManager()20assert_len(tmkm._kernels.keys(), 0)21assert_len(tmkm._comps.keys(), 0)22assert_len(tmkm._clients.keys(), 0)23assert_is(hasattr(tmkm, "context"), True)2425def test_init_parameters():26tmkm = trusted_kernel_manager.TrustedMultiKernelManager(default_computer_config = {"a": "b"}, kernel_timeout = 3.14)27assert_equal(tmkm.default_computer_config, {"a": "b"})28assert_equal(tmkm.kernel_timeout, 3.14)293031class TestTrustedMultiKernelManager(object):32executing_re = re.compile(r'executing .* -python .*receiver\.py.*/dev/null')33default_comp_config = {"host": "localhost",34"username": None,35"python": sage + " -python",36"location": os.getcwd(),37"log_file": 'test.log',38"max": 15}3940def setUp(self): #called automatically before each test is run41self.a = trusted_kernel_manager.TrustedMultiKernelManager(default_computer_config = d)4243def _populate_comps_kernels(self):44self.a._comps["testcomp1"] = {"host": "localhost",45"port": random.randrange(50000,60000),46"kernels": {"kone": None, "ktwo": None},47"max_kernels": 10,48"beat_interval": 3.0,49"first_beat": 5.0}50self.a._comps["testcomp2"] = {"host": "localhost",51"port": random.randrange(50000,60000),52"kernels": {"kthree": None},53"max_kernels": 15,54"beat_interval": 2.0,55"first_beat": 4.0}56self.a._kernels["kone"] = {"comp_id": "testcomp1", "ports": {"hb_port": 50001, "iopub_port": 50002, "shell_port": 50003, "stdin_port": 50004}}57self.a._kernels["ktwo"] = {"comp_id": "testcomp1", "ports": {"hb_port": 50005, "iopub_port": 50006, "shell_port": 50007, "stdin_port": 50008}}58self.a._kernels["kthree"] = {"comp_id": "testcomp2", "ports": {"hb_port": 50009, "iopub_port": 50010, "shell_port": 50011, "stdin_port": 50012}}5960def tearDown(self):61for i in list(self.a._comps):62try:63self.a.remove_computer(i)64except:65pass666768def test_get_kernel_ids_success(self):69self._populate_comps_kernels()70x = self.a.get_kernel_ids("testcomp1")71y = self.a.get_kernel_ids("testcomp2")72assert_len(x, 2)73assert_len(y, 1)74assert_in("kone", x)75assert_in("ktwo", x)76assert_not_in("kthree", x)77assert_in("kthree", y)787980def test_get_kernel_ids_invalid_comp(self):81self._populate_comps_kernels()82x = self.a.get_kernel_ids("testcomp3")83assert_len(x, 0)848586def test_get_kernel_ids_no_args(self):87self._populate_comps_kernels()88self.a._kernels = {"a": None, "b": None, "c": None}89x = self.a.get_kernel_ids()90assert_len(x, 3)919293def test_get_hb_info_success(self):94self._populate_comps_kernels()95(b, c) = self.a.get_hb_info("kone")96assert_equal(b, 3.0)97assert_equal(c, 5.0)98(b, c) = self.a.get_hb_info("kthree")99assert_equal(b, 2.0)100assert_equal(c, 4.0)101102103def test_get_hb_info_invalid_kernel_id(self):104self._populate_comps_kernels()105assert_raises(KeyError, self.a.get_hb_info, "blah")106107108def test_ssh_untrusted(self):109client = self.a._setup_ssh_connection(self.default_comp_config["host"], self.default_comp_config["username"])110with capture_output() as (out, err):111x = self.a._ssh_untrusted(self.default_comp_config, client)112out = out[0]113assert_is_not_none(x)114assert_is_instance(x,int)115# TODO: check to make sure the int returned is a valid port116#assert_regexp_matches(out, self.executing_re)117118def test_add_computer_success(self): # depends on _setup_ssh_connection, _ssh_untrusted119new_config = self.default_comp_config.copy()120new_config.update({'beat_interval': 0.5, 'first_beat': 1, 'kernels': {}})121122with capture_output(split=True) as (out,err):123x = self.a.add_computer(self.default_comp_config)124assert_is_not_none(x)125assert_uuid(x)126assert_in(x, self.a._comps)127for k in new_config:128assert_equal(self.a._comps[x][k], new_config[k], "config value %s (%s) does not agree (should be %s)"%(k,self.a._comps[x][k], new_config[k]))129130assert_in("ssh", self.a._clients[x])131132#assert_regexp_matches(out[0], self.executing_re)133assert_regexp_matches(out[0], r'ZMQ Connection with computer [a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12} at port \d+ established')134135def test_setup_ssh_connection_success(self):136x = self.a._setup_ssh_connection("localhost", username=None)137assert_in("AutoAddPolicy", str(x._policy))138assert_len(x.get_host_keys(), 1)139140def _check_all_kernels_killed_out(self, out):141expected_out = {'content': {'status': 'All kernels killed!'}, 'type': 'success'}142outdict = ast.literal_eval(out)143assert_equal(outdict, expected_out)144145def test_purge_kernels_no_kernels(self): # depends on add_computer146x = self.a.add_computer(self.default_comp_config)147148with capture_output() as (out, err):149self.a.purge_kernels(x)150out = out[0]151152self._check_all_kernels_killed_out(out)153assert_equal(self.a._comps[x]["kernels"], {})154assert_equal(self.a._kernels, {})155156def test_purge_kernels_success(self): # depends on add_computer, new_session157x = self.a.add_computer(self.default_comp_config)158y = self.a.new_session()159z = self.a.new_session()160161with capture_output() as (out, err):162self.a.purge_kernels(x)163out = out[0]164self._check_all_kernels_killed_out(out)165assert_equal(self.a._comps[x]["kernels"], {})166assert_equal(self.a._kernels, {})167168def test_remove_computer_success(self): # depends on add_computer, new_session169x = self.a.add_computer(self.default_comp_config)170kern1 = self.a.new_session()171kern2 = self.a.new_session()172b = self.a.add_computer(self.default_comp_config)173174# remove computer with active kernels175with capture_output() as (out, err):176self.a.remove_computer(x)177out = out[0]178self._check_all_kernels_killed_out(out)179assert_not_in(kern1, self.a._kernels)180assert_not_in(kern2, self.a._kernels)181assert_not_in(x, self.a._comps)182183# remove computer with no kernels184with capture_output() as (out, err):185self.a.remove_computer(b)186out = out[0]187self._check_all_kernels_killed_out(out)188assert_not_in(b, self.a._comps)189190def test_restart_kernel_success(self): # depends on add_computer, new_session191x = self.a.add_computer(self.default_comp_config)192kern1 = self.a.new_session()193kern2 = self.a.new_session()194195with capture_output() as (out, err):196self.a.restart_kernel(kern2)197out = out[0]198199y = ast.literal_eval(out)200assert_is_instance(y, dict)201assert_in("content", y)202assert_len(y["content"], 2)203assert_in("type", y)204assert_equal(y["type"], "success")205assert_in("kernel_id", y["content"])206assert_uuid(y["content"]["kernel_id"])207assert_in("connection", y["content"])208assert_len(y["content"]["connection"], 6)209for s in ("stdin_port", "hb_port", "shell_port", "iopub_port"):210assert_in(s, y["content"]["connection"])211assert_len(str(y["content"]["connection"][s]), 5)212assert_in("ip", y["content"]["connection"])213assert_equal(y["content"]["connection"]["ip"], "127.0.0.1")214assert_in("key", y["content"]["connection"])215assert_uuid(y["content"]["connection"]["key"])216217def test_interrupt_kernel_success(self): # depends on add_computer, new_session218x = self.a.add_computer(self.default_comp_config)219kern1 = self.a.new_session()220221reply = self.a.interrupt_kernel(kern1)222223assert_equal(reply["type"], "success")224225def test_new_session_success(self): # depends on add_computer226x = self.a.add_computer(self.default_comp_config)227228with capture_output() as (out, err):229kern1 = self.a.new_session()230out = out[0]231232assert_in(kern1, self.a._kernels)233assert_in("comp_id", self.a._kernels[kern1])234assert_uuid(self.a._kernels[kern1]["comp_id"])235assert_in("connection", self.a._kernels[kern1])236assert_in("executing", self.a._kernels[kern1])237assert_is(self.a._kernels[kern1]["executing"], False)238assert_in("timeout", self.a._kernels[kern1])239assert_greater(time.time(), self.a._kernels[kern1]["timeout"])240x = self.a._kernels[kern1]["comp_id"]241assert_in(kern1, self.a._comps[x]["kernels"])242assert_is_instance(self.a._sessions[kern1], Session)243#assert_in("CONNECTION FILE ::: ", out)244245def test_end_session_success(self): # depends on add_computer, new_session246x = self.a.add_computer(self.default_comp_config)247kern1 = self.a.new_session()248kern2 = self.a.new_session()249with capture_output(split=True) as (out,err):250self.a.end_session(kern1)251252assert_not_in(kern1, self.a._kernels.keys())253for v in self.a._comps.values():254assert_not_in(kern1, v["kernels"])255#assert_in("Killing Kernel ::: %s at "%kern1, out[0])256#assert_in("Kernel %s successfully killed."%kern1, out[1])257with capture_output(split=True) as (out,err):258self.a.end_session(kern2)259260assert_not_in(kern2, self.a._kernels)261for v in self.a._comps.values():262assert_not_in(kern2, v["kernels"])263264#assert_in("Killing Kernel ::: %s at "%kern2, out[0])265#assert_in("Kernel %s successfully killed."%kern2, out[1])266267def test_find_open_computer_success(self):268self.a._comps["testcomp1"] = {"max_kernels": 3, "kernels": {}}269self.a._comps["testcomp2"] = {"max_kernels": 5, "kernels": {}}270271for i in range(8):272y = self.a._find_open_computer()273assert_equal(y == "testcomp1" or y == "testcomp2", True)274self.a._comps[y]["max_kernels"] -= 1275276try:277self.a._find_open_computer()278except IOError as e:279assert_equal("Could not find open computer. There are 2 computers available.", e.message)280281def test_create_connected_stream(self):282host="localhost"283port = 51337284socket_type = zmq.SUB285286with capture_output() as (out, err):287ret = self.a._create_connected_stream(host, port, socket_type)288out = out[0]289290assert_is(ret.closed(), False)291assert_equal(ret.socket.socket_type, zmq.SUB)292#assert_equal(out, "Connecting to: tcp://%s:%i\n" % (host, port))293294host="localhost"295port = 51337296socket_type = zmq.REQ297298ret = self.a._create_connected_stream(host, port, socket_type)299300assert_is(ret.closed(), False)301assert_equal(ret.socket.socket_type, zmq.REQ)302303host="localhost"304port = 51337305socket_type = zmq.DEALER306307ret = self.a._create_connected_stream(host, port, socket_type)308309assert_is(ret.closed(), False)310assert_equal(ret.socket.socket_type, zmq.DEALER)311312def test_create_iopub_stream(self): # depends on create_connected_stream313kernel_id = "kern1"314comp_id = "testcomp1"315self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "iopub_port": 50101}}316self.a._comps[comp_id] = {"host": "localhost"}317318ret = self.a.create_iopub_stream(kernel_id)319320assert_is(ret.closed(), False)321assert_equal(ret.socket.socket_type, zmq.SUB)322323324def test_create_shell_stream(self): # depends on create_connected_stream325kernel_id = "kern1"326comp_id = "testcomp1"327self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "shell_port": 50101}}328self.a._comps[comp_id] = {"host": "localhost"}329330ret = self.a.create_shell_stream(kernel_id)331332assert_is(ret.closed(), False)333assert_equal(ret.socket.socket_type, zmq.DEALER)334335336def test_create_hb_stream(self): # depends on create_connected_stream337kernel_id = "kern1"338comp_id = "testcomp1"339self.a._kernels[kernel_id] = {"comp_id": comp_id, "connection": {"ip": "127.0.0.1", "hb_port": 50101}}340self.a._comps[comp_id] = {"host": "localhost"}341342ret = self.a.create_hb_stream(kernel_id)343assert_is(ret.closed(), False)344assert_equal(ret.socket.socket_type, zmq.REQ)345346347348