Path: blob/main/singlestoredb/tests/test_fusion.py
469 views
#!/usr/bin/env python1# type: ignore2"""SingleStoreDB Fusion testing."""3import os4import random5import secrets6import tempfile7import time8import unittest9from typing import Any10from typing import List1112import pytest1314import singlestoredb as s215from singlestoredb.tests import utils161718class TestFusion(unittest.TestCase):1920dbname: str = ''21dbexisted: bool = False2223@classmethod24def setUpClass(cls):25sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')26cls.dbname, cls.dbexisted = utils.load_sql(sql_file)2728@classmethod29def tearDownClass(cls):30if not cls.dbexisted:31utils.drop_database(cls.dbname)3233def setUp(self):34self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')35os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'36self.conn = s2.connect(database=type(self).dbname, local_infile=True)37self.cur = self.conn.cursor()3839def tearDown(self):40if self.enabled:41os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled42else:43del os.environ['SINGLESTOREDB_FUSION_ENABLED']4445try:46if self.cur is not None:47self.cur.close()48except Exception:49# traceback.print_exc()50pass5152try:53if self.conn is not None:54self.conn.close()55except Exception:56# traceback.print_exc()57pass5859def test_env_var(self):60os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '0'6162with self.assertRaises(s2.ProgrammingError):63self.cur.execute('show fusion commands')6465del os.environ['SINGLESTOREDB_FUSION_ENABLED']6667with self.assertRaises(s2.ProgrammingError):68self.cur.execute('show fusion commands')6970os.environ['SINGLESTOREDB_FUSION_ENABLED'] = 'yes'7172self.cur.execute('show fusion commands')73assert list(self.cur)7475def test_show_commands(self):76self.cur.execute('show fusion commands')77cmds = [x[0] for x in self.cur.fetchall()]78assert cmds79assert [x for x in cmds if x.strip().startswith('SHOW FUSION GRAMMAR')], cmds8081self.cur.execute('show fusion commands like "create%"')82cmds = [x[0] for x in self.cur.fetchall()]83assert cmds84assert [x for x in cmds if x.strip().startswith('CREATE')] == cmds, cmds8586def test_show_grammar(self):87self.cur.execute('show fusion grammar for "create workspace"')88cmds = [x[0] for x in self.cur.fetchall()]89assert cmds90assert [x for x in cmds if x.strip().startswith('CREATE WORKSPACE')], cmds919293@pytest.mark.management94class TestWorkspaceFusion(unittest.TestCase):9596id: str = secrets.token_hex(8)97dbname: str = ''98dbexisted: bool = False99workspace_groups: List[Any] = []100101@classmethod102def setUpClass(cls):103sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')104cls.dbname, cls.dbexisted = utils.load_sql(sql_file)105mgr = s2.manage_workspaces()106us_regions = [x for x in mgr.regions if x.name.startswith('US')]107non_us_regions = [x for x in mgr.regions if not x.name.startswith('US')]108wg = mgr.create_workspace_group(109f'A Fusion Testing {cls.id}',110region=random.choice(us_regions),111firewall_ranges=[],112)113cls.workspace_groups.append(wg)114wg = mgr.create_workspace_group(115f'B Fusion Testing {cls.id}',116region=random.choice(us_regions),117firewall_ranges=[],118)119cls.workspace_groups.append(wg)120wg = mgr.create_workspace_group(121f'C Fusion Testing {cls.id}',122region=random.choice(non_us_regions),123firewall_ranges=[],124)125cls.workspace_groups.append(wg)126127@classmethod128def tearDownClass(cls):129if not cls.dbexisted:130utils.drop_database(cls.dbname)131while cls.workspace_groups:132cls.workspace_groups.pop().terminate(force=True)133134def setUp(self):135self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')136os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'137self.conn = s2.connect(database=type(self).dbname, local_infile=True)138self.cur = self.conn.cursor()139140def tearDown(self):141if self.enabled:142os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled143else:144del os.environ['SINGLESTOREDB_FUSION_ENABLED']145146try:147if self.cur is not None:148self.cur.close()149except Exception:150# traceback.print_exc()151pass152153try:154if self.conn is not None:155self.conn.close()156except Exception:157# traceback.print_exc()158pass159160def test_show_regions(self):161self.cur.execute('show regions')162regs = list(self.cur)163desc = self.cur.description164165us_regs = [x for x in regs if x[0].startswith('US')]166167assert len(desc) == 3168assert len(regs) > 5169assert len(us_regs) > 5170171# LIKE172self.cur.execute('show regions like "US%"')173regs = list(self.cur)174assert regs == us_regs175176# LIMIT177self.cur.execute('show regions like "US%" limit 3')178regs = list(self.cur)179assert len(regs) == 3180181# ORDER BY182self.cur.execute('show regions like "US%" limit 3 order by name')183regs = list(self.cur)184assert len(regs) == 3185assert regs == list(sorted(regs, key=lambda x: x[0]))186187# Wrong column188with self.assertRaises(KeyError):189self.cur.execute('show regions like "US%" limit 3 order by foo')190191def test_show_workspace_groups(self):192self.cur.execute('show workspace groups')193wgs = list(self.cur)194desc = self.cur.description195196assert len(desc) == 4197assert desc[0].name == 'Name'198assert desc[1].name == 'ID'199assert desc[2].name == 'Region'200assert desc[3].name == 'FirewallRanges'201assert len(wgs) >= 3202203names = [x[0] for x in wgs]204assert f'A Fusion Testing {self.id}' in names205assert f'B Fusion Testing {self.id}' in names206assert f'C Fusion Testing {self.id}' in names207208# LIKE clause209self.cur.execute(f'show workspace groups like "A%sion Testing {self.id}"')210wgs = list(self.cur)211212names = [x[0] for x in wgs]213assert f'A Fusion Testing {self.id}' in names214assert f'B Fusion Testing {self.id}' not in names215assert f'C Fusion Testing {self.id}' not in names216217# LIMIT clause218self.cur.execute('show workspace groups limit 2')219wgs = list(self.cur)220assert len(wgs) == 2221222# EXTENDED attributes223self.cur.execute('show workspace groups extended')224wgs = list(self.cur)225desc = self.cur.description226227assert len(desc) == 6228assert desc[4].name == 'CreatedAt'229assert desc[5].name == 'TerminatedAt'230231# ORDER BY232self.cur.execute(233f'show workspace groups like "% Fusion Testing {self.id}" order by name desc',234)235wgs = list(self.cur)236237names = [x[0] for x in wgs]238assert names == [239f'C Fusion Testing {self.id}',240f'B Fusion Testing {self.id}',241f'A Fusion Testing {self.id}',242]243244# All options245self.cur.execute(246f'show workspace groups like "% Fusion Testing {self.id}" '247'extended order by name desc limit 2',248)249wgs = list(self.cur)250desc = self.cur.description251names = [x[0] for x in wgs]252253assert len(desc) == 6254assert names == [f'C Fusion Testing {self.id}', f'B Fusion Testing {self.id}']255256def test_show_workspaces(self):257mgr = s2.manage_workspaces()258wg = mgr.workspace_groups[f'B Fusion Testing {self.id}']259260self.cur.execute(261'create workspace show-ws-1 in group '262f'"B Fusion Testing {self.id}" with size S-00',263)264self.cur.execute(265'create workspace show-ws-2 in group '266f'"B Fusion Testing {self.id}" with size S-00',267)268self.cur.execute(269'create workspace show-ws-3 in group '270f'"B Fusion Testing {self.id}" with size S-00',271)272273time.sleep(30)274iterations = 20275while True:276wgs = wg.workspaces277states = [278x.state for x in wgs279if x.name in ('show-ws-1', 'show-ws-2', 'show-ws-3')280]281if len(states) == 3 and states.count('ACTIVE') == 3:282break283iterations -= 1284if not iterations:285raise RuntimeError('timed out waiting for workspaces to start')286time.sleep(30)287288# SHOW289self.cur.execute(f'show workspaces in group "B Fusion Testing {self.id}"')290desc = self.cur.description291out = list(self.cur)292names = [x[0] for x in out]293assert len(desc) == 4294assert [x[0] for x in desc] == ['Name', 'ID', 'Size', 'State']295assert len(out) >= 3296assert 'show-ws-1' in names297assert 'show-ws-2' in names298assert 'show-ws-3' in names299300# SHOW ID301self.cur.execute(f'show workspaces in group id {wg.id}')302desc = self.cur.description303out = list(self.cur)304names = [x[0] for x in out]305assert len(desc) == 4306assert [x[0] for x in desc] == ['Name', 'ID', 'Size', 'State']307assert len(out) >= 3308assert 'show-ws-1' in names309assert 'show-ws-2' in names310assert 'show-ws-3' in names311312# LIKE clause313self.cur.execute(314'show workspaces in group '315f'"B Fusion Testing {self.id}" like "%2"',316)317out = list(self.cur)318names = [x[0] for x in out]319assert len(out) >= 1320assert [x for x in names if x.endswith('2')]321assert 'show-ws-1' not in names322assert 'show-ws-2' in names323assert 'show-ws-3' not in names324325# Extended attributes326self.cur.execute(327'show workspaces in group '328f'"B Fusion Testing {self.id}" extended',329)330desc = self.cur.description331out = list(self.cur)332assert len(desc) == 7333assert [x[0] for x in desc] == [334'Name', 'ID', 'Size', 'State',335'Endpoint', 'CreatedAt', 'TerminatedAt',336]337338# ORDER BY339self.cur.execute(340'show workspaces in group '341f'"B Fusion Testing {self.id}" order by name desc',342)343out = list(self.cur)344desc = self.cur.description345assert len(desc) == 4346names = [x[0] for x in out]347assert names == ['show-ws-3', 'show-ws-2', 'show-ws-1']348349# LIMIT clause350self.cur.execute(351'show workspaces in group '352f'"B Fusion Testing {self.id}" order by name desc limit 2',353)354out = list(self.cur)355desc = self.cur.description356assert len(desc) == 4357names = [x[0] for x in out]358assert names == ['show-ws-3', 'show-ws-2']359360# All options361self.cur.execute(362f'show workspaces in group "B Fusion Testing {self.id}" '363'like "show-ws%" extended order by name desc limit 2',364)365out = list(self.cur)366desc = self.cur.description367assert len(desc) == 7368names = [x[0] for x in out]369assert names == ['show-ws-3', 'show-ws-2']370371def test_create_drop_workspace(self):372mgr = s2.manage_workspaces()373wg = mgr.workspace_groups[f'A Fusion Testing {self.id}']374375self.cur.execute(376f'create workspace foobar-1 in group "A Fusion Testing {self.id}" '377'with size S-00 wait on active',378)379foobar_1 = [x for x in wg.workspaces if x.name == 'foobar-1']380assert len(foobar_1) == 1381382self.cur.execute(383f'create workspace foobar-2 in group "A Fusion Testing {self.id}" '384'with size S-00 wait on active',385)386foobar_2 = [x for x in wg.workspaces if x.name == 'foobar-2']387assert len(foobar_2) == 1388389# Drop by name390self.cur.execute(391f'drop workspace "foobar-1" in group "A Fusion Testing {self.id}" '392'wait on terminated',393)394foobar_1 = [x for x in wg.workspaces if x.name == 'foobar-1']395assert len(foobar_1) == 0396397# Drop by ID398foobar_2_id = foobar_2[0].id399self.cur.execute(400f'drop workspace id {foobar_2_id} in group '401f'"A Fusion Testing {self.id}" wait on terminated',402)403foobar_2 = [x for x in wg.workspaces if x.name == 'foobar-2']404assert len(foobar_2) == 0405406# Drop non-existent by ID407with self.assertRaises(KeyError):408self.cur.execute(409f'drop workspace id {foobar_2_id} '410f'in group "A Fusion Testing {self.id}"',411)412413# Drop non-existent by ID with IF EXISTS414self.cur.execute(415f'drop workspace IF EXISTS id {foobar_2_id} '416f'in group "A Fusion Testing {self.id}"',417)418419def test_create_drop_workspace_group(self):420mgr = s2.manage_workspaces()421422reg = [x for x in mgr.regions if x.name.startswith('US')][0]423wg_name = f'Create WG Test {id(self)}'424425try:426self.cur.execute(427f'create workspace group "{wg_name}" '428f'in region "{reg.name}"',429)430wg = [x for x in mgr.workspace_groups if x.name == wg_name]431assert len(wg) == 1432433# Drop it by name434self.cur.execute(435f'drop workspace group "{wg_name}" '436'wait on terminated',437)438wg = [x for x in mgr.workspace_groups if x.name == wg_name]439assert len(wg) == 0440441# Create it again442self.cur.execute(443f'create workspace group "{wg_name}" in region "{reg.name}"',444)445wg = [x for x in mgr.workspace_groups if x.name == wg_name]446assert len(wg) == 1447448# Drop it by ID449wg_id = wg[0].id450self.cur.execute(f'drop workspace group id {wg_id} wait on terminated')451wg = [x for x in mgr.workspace_groups if x.name == wg_name]452assert len(wg) == 0453454# Drop non-existent455with self.assertRaises(KeyError):456self.cur.execute(f'drop workspace group id {wg_id}')457458# Drop non-existent with IF EXISTS459self.cur.execute(f'drop workspace group if exists id {wg_id}')460461finally:462try:463mgr.workspace_groups[wg_name].terminate(force=True)464except Exception:465pass466467468@pytest.mark.management469class TestJobsFusion(unittest.TestCase):470471id: str = secrets.token_hex(8)472notebook_name: str = 'Scheduling Test.ipynb'473dbname: str = ''474dbexisted: bool = False475manager: None476workspace_group: None477workspace: None478job_ids = []479480@classmethod481def setUpClass(cls):482sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')483cls.dbname, cls.dbexisted = utils.load_sql(sql_file)484cls.manager = s2.manage_workspaces()485us_regions = [x for x in cls.manager.regions if x.name.startswith('US')]486cls.workspace_group = cls.manager.create_workspace_group(487f'Jobs Fusion Testing {cls.id}',488region=random.choice(us_regions),489firewall_ranges=[],490)491cls.workspace = cls.workspace_group.create_workspace(492f'jobs-test-{cls.id}',493wait_on_active=True,494)495os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = cls.dbname496os.environ['SINGLESTOREDB_WORKSPACE'] = cls.workspace.id497498@classmethod499def tearDownClass(cls):500for job_id in cls.job_ids:501try:502cls.manager.organizations.current.jobs.delete(job_id)503except Exception:504pass505if cls.workspace_group is not None:506cls.workspace_group.terminate(force=True)507cls.manager = None508cls.workspace_group = None509cls.workspace = None510if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:511del os.environ['SINGLESTOREDB_WORKSPACE']512if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:513del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']514515def setUp(self):516self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')517os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'518self.conn = s2.connect(database=type(self).dbname, local_infile=True)519self.cur = self.conn.cursor()520521def tearDown(self):522if self.enabled:523os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled524else:525del os.environ['SINGLESTOREDB_FUSION_ENABLED']526527try:528if self.cur is not None:529self.cur.close()530except Exception:531# traceback.print_exc()532pass533534try:535if self.conn is not None:536self.conn.close()537except Exception:538# traceback.print_exc()539pass540541def test_schedule_drop_job(self):542# schedule recurring job543self.cur.execute(544f'schedule job using notebook "{self.notebook_name}" '545'with mode "recurring" '546'execute every 5 minutes '547'with name "recurring-job" '548'create snapshot '549'resume target '550'with runtime "notebooks-cpu-small" '551'with parameters '552'{"strParam": "string", "intParam": 1, '553'"floatParam": 1.0, "boolParam": true}',554)555out = list(self.cur)556job_id = out[0][0]557self.job_ids.append(job_id)558desc = self.cur.description559assert len(desc) == 1560assert desc[0][0] == 'JobID'561assert len(out) == 1562assert out[0][0] == job_id563564# drop job565self.cur.execute(f'drop jobs {job_id}')566out = list(self.cur)567desc = self.cur.description568assert len(desc) == 2569assert [x[0] for x in desc] == [570'JobID', 'Success',571]572assert len(out) == 1573res = out[0]574assert res[0] == job_id575assert res[1] == 1576577def test_run_wait_drop_job(self):578# run job579self.cur.execute(580f'run job using notebook "{self.notebook_name}" '581'with runtime "notebooks-cpu-small" '582'with parameters '583'{"strParam": "string", "intParam": 1, '584'"floatParam": 1.0, "boolParam": true}',585)586out = list(self.cur)587job_id = out[0][0]588self.job_ids.append(job_id)589desc = self.cur.description590assert len(desc) == 1591assert desc[0][0] == 'JobID'592assert len(out) == 1593assert out[0][0] == job_id594595# wait on job596self.cur.execute(f'wait on jobs {job_id}')597out = list(self.cur)598desc = self.cur.description599assert len(desc) == 1600assert desc[0][0] == 'Success'601assert out[0][0] == 1602603# drop job604self.cur.execute(f'drop jobs {job_id}')605out = list(self.cur)606desc = self.cur.description607assert len(desc) == 2608assert [x[0] for x in desc] == [609'JobID', 'Success',610]611assert len(out) == 1612res = out[0]613assert res[0] == job_id614assert res[1] == 1615616def test_show_jobs_and_executions(self):617# schedule recurring job618self.cur.execute(619f'schedule job using notebook "{self.notebook_name}" '620'with mode "recurring" '621'execute every 5 minutes '622'with name "show-job" '623'with runtime "notebooks-cpu-small" '624'with parameters '625'{"strParam": "string", "intParam": 1, '626'"floatParam": 1.0, "boolParam": true}',627)628out = list(self.cur)629job_id = out[0][0]630self.job_ids.append(job_id)631desc = self.cur.description632assert len(desc) == 1633assert desc[0][0] == 'JobID'634assert len(out) == 1635assert out[0][0] == job_id636637# show jobs with name like "show-job"638self.cur.execute(f'show jobs {job_id} like "show-job"')639out = list(self.cur)640desc = self.cur.description641assert len(desc) == 9642assert [x[0] for x in desc] == [643'JobID', 'Name', 'CreatedAt', 'EnqueuedBy',644'CompletedExecutions', 'NotebookPath', 'DatabaseName', 'TargetID',645'TargetType',646]647assert len(out) == 1648job = out[0]649assert job[0] == job_id650assert job[1] == 'show-job'651assert job[5] == self.notebook_name652assert job[6] == self.dbname653assert job[7] == self.workspace.id654assert job[8] == 'Workspace'655656# show jobs with name like "show-job" extended657self.cur.execute(f'show jobs {job_id} like "show-job" extended')658out = list(self.cur)659desc = self.cur.description660assert len(desc) == 17661assert [x[0] for x in desc] == [662'JobID', 'Name', 'CreatedAt', 'EnqueuedBy',663'CompletedExecutions', 'NotebookPath', 'DatabaseName', 'TargetID',664'TargetType', 'Description', 'TerminatedAt', 'CreateSnapshot',665'MaxDurationInMins', 'ExecutionIntervalInMins', 'Mode', 'StartAt',666'ResumeTarget',667]668assert len(out) == 1669job = out[0]670assert job[0] == job_id671assert job[1] == 'show-job'672assert job[5] == self.notebook_name673assert job[6] == self.dbname674assert job[7] == self.workspace.id675assert job[8] == 'Workspace'676assert not job[11]677assert job[13] == 5678assert job[14] == 'Recurring'679assert not job[16]680681# show executions for job with id job_id from 1 to 5682self.cur.execute(f'show job executions for {job_id} from 1 to 5')683out = list(self.cur)684desc = self.cur.description685assert len(desc) == 7686assert [x[0] for x in desc] == [687'ExecutionID', 'ExecutionNumber', 'JobID',688'Status', 'ScheduledStartTime', 'StartedAt', 'FinishedAt',689]690exec_job_ids = [x[2] for x in out]691for x in exec_job_ids:692assert x == job_id693694# show executions for job with id job_id from 1 to 5 extended695self.cur.execute(f'show job executions for {job_id} from 1 to 5 extended')696out = list(self.cur)697desc = self.cur.description698assert len(desc) == 8699assert [x[0] for x in desc] == [700'ExecutionID', 'ExecutionNumber', 'JobID',701'Status', 'ScheduledStartTime', 'StartedAt', 'FinishedAt',702'SnapshotNotebookPath',703]704exec_job_ids = [x[2] for x in out]705for x in exec_job_ids:706assert x == job_id707708# drop job709self.cur.execute(f'drop jobs {job_id}')710out = list(self.cur)711desc = self.cur.description712assert len(desc) == 2713assert [x[0] for x in desc] == [714'JobID', 'Success',715]716assert len(out) == 1717res = out[0]718assert res[0] == job_id719assert res[1] == 1720721722@pytest.mark.management723class TestStageFusion(unittest.TestCase):724725id: str = secrets.token_hex(8)726dbname: str = 'information_schema'727manager: None728workspace_group: None729workspace_group_2: None730731@classmethod732def setUpClass(cls):733cls.manager = s2.manage_workspaces()734us_regions = [x for x in cls.manager.regions if x.name.startswith('US')]735cls.workspace_group = cls.manager.create_workspace_group(736f'Stage Fusion Testing 1 {cls.id}',737region=random.choice(us_regions),738firewall_ranges=[],739)740cls.workspace_group_2 = cls.manager.create_workspace_group(741f'Stage Fusion Testing 2 {cls.id}',742region=random.choice(us_regions),743firewall_ranges=[],744)745# Wait for both workspace groups to start746time.sleep(5)747748os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'749os.environ['SINGLESTOREDB_WORKSPACE_GROUP'] = cls.workspace_group.id750751@classmethod752def tearDownClass(cls):753if cls.workspace_group is not None:754cls.workspace_group.terminate(force=True)755if cls.workspace_group_2 is not None:756cls.workspace_group_2.terminate(force=True)757cls.manager = None758cls.workspace_group = None759cls.workspace_group_2 = None760cls.workspace = None761cls.workspace_2 = None762if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:763del os.environ['SINGLESTOREDB_WORKSPACE']764if os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP', None) is not None:765del os.environ['SINGLESTOREDB_WORKSPACE_GROUP']766if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:767del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']768769def setUp(self):770self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')771os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'772self.conn = s2.connect(database=type(self).dbname, local_infile=True)773self.cur = self.conn.cursor()774775def tearDown(self):776self._clear_stage()777778if self.enabled:779os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled780else:781del os.environ['SINGLESTOREDB_FUSION_ENABLED']782783try:784if self.cur is not None:785self.cur.close()786except Exception:787# traceback.print_exc()788pass789790try:791if self.conn is not None:792self.conn.close()793except Exception:794# traceback.print_exc()795pass796797def _clear_stage(self):798if self.workspace_group is not None:799self.cur.execute(f'''800show stage files801in group id '{self.workspace_group.id}' recursive802''')803files = list(self.cur)804folders = []805for file in files:806if file[0].endswith('/'):807folders.append(file)808continue809self.cur.execute(f'''810drop stage file '{file[0]}'811in group id '{self.workspace_group.id}'812''')813for folder in folders:814self.cur.execute(f'''815drop stage folder '{folder[0]}'816in group id '{self.workspace_group.id}'817''')818819if self.workspace_group_2 is not None:820self.cur.execute(f'''821show stage files822in group id '{self.workspace_group_2.id}' recursive823''')824files = list(self.cur)825folders = []826for file in files:827if file[0].endswith('/'):828folders.append(file)829continue830self.cur.execute(f'''831drop stage file '{file[0]}'832in group id '{self.workspace_group_2.id}'833''')834for folder in folders:835self.cur.execute(f'''836drop stage folder '{folder[0]}'837in group id '{self.workspace_group_2.id}'838''')839840def test_show_stage(self):841test2_sql = os.path.join(os.path.dirname(__file__), 'test2.sql')842843# Should be empty844self.cur.execute('''845show stage files846''')847files = list(self.cur)848assert len(files) == 0849850# Copy files to stage851self.cur.execute(852f'upload file to stage "new_test_1.sql" from "{test2_sql}"',853)854self.cur.execute('create stage folder "subdir1"')855self.cur.execute(856f'upload file to stage "subdir1/new_test_2.sql" from "{test2_sql}"',857)858self.cur.execute(859f'upload file to stage "subdir1/new_test_3.sql" from "{test2_sql}"',860)861self.cur.execute('create stage folder "subdir2"')862self.cur.execute(863f'upload file to stage "subdir2/new_test_4.sql" from "{test2_sql}"',864)865self.cur.execute(866f'upload file to stage "subdir2/new_test_5.sql" from "{test2_sql}"',867)868869# Make sure files are there870self.cur.execute('''871show stage files recursive872''')873files = list(self.cur)874assert len(files) == 7875assert list(sorted(x[0] for x in files)) == [876'new_test_1.sql',877'subdir1/',878'subdir1/new_test_2.sql',879'subdir1/new_test_3.sql',880'subdir2/',881'subdir2/new_test_4.sql',882'subdir2/new_test_5.sql',883]884885# Do non-recursive listing886self.cur.execute('''887show stage files888''')889files = list(self.cur)890assert len(files) == 3891assert list(sorted(x[0] for x in files)) == [892'new_test_1.sql',893'subdir1/',894'subdir2/',895]896897# List files in specific workspace group898self.cur.execute(f'''899show stage files in group id '{self.workspace_group.id}'900''')901files = list(self.cur)902assert len(files) == 3903assert list(sorted(x[0] for x in files)) == [904'new_test_1.sql',905'subdir1/',906'subdir2/',907]908909self.cur.execute(f'''910show stage files in id '{self.workspace_group.id}'911''')912files = list(self.cur)913assert len(files) == 3914assert list(sorted(x[0] for x in files)) == [915'new_test_1.sql',916'subdir1/',917'subdir2/',918]919920self.cur.execute(f'''921show stage files in group '{self.workspace_group.name}'922''')923files = list(self.cur)924assert len(files) == 3925assert list(sorted(x[0] for x in files)) == [926'new_test_1.sql',927'subdir1/',928'subdir2/',929]930931self.cur.execute(f'''932show stage files in '{self.workspace_group.name}'933''')934files = list(self.cur)935assert len(files) == 3936assert list(sorted(x[0] for x in files)) == [937'new_test_1.sql',938'subdir1/',939'subdir2/',940]941942# Check other workspace group943self.cur.execute(f'''944show stage files in group '{self.workspace_group_2.name}'945''')946files = list(self.cur)947assert len(files) == 0948949# Limit results950self.cur.execute('''951show stage files recursive limit 5952''')953files = list(self.cur)954assert len(files) == 5955assert list(sorted(x[0] for x in files)) == [956'new_test_1.sql',957'subdir1/',958'subdir1/new_test_2.sql',959'subdir1/new_test_3.sql',960'subdir2/',961]962963# Order by type and name964self.cur.execute('''965show stage files order by type, name recursive extended966''')967files = list(self.cur)968assert len(files) == 7969assert list(x[0] for x in files) == [970'subdir1/',971'subdir2/',972'new_test_1.sql',973'subdir1/new_test_2.sql',974'subdir1/new_test_3.sql',975'subdir2/new_test_4.sql',976'subdir2/new_test_5.sql',977]978979# Order by type and name descending980self.cur.execute('''981show stage files order by type desc, name desc recursive extended982''')983files = list(self.cur)984assert len(files) == 7985assert list(x[0] for x in files) == [986'subdir2/new_test_5.sql',987'subdir2/new_test_4.sql',988'subdir1/new_test_3.sql',989'subdir1/new_test_2.sql',990'new_test_1.sql',991'subdir2/',992'subdir1/',993]994995# List at specific path996self.cur.execute('''997show stage files at 'subdir2/' recursive998''')999files = list(self.cur)1000assert len(files) == 21001assert list(sorted(x[0] for x in files)) == [1002'new_test_4.sql',1003'new_test_5.sql',1004]10051006# LIKE clause1007self.cur.execute('''1008show stage files like '%_4.%' recursive1009''')1010files = list(self.cur)1011assert len(files) == 11012assert list(sorted(x[0] for x in files)) == [1013'subdir2/new_test_4.sql',1014]10151016def test_download_stage(self):1017test2_sql = os.path.join(os.path.dirname(__file__), 'test2.sql')10181019# Should be empty1020self.cur.execute('''1021show stage files1022''')1023files = list(self.cur)1024assert len(files) == 010251026# Copy file to stage 11027self.cur.execute(f'''1028upload file to stage 'dl_test.sql' from '{test2_sql}'1029''')10301031self.cur.execute('''1032show stage files1033''')1034files = list(self.cur)1035assert len(files) == 11036assert list(sorted(x[0] for x in files)) == ['dl_test.sql']10371038# Copy file to stage 21039self.cur.execute(f'''1040upload file to stage 'dl_test2.sql'1041in group '{self.workspace_group_2.name}'1042from '{test2_sql}'1043''')10441045# Make sure only one file in stage 21046self.cur.execute(f'''1047show stage files in group '{self.workspace_group_2.name}'1048''')1049files = list(self.cur)1050assert len(files) == 11051assert list(sorted(x[0] for x in files)) == ['dl_test2.sql']10521053# Download file from stage 11054with tempfile.TemporaryDirectory() as tmpdir:1055self.cur.execute(f'''1056download stage file 'dl_test.sql' to '{tmpdir}/dl_test.sql'1057''')1058with open(os.path.join(tmpdir, 'dl_test.sql'), 'r') as dl_file:1059assert dl_file.read() == open(test2_sql, 'r').read()10601061# Download file from stage 21062with tempfile.TemporaryDirectory() as tmpdir:1063self.cur.execute(f'''1064download stage file 'dl_test2.sql'1065in group '{self.workspace_group_2.name}'1066to '{tmpdir}/dl_test2.sql'1067''')1068with open(os.path.join(tmpdir, 'dl_test2.sql'), 'r') as dl_file:1069assert dl_file.read() == open(test2_sql, 'r').read()10701071def test_stage_multi_wg_operations(self):1072test_sql = os.path.join(os.path.dirname(__file__), 'test.sql')1073test2_sql = os.path.join(os.path.dirname(__file__), 'test2.sql')10741075# Should be empty1076self.cur.execute('''1077show stage files1078''')1079files = list(self.cur)1080assert len(files) == 010811082# Copy file to stage 11083self.cur.execute(f'''1084upload file to stage 'new_test.sql' from '{test_sql}'1085''')10861087self.cur.execute('''1088show stage files1089''')1090files = list(self.cur)1091assert len(files) == 110921093# Copy file to stage 21094self.cur.execute(f'''1095upload file to stage 'new_test2.sql'1096in group '{self.workspace_group_2.name}'1097from '{test2_sql}'1098''')10991100# Make sure only one file in stage 11101self.cur.execute('''1102show stage files1103''')1104files = list(self.cur)1105assert len(files) == 11106assert files[0][0] == 'new_test.sql'11071108# Make sure only one file in stage 21109self.cur.execute(f'''1110show stage files in group '{self.workspace_group_2.name}' recursive1111''')1112files = list(self.cur)1113assert len(files) == 11114assert list(sorted(x[0] for x in files)) == ['new_test2.sql']11151116# Make sure only one file in stage 2 (using IN)1117self.cur.execute(f'''1118show stage files in '{self.workspace_group_2.name}' recursive1119''')1120files = list(self.cur)1121assert len(files) == 11122assert list(sorted(x[0] for x in files)) == ['new_test2.sql']11231124# Make subdir1125self.cur.execute(f'''1126create stage folder 'data' in group '{self.workspace_group_2.name}'1127''')11281129# Upload file using workspace ID1130self.cur.execute(f'''1131upload file to stage 'data/new_test2_sub.sql'1132in group id '{self.workspace_group_2.id}'1133from '{test2_sql}'1134''')11351136# Make sure only one file in stage 11137self.cur.execute('''1138show stage files1139''')1140files = list(self.cur)1141assert len(files) == 11142assert files[0][0] == 'new_test.sql'11431144# Make sure two files in stage 21145self.cur.execute(f'''1146show stage files in group id '{self.workspace_group_2.id}' recursive1147''')1148files = list(self.cur)1149assert len(files) == 31150assert list(sorted(x[0] for x in files)) == \1151['data/', 'data/new_test2_sub.sql', 'new_test2.sql']11521153# Test overwrite1154with self.assertRaises(OSError):1155self.cur.execute(f'''1156upload file to stage 'data/new_test2_sub.sql'1157in group id '{self.workspace_group_2.id}'1158from '{test2_sql}'1159''')11601161self.cur.execute(f'''1162upload file to stage 'data/new_test2_sub.sql'1163in group id '{self.workspace_group_2.id}'1164from '{test2_sql}' overwrite1165''')11661167# Make sure two files in stage 21168self.cur.execute(f'''1169show stage files in group id '{self.workspace_group_2.id}' recursive1170''')1171files = list(self.cur)1172assert len(files) == 31173assert list(sorted(x[0] for x in files)) == \1174['data/', 'data/new_test2_sub.sql', 'new_test2.sql']11751176# Test LIKE clause1177self.cur.execute(f'''1178show stage files1179in group id '{self.workspace_group_2.id}'1180like '%_sub%' recursive1181''')1182files = list(self.cur)1183assert len(files) == 11184assert list(sorted(x[0] for x in files)) == ['data/new_test2_sub.sql']11851186# Drop file from default stage1187self.cur.execute('''1188drop stage file 'new_test.sql'1189''')11901191# Make sure no files in stage 11192self.cur.execute('''1193show stage files1194''')1195files = list(self.cur)1196assert len(files) == 011971198# Make sure two files in stage 21199self.cur.execute(f'''1200show stage files in group id '{self.workspace_group_2.id}' recursive1201''')1202files = list(self.cur)1203assert len(files) == 31204assert list(sorted(x[0] for x in files)) == \1205['data/', 'data/new_test2_sub.sql', 'new_test2.sql']12061207# Attempt to drop directory from stage 21208with self.assertRaises(OSError):1209self.cur.execute(f'''1210drop stage folder 'data'1211in group id '{self.workspace_group_2.id}'1212''')12131214self.cur.execute(f'''1215drop stage file 'data/new_test2_sub.sql'1216in group id '{self.workspace_group_2.id}'1217''')12181219# Make sure one file and one directory in stage 21220self.cur.execute(f'''1221show stage files in group id '{self.workspace_group_2.id}' recursive1222''')1223files = list(self.cur)1224assert len(files) == 21225assert list(sorted(x[0] for x in files)) == ['data/', 'new_test2.sql']12261227# Drop stage folder from stage 21228self.cur.execute(f'''1229drop stage folder 'data'1230in group id '{self.workspace_group_2.id}'1231''')12321233# Make sure one file in stage 21234self.cur.execute(f'''1235show stage files in group id '{self.workspace_group_2.id}' recursive1236''')1237files = list(self.cur)1238assert len(files) == 11239assert list(sorted(x[0] for x in files)) == ['new_test2.sql']12401241# Drop last file1242self.cur.execute(f'''1243drop stage file 'new_test2.sql'1244in group id '{self.workspace_group_2.id}'1245''')12461247# Make sure no files in stage 21248self.cur.execute(f'''1249show stage files in group id '{self.workspace_group_2.id}' recursive1250''')1251files = list(self.cur)1252assert len(files) == 0125312541255@pytest.mark.management1256class TestFilesFusion(unittest.TestCase):12571258id: str = secrets.token_hex(8)1259dbname: str = 'information_schema'1260manager: None1261workspace_group: None12621263@classmethod1264def setUpClass(cls):1265cls.manager = s2.manage_workspaces()1266us_regions = [x for x in cls.manager.regions if x.name.startswith('US')]1267cls.workspace_group = cls.manager.create_workspace_group(1268f'Files Fusion Testing {cls.id}',1269region=random.choice(us_regions),1270firewall_ranges=[],1271)1272# Wait for both workspace groups to start1273time.sleep(5)12741275os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'1276os.environ['SINGLESTOREDB_WORKSPACE_GROUP'] = cls.workspace_group.id12771278@classmethod1279def tearDownClass(cls):1280if cls.workspace_group is not None:1281cls.workspace_group.terminate(force=True)1282cls.manager = None1283cls.workspace_group = None1284cls.workspace = None1285if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:1286del os.environ['SINGLESTOREDB_WORKSPACE']1287if os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP', None) is not None:1288del os.environ['SINGLESTOREDB_WORKSPACE_GROUP']1289if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:1290del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']12911292def setUp(self):1293self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')1294os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'1295self.conn = s2.connect(database=type(self).dbname, local_infile=True)1296self.cur = self.conn.cursor()12971298def tearDown(self):1299self._clear_files()13001301if self.enabled:1302os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled1303else:1304del os.environ['SINGLESTOREDB_FUSION_ENABLED']13051306try:1307if self.cur is not None:1308self.cur.close()1309except Exception:1310# traceback.print_exc()1311pass13121313try:1314if self.conn is not None:1315self.conn.close()1316except Exception:1317# traceback.print_exc()1318pass13191320def _clear_files(self):1321cls = type(self)1322for prefix in ['show', 'dl', 'drop']:1323for i in range(1, 6):1324try:1325self.cur.execute(1326f'''drop personal file "{prefix}_test_{i}_{cls.id}.ipynb"''',1327)1328except (OSError, s2.ManagementError):1329pass1330for i in range(1, 6):1331try:1332self.cur.execute(1333f'''drop shared file "{prefix}_test_{i}_{cls.id}.ipynb"''',1334)1335except (OSError, s2.ManagementError):1336pass13371338def test_show_personal_files(self):1339return self._test_show_files('personal')13401341def test_show_shared_files(self):1342return self._test_show_files('shared')13431344def _test_show_files(self, ftype):1345cls = type(self)1346nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')13471348# Should be empty1349self.cur.execute(f'''1350show {ftype} files like 'show_%{cls.id}%'1351''')1352files = list(self.cur)1353assert len(files) == 013541355# Upload files1356self.cur.execute(1357f'upload {ftype} file to "show_test_1_{cls.id}.ipynb" from "{nb}"',1358)1359self.cur.execute(1360f'upload {ftype} file to "show_test_2_{cls.id}.ipynb" from "{nb}"',1361)1362self.cur.execute(1363f'upload {ftype} file to "show_test_3_{cls.id}.ipynb" from "{nb}"',1364)1365self.cur.execute(1366f'upload {ftype} file to "show_test_4_{cls.id}.ipynb" from "{nb}"',1367)1368self.cur.execute(1369f'upload {ftype} file to "show_test_5_{cls.id}.ipynb" from "{nb}"',1370)13711372# Make sure files are there1373self.cur.execute(f'''1374show {ftype} files like 'show_%{cls.id}%'1375''')1376files = list(self.cur)1377assert len(files) == 51378assert list(sorted(x[0] for x in files)) == [1379f'show_test_1_{cls.id}.ipynb',1380f'show_test_2_{cls.id}.ipynb',1381f'show_test_3_{cls.id}.ipynb',1382f'show_test_4_{cls.id}.ipynb',1383f'show_test_5_{cls.id}.ipynb',1384]13851386# Test ORDER BY1387self.cur.execute(f'''1388show {ftype} files like 'show_%{cls.id}%' order by name desc1389''')1390files = list(self.cur)1391assert len(files) == 51392assert list(x[0] for x in files) == [1393f'show_test_5_{cls.id}.ipynb',1394f'show_test_4_{cls.id}.ipynb',1395f'show_test_3_{cls.id}.ipynb',1396f'show_test_2_{cls.id}.ipynb',1397f'show_test_1_{cls.id}.ipynb',1398]13991400# Test LIMIT1401self.cur.execute(f'''1402show {ftype} files like 'show_%{cls.id}%' order by name desc limit 31403''')1404files = list(self.cur)1405assert len(files) == 31406assert list(x[0] for x in files) == [1407f'show_test_5_{cls.id}.ipynb',1408f'show_test_4_{cls.id}.ipynb',1409f'show_test_3_{cls.id}.ipynb',1410]14111412# Test EXTENDED1413self.cur.execute(f'''1414show {ftype} files like 'show_%{cls.id}%' extended1415''')1416assert [x[0] for x in self.cur.description] == \1417['Name', 'Type', 'Size', 'Writable', 'CreatedAt', 'LastModifiedAt']14181419def test_download_personal_files(self):1420return self._test_download_files('personal')14211422def test_download_shared_files(self):1423return self._test_download_files('shared')14241425def _test_download_files(self, ftype):1426cls = type(self)1427nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')14281429# Should be empty1430self.cur.execute(f'''1431show {ftype} files like 'dl_%{cls.id}%'1432''')1433files = list(self.cur)1434assert len(files) == 014351436# Upload files1437self.cur.execute(f'upload {ftype} file to "dl_test_1_{cls.id}.ipynb" from "{nb}"')1438self.cur.execute(f'upload {ftype} file to "dl_test_2_{cls.id}.ipynb" from "{nb}"')14391440# Make sure files are there1441self.cur.execute(f'''1442show {ftype} files like 'dl_%{cls.id}%'1443''')1444files = list(self.cur)1445assert len(files) == 21446assert list(sorted(x[0] for x in files)) == [1447f'dl_test_1_{cls.id}.ipynb',1448f'dl_test_2_{cls.id}.ipynb',1449]14501451# Download files1452with tempfile.TemporaryDirectory() as tmpdir:1453self.cur.execute(f'''1454download {ftype} file 'dl_test_1_{cls.id}.ipynb'1455to '{tmpdir}/dl_test_1.ipynb'1456''')1457with open(os.path.join(tmpdir, 'dl_test_1.ipynb'), 'r') as dl_file:1458assert dl_file.read() == open(nb, 'r').read()14591460self.cur.execute(f'''1461download {ftype} file 'dl_test_2_{cls.id}.ipynb'1462to '{tmpdir}/dl_test_2.ipynb'1463''')1464with open(os.path.join(tmpdir, 'dl_test_2.ipynb'), 'r') as dl_file:1465assert dl_file.read() == open(nb, 'r').read()14661467def test_drop_personal_files(self):1468return self._test_drop_files('personal')14691470def test_drop_shared_files(self):1471return self._test_drop_files('shared')14721473def _test_drop_files(self, ftype):1474cls = type(self)1475nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')14761477# Should be empty1478self.cur.execute(f'''1479show {ftype} files like 'drop_%{cls.id}%'1480''')1481files = list(self.cur)1482assert len(files) == 014831484# Upload files1485self.cur.execute(1486f'upload {ftype} file to "drop_test_1_{cls.id}.ipynb" from "{nb}"',1487)1488self.cur.execute(1489f'upload {ftype} file to "drop_test_2_{cls.id}.ipynb" from "{nb}"',1490)14911492# Make sure files are there1493self.cur.execute(f'''1494show {ftype} files like 'drop_%{cls.id}%'1495''')1496files = list(self.cur)1497assert len(files) == 21498assert list(sorted(x[0] for x in files)) == [1499f'drop_test_1_{cls.id}.ipynb',1500f'drop_test_2_{cls.id}.ipynb',1501]15021503# Drop 1 file1504self.cur.execute(f'''1505drop {ftype} file 'drop_test_1_{cls.id}.ipynb'1506''')15071508# Make sure 1 file is there1509self.cur.execute(f'''1510show {ftype} files like 'drop_%{cls.id}%'1511''')1512files = list(self.cur)1513assert len(files) == 11514assert list(x[0] for x in files) == [f'drop_test_2_{cls.id}.ipynb']15151516# Drop 2nd file1517self.cur.execute(f'''1518drop {ftype} file 'drop_test_2_{cls.id}.ipynb'1519''')15201521# Make sure no files are there1522self.cur.execute(f'''1523show {ftype} files like 'drop_%{cls.id}%'1524''')1525files = list(self.cur)1526assert len(files) == 0152715281529