Path: blob/main/singlestoredb/tests/test_management.py
801 views
#!/usr/bin/env python
# type: ignore
"""SingleStoreDB Management API testing."""
import os
import pathlib
import random
import re
import secrets
import unittest

import pytest

import singlestoredb as s2
from singlestoredb.management.job import Status
from singlestoredb.management.job import TargetType
from singlestoredb.management.region import Region
from singlestoredb.management.utils import NamedList


TEST_DIR = pathlib.Path(os.path.dirname(__file__))


def clean_name(s):
    """Lower-case `s` and change all non-word characters and underscores to -."""
    return re.sub(r'[^\w]', r'-', s).replace('_', '-').lower()


def shared_database_name(s):
    """
    Return a shared database name built from `s`.

    All non-word characters are removed and the result is lower-cased.

    """
    # The regex already strips '-', so the `replace` below never matches;
    # it is kept to leave the behavior untouched.
    return re.sub(r'[^\w]', '', s).replace('-', '_').lower()


@pytest.mark.skip(reason='Legacy cluster Management API is going away')
@pytest.mark.management
class TestCluster(unittest.TestCase):
    """Tests for the legacy cluster manager (skipped as a whole)."""

    manager = None
    cluster = None
    password = None

    @classmethod
    def setUpClass(cls):
        """Create one cluster in a random US region for all tests in the class."""
        cls.manager = s2.manage_cluster()

        us_regions = [x for x in cls.manager.regions if 'US' in x.name]
        cls.password = secrets.token_urlsafe(20) + '-x&$'

        cls.cluster = cls.manager.create_cluster(
            clean_name('cm-test-{}'.format(secrets.token_urlsafe(20)[:20])),
            region=random.choice(us_regions).id,
            admin_password=cls.password,
            firewall_ranges=['0.0.0.0/0'],
            expires_at='1h',
            size='S-00',
            wait_on_active=True,
        )

    @classmethod
    def tearDownClass(cls):
        """Terminate the cluster and clear the class-level fixtures."""
        if cls.cluster is not None:
            cls.cluster.terminate()
        cls.cluster = None
        cls.manager = None
        cls.password = None

    def test_str(self):
        """Check the cluster name against its string form."""
        # NOTE(review): this compares the name with str() of the name itself,
        # which always holds; presumably str(self.cluster) was intended - confirm.
        assert self.cluster.name in str(self.cluster.name)

    def test_repr(self):
        """repr() and str() of the cluster must agree."""
        assert repr(self.cluster) == str(self.cluster)

    def test_region_str(self):
        """The region string must mention one of the known providers."""
        s = str(self.cluster.region)
        assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s

    def test_region_repr(self):
        """repr() and str() of the region must agree."""
        assert repr(self.cluster.region) == str(self.cluster.region)

    def test_regions(self):
        """Check the providers in the region list and lookup by name and by id."""
        out = self.manager.regions
        providers = {x.provider for x in out}
        names = [x.name for x in out]
        assert 'Azure' in providers, providers
        assert 'GCP' in providers, providers
        assert 'AWS' in providers, providers

        objs = {}
        ids = []
        for item in out:
            ids.append(item.id)
            objs[item.id] = item
            # Keep only the first region seen for each name
            if item.name not in objs:
                objs[item.name] = item

        name = random.choice(names)
        assert out[name] == objs[name]
        region_id = random.choice(ids)
        assert out[region_id] == objs[region_id]

    def test_clusters(self):
        """The fixture cluster must be listed by the manager."""
        clusters = self.manager.clusters
        ids = [x.id for x in clusters]
        assert self.cluster.id in ids, ids

    def test_get_cluster(self):
        """Fetch the cluster by id and check the error for a malformed id."""
        clus = self.manager.get_cluster(self.cluster.id)
        assert clus.id == self.cluster.id, clus.id

        with self.assertRaises(s2.ManagementError) as cm:
            clus = self.manager.get_cluster('bad id')

        assert 'UUID' in cm.exception.msg, cm.exception.msg

    def test_update(self):
        """Rename the cluster and verify the new name after a fresh fetch."""
        assert self.cluster.name.startswith('cm-test-')

        name = self.cluster.name.replace('cm-test-', 'cm-foo-')
        self.cluster.update(name=name)

        clus = self.manager.get_cluster(self.cluster.id)
        assert clus.name == name, clus.name

    def test_suspend_resume(self):
        """Suspend and resume the cluster (only if SINGLESTOREDB_TEST_SUSPEND is set)."""
        trues = ['1', 'on', 'true']
        do_test = os.environ.get('SINGLESTOREDB_TEST_SUSPEND', '0').lower() in trues

        if not do_test:
            self.skipTest(
                'Suspend / resume tests skipped by default due to '
                'being time consuming; set SINGLESTOREDB_TEST_SUSPEND=1 '
                'to enable',
            )

        assert self.cluster.state != 'Suspended', self.cluster.state

        self.cluster.suspend(wait_on_suspended=True)
        assert self.cluster.state == 'Suspended', self.cluster.state

        self.cluster.resume(wait_on_resumed=True)
        assert self.cluster.state == 'Active', self.cluster.state

    def test_no_manager(self):
        """Every operation on a cluster without a manager must raise."""
        clus = self.manager.get_cluster(self.cluster.id)
        clus._manager = None

        with self.assertRaises(s2.ManagementError) as cm:
            clus.refresh()

        assert 'No cluster manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            clus.update()

        assert 'No cluster manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            clus.suspend()

        assert 'No cluster manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            clus.resume()

        assert 'No cluster manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            clus.terminate()

        assert 'No cluster manager' in cm.exception.msg, cm.exception.msg

    def test_connect(self):
        """Connect to the cluster; currently always skipped (see skipTest below)."""
        trues = ['1', 'on', 'true']
        pure_python = os.environ.get('SINGLESTOREDB_PURE_PYTHON', '0').lower() in trues

        # Unconditional skip: everything below is kept for when it is re-enabled
        self.skipTest('Connection test is disable due to flakey server')

        if pure_python:
            self.skipTest('Connections through managed service are disabled')

        try:
            with self.cluster.connect(user='admin', password=self.password) as conn:
                with conn.cursor() as cur:
                    cur.execute('show databases')
                    assert 'cluster' in [x[0] for x in list(cur)]
        except s2.ManagementError as exc:
            # NOTE(review): this skips when the message is NOT about a missing
            # endpoint and swallows the error when it is; the condition looks
            # inverted relative to the skip message - confirm before re-enabling.
            if 'endpoint has not been set' not in str(exc):
                self.skipTest('No endpoint in response. Skipping connection test.')

        # Test missing endpoint
        clus = self.manager.get_cluster(self.cluster.id)
        clus.endpoint = None

        with self.assertRaises(s2.ManagementError) as cm:
            clus.connect(user='admin', password=self.password)

        assert 'endpoint' in cm.exception.msg, cm.exception.msg


@pytest.mark.management
class TestWorkspace(unittest.TestCase):
    """Tests for workspace groups and workspaces."""

    manager = None
    workspace_group = None
    workspace = None
    password = None

    @classmethod
    def setUpClass(cls):
        """Create a workspace group and one workspace in a random US region."""
        cls.manager = s2.manage_workspaces()

        us_regions = [x for x in cls.manager.regions if 'US' in x.name]
        cls.password = secrets.token_urlsafe(20) + '-x&$'

        name = clean_name(secrets.token_urlsafe(20)[:20])

        cls.workspace_group = cls.manager.create_workspace_group(
            f'wg-test-{name}',
            region=random.choice(us_regions).id,
            admin_password=cls.password,
            firewall_ranges=['0.0.0.0/0'],
        )

        try:
            cls.workspace = cls.workspace_group.create_workspace(
                f'ws-test-{name}-x',
                wait_on_active=True,
            )
        except Exception:
            # tearDownClass is not run when setUpClass fails, so clean up here
            cls.workspace_group.terminate(force=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Terminate the workspace group and clear the class-level fixtures."""
        if cls.workspace_group is not None:
            cls.workspace_group.terminate(force=True)
        cls.workspace_group = None
        cls.workspace = None
        cls.manager = None
        cls.password = None

    def test_str(self):
        """Check the workspace and group names against their string forms."""
        # NOTE(review): both assertions compare a name with str() of the same
        # name, which always holds; presumably str() of the objects was
        # intended - confirm.
        assert self.workspace.name in str(self.workspace.name)
        assert self.workspace_group.name in str(self.workspace_group.name)

    def test_repr(self):
        """repr() and str() must agree for the workspace and its group."""
        assert repr(self.workspace) == str(self.workspace)
        assert repr(self.workspace_group) == str(self.workspace_group)

    def test_region_str(self):
        """The region string must mention one of the known providers."""
        s = str(self.workspace_group.region)
        assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s

    def test_region_repr(self):
        """repr() and str() of the region must agree."""
        assert repr(self.workspace_group.region) == str(self.workspace_group.region)

    def test_regions(self):
        """Check the providers in the region list and lookup by name and by id."""
        out = self.manager.regions
        providers = {x.provider for x in out}
        names = [x.name for x in out]
        assert 'Azure' in providers, providers
        assert 'GCP' in providers, providers
        assert 'AWS' in providers, providers

        objs = {}
        ids = []
        for item in out:
            ids.append(item.id)
            objs[item.id] = item
            # Keep only the first region seen for each name
            if item.name not in objs:
                objs[item.name] = item

        name = random.choice(names)
        assert out[name] == objs[name]
        region_id = random.choice(ids)
        assert out[region_id] == objs[region_id]

    def test_workspace_groups(self):
        """The fixture group must be listed and reachable by name and by id."""
        workspace_groups = self.manager.workspace_groups
        ids = [x.id for x in workspace_groups]
        names = [x.name for x in workspace_groups]
        assert self.workspace_group.id in ids
        assert self.workspace_group.name in names

        assert workspace_groups.ids() == ids
        assert workspace_groups.names() == names

        objs = {}
        for item in workspace_groups:
            objs[item.id] = item
            objs[item.name] = item

        name = random.choice(names)
        assert workspace_groups[name] == objs[name]
        group_id = random.choice(ids)
        assert workspace_groups[group_id] == objs[group_id]

    def test_workspaces(self):
        """The fixture workspace must be listed and reachable by name and by id."""
        spaces = self.workspace_group.workspaces
        ids = [x.id for x in spaces]
        names = [x.name for x in spaces]
        assert self.workspace.id in ids
        assert self.workspace.name in names

        assert spaces.ids() == ids
        assert spaces.names() == names

        objs = {}
        for item in spaces:
            objs[item.id] = item
            objs[item.name] = item

        name = random.choice(names)
        assert spaces[name] == objs[name]
        space_id = random.choice(ids)
        assert spaces[space_id] == objs[space_id]

    def test_get_workspace_group(self):
        """Fetch the group by id and check the error for a malformed id."""
        group = self.manager.get_workspace_group(self.workspace_group.id)
        assert group.id == self.workspace_group.id, group.id

        with self.assertRaises(s2.ManagementError) as cm:
            group = self.manager.get_workspace_group('bad id')

        assert 'UUID' in cm.exception.msg, cm.exception.msg

    def test_get_workspace(self):
        """Fetch the workspace by id and check the error for a malformed id."""
        space = self.manager.get_workspace(self.workspace.id)
        assert space.id == self.workspace.id, space.id

        with self.assertRaises(s2.ManagementError) as cm:
            space = self.manager.get_workspace('bad id')

        assert 'UUID' in cm.exception.msg, cm.exception.msg

    def test_update(self):
        """Rename the workspace group and verify the new name after a fresh fetch."""
        assert self.workspace_group.name.startswith('wg-test-')

        name = self.workspace_group.name.replace('wg-test-', 'wg-foo-')
        self.workspace_group.update(name=name)

        group = self.manager.get_workspace_group(self.workspace_group.id)
        assert group.name == name, group.name

    def test_no_manager(self):
        """Operations on a workspace without a manager must raise."""
        space = self.manager.get_workspace(self.workspace.id)
        space._manager = None

        with self.assertRaises(s2.ManagementError) as cm:
            space.refresh()

        assert 'No workspace manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            space.terminate()

        assert 'No workspace manager' in cm.exception.msg, cm.exception.msg

    def test_connect(self):
        """Connect to the workspace, then check the error for a missing endpoint."""
        with self.workspace.connect(user='admin', password=self.password) as conn:
            with conn.cursor() as cur:
                cur.execute('show databases')
                assert 'cluster' in [x[0] for x in list(cur)]

        # Test missing endpoint
        space = self.manager.get_workspace(self.workspace.id)
        space.endpoint = None

        with self.assertRaises(s2.ManagementError) as cm:
            space.connect(user='admin', password=self.password)

        assert 'endpoint' in cm.exception.msg, cm.exception.msg


@pytest.mark.management
class TestStarterWorkspace(unittest.TestCase):
    """Tests for starter (shared tier) workspaces."""

    manager = None
    starter_workspace = None

    @classmethod
    def setUpClass(cls):
        """
        Create a starter workspace in a random US shared-tier region and add a user.

        Raises
        ------
        ValueError
            If no US shared-tier region is available

        """
        cls.manager = s2.manage_workspaces()

        shared_tier_regions: NamedList[Region] = [
            x for x in cls.manager.shared_tier_regions if 'US' in x.name
        ]
        cls.starter_username = 'starter_user'
        cls.password = secrets.token_urlsafe(20)

        name = shared_database_name(secrets.token_urlsafe(20)[:20])

        cls.database_name = f'starter_db_{name}'

        # Check the list itself: random.choice raises IndexError on an empty
        # sequence, so testing the chosen item afterwards could never fire.
        if not shared_tier_regions:
            raise ValueError('No shared tier regions found')

        shared_tier_region: Region = random.choice(shared_tier_regions)

        cls.starter_workspace = cls.manager.create_starter_workspace(
            f'starter-ws-test-{name}',
            database_name=cls.database_name,
            provider=shared_tier_region.provider,
            region_name=shared_tier_region.region_name,
        )

        try:
            cls.starter_workspace.create_user(
                username=cls.starter_username,
                password=cls.password,
            )
        except Exception:
            # tearDownClass is not run when setUpClass fails, so clean up here
            cls.starter_workspace.terminate()
            raise

    @classmethod
    def tearDownClass(cls):
        """Terminate the starter workspace and clear the class-level fixtures."""
        if cls.starter_workspace is not None:
            cls.starter_workspace.terminate()
        cls.starter_workspace = None
        cls.manager = None
        cls.password = None

    def test_str(self):
        """Check the starter workspace name against its string form."""
        # NOTE(review): this compares the name with str() of the name itself,
        # which always holds; presumably str(self.starter_workspace) was
        # intended - confirm.
        assert self.starter_workspace.name in str(self.starter_workspace.name)

    def test_repr(self):
        """repr() and str() of the starter workspace must agree."""
        assert repr(self.starter_workspace) == str(self.starter_workspace)

    def test_get_starter_workspace(self):
        """Fetch the starter workspace by id and check the error for a malformed id."""
        workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
        assert workspace.id == self.starter_workspace.id, workspace.id

        with self.assertRaises(s2.ManagementError) as cm:
            workspace = self.manager.get_starter_workspace('bad id')

        assert 'UUID' in cm.exception.msg, cm.exception.msg

    def test_starter_workspaces(self):
        """The fixture must be listed and reachable by name and by id."""
        workspaces = self.manager.starter_workspaces
        ids = [x.id for x in workspaces]
        names = [x.name for x in workspaces]
        assert self.starter_workspace.id in ids
        assert self.starter_workspace.name in names

        objs = {}
        for item in workspaces:
            objs[item.id] = item
            objs[item.name] = item

        name = random.choice(names)
        assert workspaces[name] == objs[name]
        workspace_id = random.choice(ids)
        assert workspaces[workspace_id] == objs[workspace_id]

    def test_no_manager(self):
        """Operations on a starter workspace without a manager must raise."""
        workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
        workspace._manager = None

        with self.assertRaises(s2.ManagementError) as cm:
            workspace.refresh()

        assert 'No workspace manager' in cm.exception.msg, cm.exception.msg

        with self.assertRaises(s2.ManagementError) as cm:
            workspace.terminate()

        assert 'No workspace manager' in cm.exception.msg, cm.exception.msg

    def test_connect(self):
        """Connect as the starter user, then check the error for a missing endpoint."""
        with self.starter_workspace.connect(
            user=self.starter_username,
            password=self.password,
        ) as conn:
            with conn.cursor() as cur:
                cur.execute('show databases')
                assert self.database_name in [x[0] for x in list(cur)]

        # Test missing endpoint
        workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
        workspace.endpoint = None

        with self.assertRaises(s2.ManagementError) as cm:
            workspace.connect(user=self.starter_username, password=self.password)

        assert 'endpoint' in cm.exception.msg, cm.exception.msg


@pytest.mark.management
class TestStage(unittest.TestCase):
    """Tests for the Stage file API of a workspace group."""

    manager = None
    wg = None
    password = None

    @classmethod
    def setUpClass(cls):
        """Create a workspace group in a random US region to host the stage."""
        cls.manager = s2.manage_workspaces()

        us_regions = [x for x in cls.manager.regions if 'US' in x.name]
        cls.password = secrets.token_urlsafe(20) + '-x&$'

        name = clean_name(secrets.token_urlsafe(20)[:20])

        cls.wg = cls.manager.create_workspace_group(
            f'wg-test-{name}',
            region=random.choice(us_regions).id,
            admin_password=cls.password,
            firewall_ranges=['0.0.0.0/0'],
        )

    @classmethod
    def tearDownClass(cls):
        """Terminate the workspace group and clear the class-level fixtures."""
        if cls.wg is not None:
            cls.wg.terminate(force=True)
        cls.wg = None
        cls.manager = None
        cls.password = None

    def test_upload_file(self):
        """Upload files by path and by file object, with and without overwrite."""
        st = self.wg.stage

        # id(self) keeps file names distinct between test instances
        upload_test_sql = f'upload_test_{id(self)}.sql'
        upload_test2_sql = f'upload_test2_{id(self)}.sql'

        root = st.info('/')
        assert str(root.path) == '/'
        assert root.type == 'directory'

        # Upload file
        f = st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)
        assert str(f.path) == upload_test_sql
        assert f.type == 'file'

        # Download and compare to original
        txt = f.download(encoding='utf-8')
        assert txt == (TEST_DIR / 'test.sql').read_text()

        # Make sure we can't overwrite
        with self.assertRaises(OSError):
            st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)

        # Force overwrite with new content; use file object this time
        with open(TEST_DIR / 'test2.sql', 'r') as src:
            f = st.upload_file(
                src,
                upload_test_sql,
                overwrite=True,
            )
        assert str(f.path) == upload_test_sql
        assert f.type == 'file'

        # Verify new content
        txt = f.download(encoding='utf-8')
        assert txt == (TEST_DIR / 'test2.sql').read_text()

        # Try to upload folder
        with self.assertRaises(IsADirectoryError):
            st.upload_file(TEST_DIR, 'test3.sql')

        lib = st.mkdir('/lib/')
        assert str(lib.path) == 'lib/'
        assert lib.type == 'directory'

        # Try to overwrite stage folder with file
        with self.assertRaises(IsADirectoryError):
            st.upload_file(TEST_DIR / 'test2.sql', lib.path, overwrite=True)

        # Write file into folder
        f = st.upload_file(
            TEST_DIR / 'test2.sql',
            os.path.join(lib.path, upload_test2_sql),
        )
        assert str(f.path) == 'lib/' + upload_test2_sql
        assert f.type == 'file'

    def test_open(self):
        """Read and write stage files through `Stage.open` in various modes."""
        st = self.wg.stage

        open_test_sql = f'open_test_{id(self)}.sql'

        # See if error is raised for non-existent file
        with self.assertRaises(s2.ManagementError):
            st.open(open_test_sql, 'r')

        # Load test file
        st.upload_file(TEST_DIR / 'test.sql', open_test_sql)

        # Read file using `open`
        with st.open(open_test_sql, 'r') as rfile:
            assert rfile.read() == (TEST_DIR / 'test.sql').read_text()

        # Read file using `open` with 'rt' mode
        with st.open(open_test_sql, 'rt') as rfile:
            assert rfile.read() == (TEST_DIR / 'test.sql').read_text()

        # Read file using `open` with 'rb' mode
        with st.open(open_test_sql, 'rb') as rfile:
            assert rfile.read() == (TEST_DIR / 'test.sql').read_bytes()

        # Mode 'b' alone (no read / write flag) must be rejected
        with self.assertRaises(ValueError):
            with st.open(open_test_sql, 'b') as rfile:
                pass

        # Attempt overwrite file using `open` with mode 'x'
        with self.assertRaises(OSError):
            with st.open(open_test_sql, 'x') as wfile:
                pass

        # Attempt overwrite file using `open` with mode 'w'
        with st.open(open_test_sql, 'w') as wfile:
            wfile.write((TEST_DIR / 'test2.sql').read_text())

        txt = st.download_file(open_test_sql, encoding='utf-8')

        assert txt == (TEST_DIR / 'test2.sql').read_text()

        open_raw_test_sql = f'open_raw_test_{id(self)}.sql'

        # Test writer without context manager
        wfile = st.open(open_raw_test_sql, 'w')
        with open(TEST_DIR / 'test.sql') as src:
            for line in src:
                wfile.write(line)
        wfile.close()

        txt = st.download_file(open_raw_test_sql, encoding='utf-8')

        assert txt == (TEST_DIR / 'test.sql').read_text()

        # Test reader without context manager
        rfile = st.open(open_raw_test_sql, 'r')
        txt = ''
        for line in rfile:
            txt += line
        rfile.close()

        assert txt == (TEST_DIR / 'test.sql').read_text()

    def test_obj_open(self):
        """Read and write through the `open` method of stage objects."""
        st = self.wg.stage

        obj_open_test_sql = f'obj_open_test_{id(self)}.sql'
        obj_open_dir = f'obj_open_dir_{id(self)}'

        # Load test file
        f = st.upload_file(TEST_DIR / 'test.sql', obj_open_test_sql)

        # Read file using `open`
        with f.open() as rfile:
            assert rfile.read() == (TEST_DIR / 'test.sql').read_text()

        # Make sure directories error out
        d = st.mkdir(obj_open_dir)
        with self.assertRaises(IsADirectoryError):
            d.open()

        # Write file using `open`
        with f.open('w', encoding='utf-8') as wfile:
            wfile.write((TEST_DIR / 'test2.sql').read_text())

        assert f.download(encoding='utf-8') == (TEST_DIR / 'test2.sql').read_text()

        # Test writer without context manager
        wfile = f.open('w')
        with open(TEST_DIR / 'test.sql') as src:
            for line in src:
                wfile.write(line)
        wfile.close()

        txt = st.download_file(f.path, encoding='utf-8')

        assert txt == (TEST_DIR / 'test.sql').read_text()

        # Test reader without context manager
        rfile = f.open('r')
        txt = ''
        for line in rfile:
            txt += line
        rfile.close()

        assert txt == (TEST_DIR / 'test.sql').read_text()

    def test_os_directories(self):
        """Exercise mkdir, exists, is_dir, is_file, listdir, rmdir and removedirs."""
        st = self.wg.stage

        mkdir_test_1 = f'mkdir_test_1_{id(self)}'
        mkdir_test_2 = f'mkdir_test_2_{id(self)}'
        mkdir_test_3 = f'mkdir_test_3_{id(self)}'

        # mkdir
        st.mkdir(mkdir_test_1)
        st.mkdir(mkdir_test_2)
        # Nested directory without its parent is expected to fail
        with self.assertRaises(s2.ManagementError):
            st.mkdir(f'{mkdir_test_2}/nest_1/nest_2')
        st.mkdir(f'{mkdir_test_2}/nest_1')
        st.mkdir(f'{mkdir_test_2}/nest_1/nest_2')
        st.mkdir(f'{mkdir_test_3}')

        assert st.exists(f'{mkdir_test_1}/')
        assert st.exists(f'{mkdir_test_2}/')
        assert st.exists(f'{mkdir_test_2}/nest_1/')
        assert st.exists(f'{mkdir_test_2}/nest_1/nest_2/')
        assert not st.exists('foo/')
        assert not st.exists('foo/bar/')

        assert st.is_dir(f'{mkdir_test_1}/')
        assert st.is_dir(f'{mkdir_test_2}/')
        assert st.is_dir(f'{mkdir_test_2}/nest_1/')
        assert st.is_dir(f'{mkdir_test_2}/nest_1/nest_2/')

        assert not st.is_file(f'{mkdir_test_1}/')
        assert not st.is_file(f'{mkdir_test_2}/')
        assert not st.is_file(f'{mkdir_test_2}/nest_1/')
        assert not st.is_file(f'{mkdir_test_2}/nest_1/nest_2/')

        out = st.listdir('/')
        assert f'{mkdir_test_1}/' in out
        assert f'{mkdir_test_2}/' in out
        assert f'{mkdir_test_2}/nest_1/nest_2/' not in out

        out = st.listdir('/', recursive=True)
        assert f'{mkdir_test_1}/' in out
        assert f'{mkdir_test_2}/' in out
        assert f'{mkdir_test_2}/nest_1/nest_2/' in out

        out = st.listdir(mkdir_test_2)
        assert f'{mkdir_test_1}/' not in out
        assert 'nest_1/' in out
        assert 'nest_2/' not in out
        assert 'nest_1/nest_2/' not in out

        out = st.listdir(mkdir_test_2, recursive=True)
        assert f'{mkdir_test_1}/' not in out
        assert 'nest_1/' in out
        assert 'nest_2/' not in out
        assert 'nest_1/nest_2/' in out

        # rmdir
        before = st.listdir('/', recursive=True)
        st.rmdir(f'{mkdir_test_1}/')
        after = st.listdir('/', recursive=True)
        assert f'{mkdir_test_1}/' in before
        assert f'{mkdir_test_1}/' not in after
        assert sorted(before) == sorted(after + [f'{mkdir_test_1}/'])

        # rmdir on a directory that still has content is expected to fail
        with self.assertRaises(OSError):
            st.rmdir(f'{mkdir_test_2}/')

        mkdir_test_sql = f'mkdir_test_{id(self)}.sql'

        st.upload_file(TEST_DIR / 'test.sql', mkdir_test_sql)

        with self.assertRaises(NotADirectoryError):
            st.rmdir(mkdir_test_sql)

        # removedirs
        before = st.listdir('/')
        st.removedirs(f'{mkdir_test_2}/')
        after = st.listdir('/')
        assert f'{mkdir_test_2}/' in before
        assert f'{mkdir_test_2}/' not in after
        assert sorted(before) == sorted(after + [f'{mkdir_test_2}/'])

        with self.assertRaises(s2.ManagementError):
            st.removedirs(mkdir_test_sql)

    def test_listdir_return_objects(self):
        """Check `listdir` output with and without `return_objects=True`."""
        st = self.wg.stage

        listdir_test_dir = f'listdir_test_{id(self)}'
        listdir_test_sql = f'listdir_test_{id(self)}.sql'

        # Create test directory structure
        st.mkdir(listdir_test_dir)
        st.mkdir(f'{listdir_test_dir}/nest_1')
        st.upload_file(TEST_DIR / 'test.sql', listdir_test_sql)
        st.upload_file(
            TEST_DIR / 'test.sql',
            f'{listdir_test_dir}/nested_test.sql',
        )

        # Test return_objects=False (default behavior)
        out = st.listdir('/')
        assert isinstance(out, list)
        assert all(isinstance(item, str) for item in out)
        assert f'{listdir_test_dir}/' in out
        assert listdir_test_sql in out

        # Test return_objects=True
        out_objs = st.listdir('/', return_objects=True)
        assert isinstance(out_objs, list)
        assert all(hasattr(item, 'path') for item in out_objs)
        assert all(hasattr(item, 'type') for item in out_objs)

        # Verify we have the expected items
        obj_paths = [obj.path for obj in out_objs]
        assert f'{listdir_test_dir}/' in obj_paths
        assert listdir_test_sql in obj_paths

        # Verify object types
        for obj in out_objs:
            if obj.path == f'{listdir_test_dir}/':
                assert obj.type == 'directory'
            elif obj.path == listdir_test_sql:
                assert obj.type == 'file'

        # Test with subdirectory and return_objects=True
        out_objs_sub = st.listdir(listdir_test_dir, return_objects=True)
        assert isinstance(out_objs_sub, list)
        obj_paths_sub = [obj.path for obj in out_objs_sub]
        assert 'nest_1/' in obj_paths_sub
        assert 'nested_test.sql' in obj_paths_sub

        # Test recursive with return_objects=True
        out_objs_rec = st.listdir('/', recursive=True, return_objects=True)
        obj_paths_rec = [obj.path for obj in out_objs_rec]
        assert f'{listdir_test_dir}/' in obj_paths_rec
        assert f'{listdir_test_dir}/nest_1/' in obj_paths_rec
        assert f'{listdir_test_dir}/nested_test.sql' in obj_paths_rec

    def test_os_files(self):
        """Exercise `remove` on files and directories, then `removedirs`."""
        st = self.wg.stage

        files_test_sql = f'files_test_{id(self)}.sql'
        files_test_1_dir = f'files_test_1_{id(self)}'

        st.mkdir(files_test_1_dir)
        st.mkdir(f'{files_test_1_dir}/nest_1')

        st.upload_file(TEST_DIR / 'test.sql', files_test_sql)
        st.upload_file(
            TEST_DIR / 'test.sql',
            f'{files_test_1_dir}/nest_1/nested_files_test.sql',
        )
        st.upload_file(
            TEST_DIR / 'test.sql',
            f'{files_test_1_dir}/nest_1/nested_files_test_2.sql',
        )

        # remove
        with self.assertRaises(IsADirectoryError):
            st.remove(f'{files_test_1_dir}/')

        before = st.listdir('/')
        st.remove(files_test_sql)
        after = st.listdir('/')
        assert files_test_sql in before
        assert files_test_sql not in after
        assert sorted(before) == sorted(after + [files_test_sql])

        before = st.listdir(f'{files_test_1_dir}/nest_1/')
        st.remove(f'{files_test_1_dir}/nest_1/nested_files_test.sql')
        after = st.listdir(f'{files_test_1_dir}/nest_1/')
        assert 'nested_files_test.sql' in before
        assert 'nested_files_test.sql' not in after
        assert st.is_dir(f'{files_test_1_dir}/nest_1/')

        # Removing the last file does not remove empty directories
        st.remove(f'{files_test_1_dir}/nest_1/nested_files_test_2.sql')
        assert not st.is_file(f'{files_test_1_dir}/nest_1/nested_files_test_2.sql')
        assert st.is_dir(f'{files_test_1_dir}/nest_1/')
        assert st.is_dir(f'{files_test_1_dir}/')

        st.removedirs(files_test_1_dir)
        assert not st.is_dir(f'{files_test_1_dir}/nest_1/')
        assert not st.is_dir(f'{files_test_1_dir}/')

    def test_os_rename(self):
        """Rename files and directories, including nested and overwrite cases."""
        st = self.wg.stage

        rename_test_sql = f'rename_test_{id(self)}.sql'
        rename_test_2_sql = f'rename_test_2_{id(self)}.sql'
        rename_test_1_dir = f'rename_test_1_{id(self)}'
        rename_test_2_dir = f'rename_test_2_{id(self)}'

        st.upload_file(TEST_DIR / 'test.sql', rename_test_sql)

        # Uploading into a directory that does not exist yet is expected to fail
        with self.assertRaises(s2.ManagementError):
            st.upload_file(
                TEST_DIR / 'test.sql',
                f'{rename_test_1_dir}/nest_1/nested_rename_test.sql',
            )

        st.mkdir(rename_test_1_dir)
        st.mkdir(f'{rename_test_1_dir}/nest_1')

        assert st.exists(f'/{rename_test_1_dir}/nest_1/')

        st.upload_file(
            TEST_DIR / 'test.sql',
            f'{rename_test_1_dir}/nest_1/nested_rename_test.sql',
        )

        st.upload_file(
            TEST_DIR / 'test.sql',
            f'{rename_test_1_dir}/nest_1/nested_rename_test_2.sql',
        )

        # rename file
        assert rename_test_sql in st.listdir('/')
        assert rename_test_2_sql not in st.listdir('/')
        st.rename(rename_test_sql, rename_test_2_sql)
        assert rename_test_sql not in st.listdir('/')
        assert rename_test_2_sql in st.listdir('/')

        # rename directory
        assert f'{rename_test_1_dir}/' in st.listdir('/')
        assert f'{rename_test_2_dir}/' not in st.listdir('/')
        st.rename(f'{rename_test_1_dir}/', f'{rename_test_2_dir}/')
        assert f'{rename_test_1_dir}/' not in st.listdir('/')
        assert f'{rename_test_2_dir}/' in st.listdir('/')
        assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test.sql')
        assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_2.sql')

        # rename nested
        assert f'{rename_test_2_dir}/nest_1/nested_rename_test.sql' in st.listdir(
            '/', recursive=True,
        )
        assert f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql' not in st.listdir(
            '/', recursive=True,
        )
        st.rename(
            f'{rename_test_2_dir}/nest_1/nested_rename_test.sql',
            f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql',
        )
        assert f'{rename_test_2_dir}/nest_1/nested_rename_test.sql' not in st.listdir(
            '/', recursive=True,
        )
        assert f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql' in st.listdir(
            '/', recursive=True,
        )
        assert not st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test.sql')
        assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_2.sql')
        assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql')

        # non-existent file
        with self.assertRaises(OSError):
            st.rename('rename_foo.sql', 'rename_foo_2.sql')

        # overwrite
        with self.assertRaises(OSError):
            st.rename(
                rename_test_2_sql,
                f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql',
            )

        st.rename(
            rename_test_2_sql,
            f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql', overwrite=True,
        )

    def test_file_object(self):
        """Exercise the path, download, removal and rename methods of stage objects."""
        st = self.wg.stage

        obj_test_dir = f'obj_test_{id(self)}'

        st.mkdir(obj_test_dir)
        st.mkdir(f'{obj_test_dir}/nest_1')

        obj_test_sql = f'obj_test_{id(self)}.sql'
        obj_test_2_sql = f'obj_test_2_{id(self)}.sql'

        f1 = st.upload_file(TEST_DIR / 'test.sql', obj_test_sql)
        f2 = st.upload_file(
            TEST_DIR / 'test.sql',
            f'{obj_test_dir}/nest_1/{obj_test_sql}',
        )
        d2 = st.info(f'{obj_test_dir}/nest_1/')

        # is_file / is_dir
        assert not f1.is_dir()
        assert f1.is_file()
        assert not f2.is_dir()
        assert f2.is_file()
        assert d2.is_dir()
        assert not d2.is_file()

        # abspath / basename / dirname / exists
        assert f1.abspath() == obj_test_sql
        assert f1.basename() == obj_test_sql
        assert f1.dirname() == '/'
        assert f1.exists()
        assert f2.abspath() == f'{obj_test_dir}/nest_1/{obj_test_sql}'
        assert f2.basename() == obj_test_sql
        assert f2.dirname() == f'{obj_test_dir}/nest_1/'
        assert f2.exists()
        assert d2.abspath() == f'{obj_test_dir}/nest_1/'
        assert d2.basename() == 'nest_1'
        assert d2.dirname() == f'{obj_test_dir}/'
        assert d2.exists()

        # download
        assert f1.download(encoding='utf-8') == (TEST_DIR / 'test.sql').read_text()
        assert f1.download() == (TEST_DIR / 'test.sql').read_bytes()

        # remove
        with self.assertRaises(IsADirectoryError):
            d2.remove()

        assert st.is_file(obj_test_sql)
        f1.remove()
        assert not st.is_file(obj_test_sql)

        # removedirs
        with self.assertRaises(NotADirectoryError):
            f2.removedirs()

        assert st.exists(d2.path)
        d2.removedirs()
        assert not st.exists(d2.path)

        # rmdir
        f1 = st.upload_file(TEST_DIR / 'test.sql', obj_test_sql)
        d2 = st.mkdir(f'{obj_test_dir}/nest_1')

        assert st.exists(f1.path)
        assert st.exists(d2.path)

        with self.assertRaises(NotADirectoryError):
            f1.rmdir()

        assert st.exists(f1.path)
        assert st.exists(d2.path)

        d2.rmdir()

        assert not st.exists(f'{obj_test_dir}/nest_1/')
        assert not st.exists(obj_test_dir)

        # mtime / ctime
        assert f1.getmtime() > 0
        assert f1.getctime() > 0

        # rename
        assert st.exists(obj_test_sql)
        assert not st.exists(obj_test_2_sql)
        f1.rename(obj_test_2_sql)
        assert not st.exists(obj_test_sql)
        assert st.exists(obj_test_2_sql)
        assert f1.abspath() == obj_test_2_sql


@pytest.mark.management
class TestSecrets(unittest.TestCase):
    """Tests for organization secrets."""

    manager = None
    wg = None
    password = None

    @classmethod
    def setUpClass(cls):
        """Create a workspace group in a random US region."""
        cls.manager = s2.manage_workspaces()

        us_regions = [x for x in cls.manager.regions if 'US' in x.name]
        cls.password = secrets.token_urlsafe(20) + '-x&$'

        name = clean_name(secrets.token_urlsafe(20)[:20])

        cls.wg = cls.manager.create_workspace_group(
            f'wg-test-{name}',
            region=random.choice(us_regions).id,
            admin_password=cls.password,
            firewall_ranges=['0.0.0.0/0'],
        )

    @classmethod
    def tearDownClass(cls):
        """Terminate the workspace group and clear the class-level fixtures."""
        if cls.wg is not None:
            cls.wg.terminate(force=True)
        cls.wg = None
        cls.manager = None
        cls.password = None

    def test_get_secret(self):
        """Create a secret through the raw API and read it back by name."""
        # manually create secret and then get secret
        # try to delete the secret if it exists
        try:
            secret = self.manager.organizations.current.get_secret('secret_name')

            secret_id = secret.id

            self.manager._delete(f'secrets/{secret_id}')
        except s2.ManagementError:
            pass

        self.manager._post(
            'secrets',
            json=dict(
                name='secret_name',
                value='secret_value',
            ),
        )

        secret = self.manager.organizations.current.get_secret('secret_name')

        assert secret.name == 'secret_name'
        assert secret.value == 'secret_value'


@pytest.mark.management
class TestJob(unittest.TestCase):
    """Tests for scheduled notebook jobs."""

    manager = None
    workspace_group = None
    workspace = None
    password = None
    # Ids of jobs started by the tests; deleted again in tearDownClass
    job_ids = []

    @classmethod
    def setUpClass(cls):
        """Create a workspace group and one workspace in a random US region."""
        cls.manager = s2.manage_workspaces()

        us_regions = [x for x in cls.manager.regions if 'US' in x.name]
        cls.password = secrets.token_urlsafe(20) + '-x&$'

        name = clean_name(secrets.token_urlsafe(20)[:20])

        cls.workspace_group = cls.manager.create_workspace_group(
            f'wg-test-{name}',
            region=random.choice(us_regions).id,
            admin_password=cls.password,
            firewall_ranges=['0.0.0.0/0'],
        )

        try:
            cls.workspace = cls.workspace_group.create_workspace(
                f'ws-test-{name}-x',
                wait_on_active=True,
            )
        except Exception:
            # tearDownClass is not run when setUpClass fails, so clean up here
            cls.workspace_group.terminate(force=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Delete leftover jobs, terminate the group and reset the environment."""
        for job_id in cls.job_ids:
            # Best effort: jobs already deleted by their test are ignored
            try:
                cls.manager.organizations.current.jobs.delete(job_id)
            except Exception:
                pass
        if cls.workspace_group is not None:
            cls.workspace_group.terminate(force=True)
        cls.workspace_group = None
        cls.workspace = None
        cls.manager = None
        cls.password = None
        os.environ.pop('SINGLESTOREDB_WORKSPACE', None)
        os.environ.pop('SINGLESTOREDB_DEFAULT_DATABASE', None)

    def test_job_without_database_target(self):
        """
        Creates job without target database on a specific runtime
        Waits for job to finish
        Gets the job
        Deletes the job
        """
        # Make sure no target is picked up from the environment
        os.environ.pop('SINGLESTOREDB_WORKSPACE', None)
        os.environ.pop('SINGLESTOREDB_DEFAULT_DATABASE', None)

        job_manager = self.manager.organizations.current.jobs
        job = job_manager.run(
            'Scheduling Test.ipynb',
            'notebooks-cpu-small',
            {'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
        )
        self.job_ids.append(job.job_id)
        assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
        assert job.schedule.mode == job_manager.modes().ONCE
        assert not job.execution_config.create_snapshot
        assert job.completed_executions_count == 0
        assert job.name is None
        assert job.description is None
        assert job.job_metadata == []
        assert job.terminated_at is None
        assert job.target_config is None
        job.wait()
        job = job_manager.get(job.job_id)
        assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
        assert job.schedule.mode == job_manager.modes().ONCE
        assert not job.execution_config.create_snapshot
        assert job.completed_executions_count == 1
        assert job.name is None
        assert job.description is None
        assert job.job_metadata != []
        assert len(job.job_metadata) == 1
        assert job.job_metadata[0].count == 1
        assert job.job_metadata[0].status == Status.COMPLETED
        assert job.terminated_at is None
        assert job.target_config is None
        deleted = job.delete()
        assert deleted
        job = job_manager.get(job.job_id)
        assert job.terminated_at is not None

    def test_job_with_database_target(self):
        """
        Creates job with target database on a specific runtime
        Waits for job to finish
        Gets the job
        Deletes the job
        """
        os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'
        os.environ['SINGLESTOREDB_WORKSPACE'] = self.workspace.id

        job_manager = self.manager.organizations.current.jobs
        job = job_manager.run(
            'Scheduling Test.ipynb',
            'notebooks-cpu-small',
            {'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
        )
        self.job_ids.append(job.job_id)
        assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
        assert job.schedule.mode == job_manager.modes().ONCE
        assert not job.execution_config.create_snapshot
        assert job.completed_executions_count == 0
        assert job.name is None
        assert job.description is None
        assert job.job_metadata == []
        assert job.terminated_at is None
        assert job.target_config is not None
        assert job.target_config.database_name == 'information_schema'
        assert job.target_config.target_id == self.workspace.id
        assert job.target_config.target_type == TargetType.WORKSPACE
        assert not job.target_config.resume_target
        job.wait()
        job = job_manager.get(job.job_id)
        assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
        assert job.schedule.mode == job_manager.modes().ONCE
        assert not job.execution_config.create_snapshot
        assert job.completed_executions_count == 1
        assert job.name is None
        assert job.description is None
        assert job.job_metadata != []
        assert len(job.job_metadata) == 1
        assert job.job_metadata[0].count == 1
        assert job.job_metadata[0].status == Status.COMPLETED
        assert job.terminated_at is None
        assert job.target_config is not None
        assert job.target_config.database_name == 'information_schema'
        assert job.target_config.target_id == self.workspace.id
        assert job.target_config.target_type == TargetType.WORKSPACE
        assert not job.target_config.resume_target
        deleted = job.delete()
        assert deleted
        job = job_manager.get(job.job_id)
        assert job.terminated_at is not None


@pytest.mark.management
class TestFileSpaces(unittest.TestCase):

    manager = None
    personal_space = None
    shared_space = None

    @classmethod
    def setUpClass(cls):
        cls.manager = s2.manage_files()
        # NOTE(review): the visible source is cut off in the middle of the next
        # statement; the fragment is kept verbatim below so it can be completed.
        # cls.personal_space =
cls.manager.personal_space1256cls.shared_space = cls.manager.shared_space12571258@classmethod1259def tearDownClass(cls):1260cls.manager = None1261cls.personal_space = None1262cls.shared_space = None12631264def test_upload_file(self):1265upload_test_ipynb = f'upload_test_{id(self)}.ipynb'12661267for space in [self.personal_space, self.shared_space]:1268root = space.info('/')1269assert str(root.path) == '/'1270assert root.type == 'directory'12711272# Upload files1273f = space.upload_file(1274TEST_DIR / 'test.ipynb',1275upload_test_ipynb,1276)1277assert str(f.path) == upload_test_ipynb1278assert f.type == 'notebook'12791280# Download and compare to original1281txt = f.download(encoding='utf-8')1282assert txt == open(TEST_DIR / 'test.ipynb').read()12831284# Make sure we can't overwrite1285with self.assertRaises(OSError):1286space.upload_file(1287TEST_DIR / 'test.ipynb',1288upload_test_ipynb,1289)12901291# Force overwrite with new content1292f = space.upload_file(1293TEST_DIR / 'test2.ipynb',1294upload_test_ipynb, overwrite=True,1295)1296assert str(f.path) == upload_test_ipynb1297assert f.type == 'notebook'12981299# Verify new content1300txt = f.download(encoding='utf-8')1301assert txt == open(TEST_DIR / 'test2.ipynb').read()13021303# Make sure we can't upload a folder1304with self.assertRaises(s2.ManagementError):1305space.upload_folder(TEST_DIR, 'test')13061307# Cleanup1308space.remove(upload_test_ipynb)13091310def test_upload_file_io(self):1311upload_test_ipynb = f'upload_test_{id(self)}.ipynb'13121313for space in [self.personal_space, self.shared_space]:1314root = space.info('/')1315assert str(root.path) == '/'1316assert root.type == 'directory'13171318# Upload files1319f = space.upload_file(1320open(TEST_DIR / 'test.ipynb', 'r'),1321upload_test_ipynb,1322)1323assert str(f.path) == upload_test_ipynb1324assert f.type == 'notebook'13251326# Download and compare to original1327txt = f.download(encoding='utf-8')1328assert txt == open(TEST_DIR / 
'test.ipynb').read()13291330# Make sure we can't overwrite1331with self.assertRaises(OSError):1332space.upload_file(1333open(TEST_DIR / 'test.ipynb', 'r'),1334upload_test_ipynb,1335)13361337# Force overwrite with new content1338f = space.upload_file(1339open(TEST_DIR / 'test2.ipynb', 'r'),1340upload_test_ipynb, overwrite=True,1341)1342assert str(f.path) == upload_test_ipynb1343assert f.type == 'notebook'13441345# Verify new content1346txt = f.download(encoding='utf-8')1347assert txt == open(TEST_DIR / 'test2.ipynb').read()13481349# Make sure we can't upload a folder1350with self.assertRaises(s2.ManagementError):1351space.upload_folder(TEST_DIR, 'test')13521353# Cleanup1354space.remove(upload_test_ipynb)13551356def test_open(self):1357for space in [self.personal_space, self.shared_space]:1358open_test_ipynb = f'open_test_ipynb_{id(self)}.ipynb'13591360# See if error is raised for non-existent file1361with self.assertRaises(s2.ManagementError):1362space.open(open_test_ipynb, 'r')13631364# Load test file1365space.upload_file(TEST_DIR / 'test.ipynb', open_test_ipynb)13661367# Read file using `open`1368with space.open(open_test_ipynb, 'r') as rfile:1369assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()13701371# Read file using `open` with 'rt' mode1372with space.open(open_test_ipynb, 'rt') as rfile:1373assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()13741375# Read file using `open` with 'rb' mode1376with space.open(open_test_ipynb, 'rb') as rfile:1377assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()13781379# Read file using `open` with 'rb' mode1380with self.assertRaises(ValueError):1381with space.open(open_test_ipynb, 'b') as rfile:1382pass13831384# Attempt overwrite file using `open` with mode 'x'1385with self.assertRaises(OSError):1386with space.open(open_test_ipynb, 'x') as wfile:1387pass13881389# Attempt overwrite file using `open` with mode 'w'1390with space.open(open_test_ipynb, 'w') as wfile:1391wfile.write(open(TEST_DIR / 
'test2.ipynb').read())13921393txt = space.download_file(open_test_ipynb, encoding='utf-8')13941395assert txt == open(TEST_DIR / 'test2.ipynb').read()13961397open_raw_test_ipynb = f'open_raw_test_{id(self)}.ipynb'13981399# Test writer without context manager1400wfile = space.open(open_raw_test_ipynb, 'w')1401for line in open(TEST_DIR / 'test.ipynb'):1402wfile.write(line)1403wfile.close()14041405txt = space.download_file(1406open_raw_test_ipynb,1407encoding='utf-8',1408)14091410assert txt == open(TEST_DIR / 'test.ipynb').read()14111412# Test reader without context manager1413rfile = space.open(open_raw_test_ipynb, 'r')1414txt = ''1415for line in rfile:1416txt += line1417rfile.close()14181419assert txt == open(TEST_DIR / 'test.ipynb').read()14201421# Cleanup1422space.remove(open_test_ipynb)1423space.remove(open_raw_test_ipynb)14241425def test_obj_open(self):1426for space in [self.personal_space, self.shared_space]:1427obj_open_test_ipynb = f'obj_open_test_{id(self)}.ipynb'1428obj_open_dir = f'obj_open_dir_{id(self)}'14291430# Load test file1431f = space.upload_file(1432TEST_DIR / 'test.ipynb',1433obj_open_test_ipynb,1434)14351436# Read file using `open`1437with f.open() as rfile:1438assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()14391440# Make sure directories error out1441with self.assertRaises(s2.ManagementError):1442space.mkdir(obj_open_dir)14431444# Write file using `open`1445with f.open('w', encoding='utf-8') as wfile:1446wfile.write(open(TEST_DIR / 'test2.ipynb').read())14471448assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.ipynb').read()14491450# Test writer without context manager1451wfile = f.open('w')1452for line in open(TEST_DIR / 'test.ipynb'):1453wfile.write(line)1454wfile.close()14551456txt = space.download_file(f.path, encoding='utf-8')14571458assert txt == open(TEST_DIR / 'test.ipynb').read()14591460# Test reader without context manager1461rfile = f.open('r')1462txt = ''1463for line in rfile:1464txt += 
line1465rfile.close()14661467assert txt == open(TEST_DIR / 'test.ipynb').read()14681469# Cleanup1470space.remove(obj_open_test_ipynb)14711472def test_os_directories(self):1473mkdir_test_1_dir = f'mkdir_test_1_{id(self)}'14741475for space in [self.personal_space, self.shared_space]:1476# Make sure directories error out1477with self.assertRaises(s2.ManagementError):1478space.mkdir(mkdir_test_1_dir)14791480with self.assertRaises(s2.ManagementError):1481space.exists(f'{mkdir_test_1_dir}/')14821483out = space.listdir('/')1484assert f'{mkdir_test_1_dir}/' not in out14851486with self.assertRaises(s2.ManagementError):1487space.rmdir(f'{mkdir_test_1_dir}/')14881489def test_os_rename(self):1490rename_test_ipynb = f'rename_test_{id(self)}.ipynb'1491rename_test_2_ipynb = f'rename_test_2_{id(self)}.ipynb'1492rename_test_3_ipynb = f'rename_test_3_{id(self)}.ipynb'14931494for space in [self.personal_space, self.shared_space]:1495space.upload_file(1496TEST_DIR / 'test.ipynb',1497rename_test_ipynb,1498)1499assert rename_test_ipynb in space.listdir('/')1500assert rename_test_2_ipynb not in space.listdir('/')15011502space.rename(1503rename_test_ipynb,1504rename_test_2_ipynb,1505)1506assert rename_test_ipynb not in space.listdir('/')1507assert rename_test_2_ipynb in space.listdir('/')15081509# non-existent file1510with self.assertRaises(OSError):1511space.rename('rename_foo.ipynb', 'rename_foo_2.ipynb')15121513space.upload_file(1514TEST_DIR / 'test.ipynb',1515rename_test_3_ipynb,1516)15171518# overwrite1519with self.assertRaises(OSError):1520space.rename(1521rename_test_2_ipynb,1522rename_test_3_ipynb,1523)15241525space.rename(1526rename_test_2_ipynb,1527rename_test_3_ipynb, overwrite=True,1528)15291530# Cleanup1531space.remove(rename_test_3_ipynb)15321533def test_file_object(self):1534obj_test_ipynb = f'obj_test_{id(self)}.ipynb'1535obj_test_2_ipynb = f'obj_test_2_{id(self)}.ipynb'15361537for space in [self.personal_space, self.shared_space]:1538f = space.upload_file(1539TEST_DIR / 
'test.ipynb',1540obj_test_ipynb,1541)15421543assert not f.is_dir()1544assert f.is_file()15451546# abspath / basename / dirname / exists1547assert f.abspath() == obj_test_ipynb1548assert f.basename() == obj_test_ipynb1549assert f.dirname() == '/'1550assert f.exists()15511552# download1553assert f.download(encoding='utf-8') == \1554open(TEST_DIR / 'test.ipynb', 'r').read()1555assert f.download() == open(TEST_DIR / 'test.ipynb', 'rb').read()15561557assert space.is_file(obj_test_ipynb)1558f.remove()1559assert not space.is_file(obj_test_ipynb)15601561# mtime / ctime1562assert f.getmtime() > 01563assert f.getctime() > 015641565# rename1566f = space.upload_file(1567TEST_DIR / 'test.ipynb',1568obj_test_ipynb,1569)1570assert space.exists(obj_test_ipynb)1571assert not space.exists(obj_test_2_ipynb)1572f.rename(obj_test_2_ipynb)1573assert not space.exists(obj_test_ipynb)1574assert space.exists(obj_test_2_ipynb)1575assert f.abspath() == obj_test_2_ipynb15761577# Cleanup1578space.remove(obj_test_2_ipynb)157915801581@pytest.mark.management1582class TestRegions(unittest.TestCase):1583"""Test cases for region management."""15841585manager = None15861587@classmethod1588def setUpClass(cls):1589"""Set up the test environment."""1590cls.manager = s2.manage_regions()15911592@classmethod1593def tearDownClass(cls):1594"""Clean up the test environment."""1595cls.manager = None15961597def test_list_regions(self):1598"""Test listing all regions."""1599regions = self.manager.list_regions()16001601# Verify we get a NamedList1602assert isinstance(regions, NamedList)16031604# Verify we have at least one region1605assert len(regions) > 016061607# Verify region properties1608region = regions[0]1609assert isinstance(region, Region)1610assert hasattr(region, 'id')1611assert hasattr(region, 'name')1612assert hasattr(region, 'provider')16131614# Verify provider values1615providers = {x.provider for x in regions}1616assert 'Azure' in providers or 'GCP' in providers or 'AWS' in providers16171618def 
test_list_shared_tier_regions(self):1619"""Test listing shared tier regions."""1620regions = self.manager.list_shared_tier_regions()16211622# Verify we get a NamedList1623assert isinstance(regions, NamedList)16241625# Verify region properties if we have any shared tier regions1626if regions:1627region = regions[0]1628assert isinstance(region, Region)1629assert hasattr(region, 'name')1630assert hasattr(region, 'provider')1631assert hasattr(region, 'region_name')16321633# Verify provider values1634providers = {x.provider for x in regions}1635assert any(p in providers for p in ['Azure', 'GCP', 'AWS'])16361637def test_str_repr(self):1638"""Test string representation of regions."""1639regions = self.manager.list_regions()1640if not regions:1641self.skipTest('No regions available for testing')16421643region = regions[0]16441645# Test __str__1646s = str(region)1647assert region.id in s1648assert region.name in s1649assert region.provider in s16501651# Test __repr__1652assert repr(region) == str(region)165316541655