Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/tests/test_management.py
469 views
1
#!/usr/bin/env python
2
# type: ignore
3
"""SingleStoreDB Management API testing."""
4
import os
5
import pathlib
6
import random
7
import re
8
import secrets
9
import unittest
10
11
import pytest
12
13
import singlestoredb as s2
14
from singlestoredb.management.job import Status
15
from singlestoredb.management.job import TargetType
16
from singlestoredb.management.region import Region
17
from singlestoredb.management.utils import NamedList
18
19
20
TEST_DIR = pathlib.Path(os.path.dirname(__file__))
21
22
23
def clean_name(s):
24
"""Change all non-word characters to -."""
25
return re.sub(r'[^\w]', r'-', s).replace('_', '-').lower()
26
27
28
def shared_database_name(s):
29
"""Return a shared database name. Cannot contain special characters except -"""
30
return re.sub(r'[^\w]', '', s).replace('-', '_').lower()
31
32
33
@pytest.mark.management
34
class TestCluster(unittest.TestCase):
35
36
manager = None
37
cluster = None
38
password = None
39
40
@classmethod
41
def setUpClass(cls):
42
cls.manager = s2.manage_cluster()
43
44
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
45
cls.password = secrets.token_urlsafe(20) + '-x&$'
46
47
cls.cluster = cls.manager.create_cluster(
48
clean_name('cm-test-{}'.format(secrets.token_urlsafe(20)[:20])),
49
region=random.choice(us_regions).id,
50
admin_password=cls.password,
51
firewall_ranges=['0.0.0.0/0'],
52
expires_at='1h',
53
size='S-00',
54
wait_on_active=True,
55
)
56
57
@classmethod
58
def tearDownClass(cls):
59
if cls.cluster is not None:
60
cls.cluster.terminate()
61
cls.cluster = None
62
cls.manager = None
63
cls.password = None
64
65
def test_str(self):
66
assert self.cluster.name in str(self.cluster.name)
67
68
def test_repr(self):
69
assert repr(self.cluster) == str(self.cluster)
70
71
def test_region_str(self):
72
s = str(self.cluster.region)
73
assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s
74
75
def test_region_repr(self):
76
assert repr(self.cluster.region) == str(self.cluster.region)
77
78
def test_regions(self):
79
out = self.manager.regions
80
providers = {x.provider for x in out}
81
names = [x.name for x in out]
82
assert 'Azure' in providers, providers
83
assert 'GCP' in providers, providers
84
assert 'AWS' in providers, providers
85
86
objs = {}
87
ids = []
88
for item in out:
89
ids.append(item.id)
90
objs[item.id] = item
91
if item.name not in objs:
92
objs[item.name] = item
93
94
name = random.choice(names)
95
assert out[name] == objs[name]
96
id = random.choice(ids)
97
assert out[id] == objs[id]
98
99
def test_clusters(self):
100
clusters = self.manager.clusters
101
ids = [x.id for x in clusters]
102
assert self.cluster.id in ids, ids
103
104
def test_get_cluster(self):
105
clus = self.manager.get_cluster(self.cluster.id)
106
assert clus.id == self.cluster.id, clus.id
107
108
with self.assertRaises(s2.ManagementError) as cm:
109
clus = self.manager.get_cluster('bad id')
110
111
assert 'UUID' in cm.exception.msg, cm.exception.msg
112
113
def test_update(self):
114
assert self.cluster.name.startswith('cm-test-')
115
116
name = self.cluster.name.replace('cm-test-', 'cm-foo-')
117
self.cluster.update(name=name)
118
119
clus = self.manager.get_cluster(self.cluster.id)
120
assert clus.name == name, clus.name
121
122
def test_suspend_resume(self):
123
trues = ['1', 'on', 'true']
124
do_test = os.environ.get('SINGLESTOREDB_TEST_SUSPEND', '0').lower() in trues
125
126
if not do_test:
127
self.skipTest(
128
'Suspend / resume tests skipped by default due to '
129
'being time consuming; set SINGLESTOREDB_TEST_SUSPEND=1 '
130
'to enable',
131
)
132
133
assert self.cluster.state != 'Suspended', self.cluster.state
134
135
self.cluster.suspend(wait_on_suspended=True)
136
assert self.cluster.state == 'Suspended', self.cluster.state
137
138
self.cluster.resume(wait_on_resumed=True)
139
assert self.cluster.state == 'Active', self.cluster.state
140
141
def test_no_manager(self):
142
clus = self.manager.get_cluster(self.cluster.id)
143
clus._manager = None
144
145
with self.assertRaises(s2.ManagementError) as cm:
146
clus.refresh()
147
148
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
149
150
with self.assertRaises(s2.ManagementError) as cm:
151
clus.update()
152
153
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
154
155
with self.assertRaises(s2.ManagementError) as cm:
156
clus.suspend()
157
158
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
159
160
with self.assertRaises(s2.ManagementError) as cm:
161
clus.resume()
162
163
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
164
165
with self.assertRaises(s2.ManagementError) as cm:
166
clus.terminate()
167
168
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
169
170
def test_connect(self):
171
trues = ['1', 'on', 'true']
172
pure_python = os.environ.get('SINGLESTOREDB_PURE_PYTHON', '0').lower() in trues
173
174
self.skipTest('Connection test is disable due to flakey server')
175
176
if pure_python:
177
self.skipTest('Connections through managed service are disabled')
178
179
try:
180
with self.cluster.connect(user='admin', password=self.password) as conn:
181
with conn.cursor() as cur:
182
cur.execute('show databases')
183
assert 'cluster' in [x[0] for x in list(cur)]
184
except s2.ManagementError as exc:
185
if 'endpoint has not been set' not in str(exc):
186
self.skipTest('No endpoint in response. Skipping connection test.')
187
188
# Test missing endpoint
189
clus = self.manager.get_cluster(self.cluster.id)
190
clus.endpoint = None
191
192
with self.assertRaises(s2.ManagementError) as cm:
193
clus.connect(user='admin', password=self.password)
194
195
assert 'endpoint' in cm.exception.msg, cm.exception.msg
196
197
198
@pytest.mark.management
199
class TestWorkspace(unittest.TestCase):
200
201
manager = None
202
workspace_group = None
203
workspace = None
204
password = None
205
206
@classmethod
207
def setUpClass(cls):
208
cls.manager = s2.manage_workspaces()
209
210
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
211
cls.password = secrets.token_urlsafe(20) + '-x&$'
212
213
name = clean_name(secrets.token_urlsafe(20)[:20])
214
215
cls.workspace_group = cls.manager.create_workspace_group(
216
f'wg-test-{name}',
217
region=random.choice(us_regions).id,
218
admin_password=cls.password,
219
firewall_ranges=['0.0.0.0/0'],
220
)
221
222
try:
223
cls.workspace = cls.workspace_group.create_workspace(
224
f'ws-test-{name}-x',
225
wait_on_active=True,
226
)
227
except Exception:
228
cls.workspace_group.terminate(force=True)
229
raise
230
231
@classmethod
232
def tearDownClass(cls):
233
if cls.workspace_group is not None:
234
cls.workspace_group.terminate(force=True)
235
cls.workspace_group = None
236
cls.workspace = None
237
cls.manager = None
238
cls.password = None
239
240
def test_str(self):
241
assert self.workspace.name in str(self.workspace.name)
242
assert self.workspace_group.name in str(self.workspace_group.name)
243
244
def test_repr(self):
245
assert repr(self.workspace) == str(self.workspace)
246
assert repr(self.workspace_group) == str(self.workspace_group)
247
248
def test_region_str(self):
249
s = str(self.workspace_group.region)
250
assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s
251
252
def test_region_repr(self):
253
assert repr(self.workspace_group.region) == str(self.workspace_group.region)
254
255
def test_regions(self):
256
out = self.manager.regions
257
providers = {x.provider for x in out}
258
names = [x.name for x in out]
259
assert 'Azure' in providers, providers
260
assert 'GCP' in providers, providers
261
assert 'AWS' in providers, providers
262
263
objs = {}
264
ids = []
265
for item in out:
266
ids.append(item.id)
267
objs[item.id] = item
268
if item.name not in objs:
269
objs[item.name] = item
270
271
name = random.choice(names)
272
assert out[name] == objs[name]
273
id = random.choice(ids)
274
assert out[id] == objs[id]
275
276
def test_workspace_groups(self):
277
workspace_groups = self.manager.workspace_groups
278
ids = [x.id for x in workspace_groups]
279
names = [x.name for x in workspace_groups]
280
assert self.workspace_group.id in ids
281
assert self.workspace_group.name in names
282
283
assert workspace_groups.ids() == ids
284
assert workspace_groups.names() == names
285
286
objs = {}
287
for item in workspace_groups:
288
objs[item.id] = item
289
objs[item.name] = item
290
291
name = random.choice(names)
292
assert workspace_groups[name] == objs[name]
293
id = random.choice(ids)
294
assert workspace_groups[id] == objs[id]
295
296
def test_workspaces(self):
297
spaces = self.workspace_group.workspaces
298
ids = [x.id for x in spaces]
299
names = [x.name for x in spaces]
300
assert self.workspace.id in ids
301
assert self.workspace.name in names
302
303
assert spaces.ids() == ids
304
assert spaces.names() == names
305
306
objs = {}
307
for item in spaces:
308
objs[item.id] = item
309
objs[item.name] = item
310
311
name = random.choice(names)
312
assert spaces[name] == objs[name]
313
id = random.choice(ids)
314
assert spaces[id] == objs[id]
315
316
def test_get_workspace_group(self):
317
group = self.manager.get_workspace_group(self.workspace_group.id)
318
assert group.id == self.workspace_group.id, group.id
319
320
with self.assertRaises(s2.ManagementError) as cm:
321
group = self.manager.get_workspace_group('bad id')
322
323
assert 'UUID' in cm.exception.msg, cm.exception.msg
324
325
def test_get_workspace(self):
326
space = self.manager.get_workspace(self.workspace.id)
327
assert space.id == self.workspace.id, space.id
328
329
with self.assertRaises(s2.ManagementError) as cm:
330
space = self.manager.get_workspace('bad id')
331
332
assert 'UUID' in cm.exception.msg, cm.exception.msg
333
334
def test_update(self):
335
assert self.workspace_group.name.startswith('wg-test-')
336
337
name = self.workspace_group.name.replace('wg-test-', 'wg-foo-')
338
self.workspace_group.update(name=name)
339
340
group = self.manager.get_workspace_group(self.workspace_group.id)
341
assert group.name == name, group.name
342
343
def test_no_manager(self):
344
space = self.manager.get_workspace(self.workspace.id)
345
space._manager = None
346
347
with self.assertRaises(s2.ManagementError) as cm:
348
space.refresh()
349
350
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
351
352
with self.assertRaises(s2.ManagementError) as cm:
353
space.terminate()
354
355
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
356
357
def test_connect(self):
358
with self.workspace.connect(user='admin', password=self.password) as conn:
359
with conn.cursor() as cur:
360
cur.execute('show databases')
361
assert 'cluster' in [x[0] for x in list(cur)]
362
363
# Test missing endpoint
364
space = self.manager.get_workspace(self.workspace.id)
365
space.endpoint = None
366
367
with self.assertRaises(s2.ManagementError) as cm:
368
space.connect(user='admin', password=self.password)
369
370
assert 'endpoint' in cm.exception.msg, cm.exception.msg
371
372
373
@pytest.mark.management
374
class TestStarterWorkspace(unittest.TestCase):
375
376
manager = None
377
starter_workspace = None
378
379
@classmethod
380
def setUpClass(cls):
381
cls.manager = s2.manage_workspaces()
382
383
shared_tier_regions: NamedList[Region] = [
384
x for x in cls.manager.shared_tier_regions if 'US' in x.name
385
]
386
cls.starter_username = 'starter_user'
387
cls.password = secrets.token_urlsafe(20)
388
389
name = shared_database_name(secrets.token_urlsafe(20)[:20])
390
391
cls.database_name = f'starter_db_{name}'
392
393
shared_tier_region: Region = random.choice(shared_tier_regions)
394
395
if not shared_tier_region:
396
raise ValueError('No shared tier regions found')
397
398
cls.starter_workspace = cls.manager.create_starter_workspace(
399
f'starter-ws-test-{name}',
400
database_name=cls.database_name,
401
provider=shared_tier_region.provider,
402
region_name=shared_tier_region.region_name,
403
)
404
405
cls.starter_workspace.create_user(
406
username=cls.starter_username,
407
password=cls.password,
408
)
409
410
@classmethod
411
def tearDownClass(cls):
412
if cls.starter_workspace is not None:
413
cls.starter_workspace.terminate()
414
cls.manager = None
415
cls.password = None
416
417
def test_str(self):
418
assert self.starter_workspace.name in str(self.starter_workspace.name)
419
420
def test_repr(self):
421
assert repr(self.starter_workspace) == str(self.starter_workspace)
422
423
def test_get_starter_workspace(self):
424
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
425
assert workspace.id == self.starter_workspace.id, workspace.id
426
427
with self.assertRaises(s2.ManagementError) as cm:
428
workspace = self.manager.get_starter_workspace('bad id')
429
430
assert 'UUID' in cm.exception.msg, cm.exception.msg
431
432
def test_starter_workspaces(self):
433
workspaces = self.manager.starter_workspaces
434
ids = [x.id for x in workspaces]
435
names = [x.name for x in workspaces]
436
assert self.starter_workspace.id in ids
437
assert self.starter_workspace.name in names
438
439
objs = {}
440
for item in workspaces:
441
objs[item.id] = item
442
objs[item.name] = item
443
444
name = random.choice(names)
445
assert workspaces[name] == objs[name]
446
id = random.choice(ids)
447
assert workspaces[id] == objs[id]
448
449
def test_no_manager(self):
450
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
451
workspace._manager = None
452
453
with self.assertRaises(s2.ManagementError) as cm:
454
workspace.refresh()
455
456
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
457
458
with self.assertRaises(s2.ManagementError) as cm:
459
workspace.terminate()
460
461
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
462
463
def test_connect(self):
464
with self.starter_workspace.connect(
465
user=self.starter_username,
466
password=self.password,
467
) as conn:
468
with conn.cursor() as cur:
469
cur.execute('show databases')
470
assert self.database_name in [x[0] for x in list(cur)]
471
472
# Test missing endpoint
473
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
474
workspace.endpoint = None
475
476
with self.assertRaises(s2.ManagementError) as cm:
477
workspace.connect(user=self.starter_username, password=self.password)
478
479
assert 'endpoint' in cm.exception.msg, cm.exception.msg
480
481
482
@pytest.mark.management
483
class TestStage(unittest.TestCase):
484
485
manager = None
486
wg = None
487
password = None
488
489
@classmethod
490
def setUpClass(cls):
491
cls.manager = s2.manage_workspaces()
492
493
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
494
cls.password = secrets.token_urlsafe(20) + '-x&$'
495
496
name = clean_name(secrets.token_urlsafe(20)[:20])
497
498
cls.wg = cls.manager.create_workspace_group(
499
f'wg-test-{name}',
500
region=random.choice(us_regions).id,
501
admin_password=cls.password,
502
firewall_ranges=['0.0.0.0/0'],
503
)
504
505
@classmethod
506
def tearDownClass(cls):
507
if cls.wg is not None:
508
cls.wg.terminate(force=True)
509
cls.wg = None
510
cls.manager = None
511
cls.password = None
512
513
def test_upload_file(self):
514
st = self.wg.stage
515
516
upload_test_sql = f'upload_test_{id(self)}.sql'
517
upload_test2_sql = f'upload_test2_{id(self)}.sql'
518
519
root = st.info('/')
520
assert str(root.path) == '/'
521
assert root.type == 'directory'
522
523
# Upload file
524
f = st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)
525
assert str(f.path) == upload_test_sql
526
assert f.type == 'file'
527
528
# Download and compare to original
529
txt = f.download(encoding='utf-8')
530
assert txt == open(TEST_DIR / 'test.sql').read()
531
532
# Make sure we can't overwrite
533
with self.assertRaises(OSError):
534
st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)
535
536
# Force overwrite with new content; use file object this time
537
f = st.upload_file(
538
open(TEST_DIR / 'test2.sql', 'r'),
539
upload_test_sql,
540
overwrite=True,
541
)
542
assert str(f.path) == upload_test_sql
543
assert f.type == 'file'
544
545
# Verify new content
546
txt = f.download(encoding='utf-8')
547
assert txt == open(TEST_DIR / 'test2.sql').read()
548
549
# Try to upload folder
550
with self.assertRaises(IsADirectoryError):
551
st.upload_file(TEST_DIR, 'test3.sql')
552
553
lib = st.mkdir('/lib/')
554
assert str(lib.path) == 'lib/'
555
assert lib.type == 'directory'
556
557
# Try to overwrite stage folder with file
558
with self.assertRaises(IsADirectoryError):
559
st.upload_file(TEST_DIR / 'test2.sql', lib.path, overwrite=True)
560
561
# Write file into folder
562
f = st.upload_file(
563
TEST_DIR / 'test2.sql',
564
os.path.join(lib.path, upload_test2_sql),
565
)
566
assert str(f.path) == 'lib/' + upload_test2_sql
567
assert f.type == 'file'
568
569
def test_open(self):
570
st = self.wg.stage
571
572
open_test_sql = f'open_test_{id(self)}.sql'
573
574
# See if error is raised for non-existent file
575
with self.assertRaises(s2.ManagementError):
576
st.open(open_test_sql, 'r')
577
578
# Load test file
579
st.upload_file(TEST_DIR / 'test.sql', open_test_sql)
580
581
# Read file using `open`
582
with st.open(open_test_sql, 'r') as rfile:
583
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
584
585
# Read file using `open` with 'rt' mode
586
with st.open(open_test_sql, 'rt') as rfile:
587
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
588
589
# Read file using `open` with 'rb' mode
590
with st.open(open_test_sql, 'rb') as rfile:
591
assert rfile.read() == open(TEST_DIR / 'test.sql', 'rb').read()
592
593
# Read file using `open` with 'rb' mode
594
with self.assertRaises(ValueError):
595
with st.open(open_test_sql, 'b') as rfile:
596
pass
597
598
# Attempt overwrite file using `open` with mode 'x'
599
with self.assertRaises(OSError):
600
with st.open(open_test_sql, 'x') as wfile:
601
pass
602
603
# Attempt overwrite file using `open` with mode 'w'
604
with st.open(open_test_sql, 'w') as wfile:
605
wfile.write(open(TEST_DIR / 'test2.sql').read())
606
607
txt = st.download_file(open_test_sql, encoding='utf-8')
608
609
assert txt == open(TEST_DIR / 'test2.sql').read()
610
611
open_raw_test_sql = f'open_raw_test_{id(self)}.sql'
612
613
# Test writer without context manager
614
wfile = st.open(open_raw_test_sql, 'w')
615
for line in open(TEST_DIR / 'test.sql'):
616
wfile.write(line)
617
wfile.close()
618
619
txt = st.download_file(open_raw_test_sql, encoding='utf-8')
620
621
assert txt == open(TEST_DIR / 'test.sql').read()
622
623
# Test reader without context manager
624
rfile = st.open(open_raw_test_sql, 'r')
625
txt = ''
626
for line in rfile:
627
txt += line
628
rfile.close()
629
630
assert txt == open(TEST_DIR / 'test.sql').read()
631
632
def test_obj_open(self):
633
st = self.wg.stage
634
635
obj_open_test_sql = f'obj_open_test_{id(self)}.sql'
636
obj_open_dir = f'obj_open_dir_{id(self)}'
637
638
# Load test file
639
f = st.upload_file(TEST_DIR / 'test.sql', obj_open_test_sql)
640
641
# Read file using `open`
642
with f.open() as rfile:
643
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
644
645
# Make sure directories error out
646
d = st.mkdir(obj_open_dir)
647
with self.assertRaises(IsADirectoryError):
648
d.open()
649
650
# Write file using `open`
651
with f.open('w', encoding='utf-8') as wfile:
652
wfile.write(open(TEST_DIR / 'test2.sql').read())
653
654
assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.sql').read()
655
656
# Test writer without context manager
657
wfile = f.open('w')
658
for line in open(TEST_DIR / 'test.sql'):
659
wfile.write(line)
660
wfile.close()
661
662
txt = st.download_file(f.path, encoding='utf-8')
663
664
assert txt == open(TEST_DIR / 'test.sql').read()
665
666
# Test reader without context manager
667
rfile = f.open('r')
668
txt = ''
669
for line in rfile:
670
txt += line
671
rfile.close()
672
673
assert txt == open(TEST_DIR / 'test.sql').read()
674
675
def test_os_directories(self):
676
st = self.wg.stage
677
678
# mkdir
679
st.mkdir('mkdir_test_1')
680
st.mkdir('mkdir_test_2')
681
with self.assertRaises(s2.ManagementError):
682
st.mkdir('mkdir_test_2/nest_1/nest_2')
683
st.mkdir('mkdir_test_2/nest_1')
684
st.mkdir('mkdir_test_2/nest_1/nest_2')
685
st.mkdir('mkdir_test_3')
686
687
assert st.exists('mkdir_test_1/')
688
assert st.exists('mkdir_test_2/')
689
assert st.exists('mkdir_test_2/nest_1/')
690
assert st.exists('mkdir_test_2/nest_1/nest_2/')
691
assert not st.exists('foo/')
692
assert not st.exists('foo/bar/')
693
694
assert st.is_dir('mkdir_test_1/')
695
assert st.is_dir('mkdir_test_2/')
696
assert st.is_dir('mkdir_test_2/nest_1/')
697
assert st.is_dir('mkdir_test_2/nest_1/nest_2/')
698
699
assert not st.is_file('mkdir_test_1/')
700
assert not st.is_file('mkdir_test_2/')
701
assert not st.is_file('mkdir_test_2/nest_1/')
702
assert not st.is_file('mkdir_test_2/nest_1/nest_2/')
703
704
out = st.listdir('/')
705
assert 'mkdir_test_1/' in out
706
assert 'mkdir_test_2/' in out
707
assert 'mkdir_test_2/nest_1/nest_2/' not in out
708
709
out = st.listdir('/', recursive=True)
710
assert 'mkdir_test_1/' in out
711
assert 'mkdir_test_2/' in out
712
assert 'mkdir_test_2/nest_1/nest_2/' in out
713
714
out = st.listdir('mkdir_test_2')
715
assert 'mkdir_test_1/' not in out
716
assert 'nest_1/' in out
717
assert 'nest_2/' not in out
718
assert 'nest_1/nest_2/' not in out
719
720
out = st.listdir('mkdir_test_2', recursive=True)
721
assert 'mkdir_test_1/' not in out
722
assert 'nest_1/' in out
723
assert 'nest_2/' not in out
724
assert 'nest_1/nest_2/' in out
725
726
# rmdir
727
before = st.listdir('/', recursive=True)
728
st.rmdir('mkdir_test_1/')
729
after = st.listdir('/', recursive=True)
730
assert 'mkdir_test_1/' in before
731
assert 'mkdir_test_1/' not in after
732
assert list(sorted(before)) == list(sorted(after + ['mkdir_test_1/']))
733
734
with self.assertRaises(OSError):
735
st.rmdir('mkdir_test_2/')
736
737
st.upload_file(TEST_DIR / 'test.sql', 'mkdir_test.sql')
738
739
with self.assertRaises(NotADirectoryError):
740
st.rmdir('mkdir_test.sql')
741
742
# removedirs
743
before = st.listdir('/')
744
st.removedirs('mkdir_test_2/')
745
after = st.listdir('/')
746
assert 'mkdir_test_2/' in before
747
assert 'mkdir_test_2/' not in after
748
assert list(sorted(before)) == list(sorted(after + ['mkdir_test_2/']))
749
750
with self.assertRaises(s2.ManagementError):
751
st.removedirs('mkdir_test.sql')
752
753
def test_os_files(self):
754
st = self.wg.stage
755
756
st.mkdir('files_test_1')
757
st.mkdir('files_test_1/nest_1')
758
759
st.upload_file(TEST_DIR / 'test.sql', 'files_test.sql')
760
st.upload_file(TEST_DIR / 'test.sql', 'files_test_1/nest_1/nested_files_test.sql')
761
st.upload_file(
762
TEST_DIR / 'test.sql',
763
'files_test_1/nest_1/nested_files_test_2.sql',
764
)
765
766
# remove
767
with self.assertRaises(IsADirectoryError):
768
st.remove('files_test_1/')
769
770
before = st.listdir('/')
771
st.remove('files_test.sql')
772
after = st.listdir('/')
773
assert 'files_test.sql' in before
774
assert 'files_test.sql' not in after
775
assert list(sorted(before)) == list(sorted(after + ['files_test.sql']))
776
777
before = st.listdir('files_test_1/nest_1/')
778
st.remove('files_test_1/nest_1/nested_files_test.sql')
779
after = st.listdir('files_test_1/nest_1/')
780
assert 'nested_files_test.sql' in before
781
assert 'nested_files_test.sql' not in after
782
assert st.is_dir('files_test_1/nest_1/')
783
784
# Removing the last file does not remove empty directories
785
st.remove('files_test_1/nest_1/nested_files_test_2.sql')
786
assert not st.is_file('files_test_1/nest_1/nested_files_test_2.sql')
787
assert st.is_dir('files_test_1/nest_1/')
788
assert st.is_dir('files_test_1/')
789
790
st.removedirs('files_test_1')
791
assert not st.is_dir('files_test_1/nest_1/')
792
assert not st.is_dir('files_test_1/')
793
794
def test_os_rename(self):
795
st = self.wg.stage
796
797
st.upload_file(TEST_DIR / 'test.sql', 'rename_test.sql')
798
799
with self.assertRaises(s2.ManagementError):
800
st.upload_file(
801
TEST_DIR / 'test.sql',
802
'rename_test_1/nest_1/nested_rename_test.sql',
803
)
804
805
st.mkdir('rename_test_1')
806
st.mkdir('rename_test_1/nest_1')
807
808
assert st.exists('/rename_test_1/nest_1/')
809
810
st.upload_file(
811
TEST_DIR / 'test.sql',
812
'rename_test_1/nest_1/nested_rename_test.sql',
813
)
814
815
st.upload_file(
816
TEST_DIR / 'test.sql',
817
'rename_test_1/nest_1/nested_rename_test_2.sql',
818
)
819
820
# rename file
821
assert 'rename_test.sql' in st.listdir('/')
822
assert 'rename_test_2.sql' not in st.listdir('/')
823
st.rename('rename_test.sql', 'rename_test_2.sql')
824
assert 'rename_test.sql' not in st.listdir('/')
825
assert 'rename_test_2.sql' in st.listdir('/')
826
827
# rename directory
828
assert 'rename_test_1/' in st.listdir('/')
829
assert 'rename_test_2/' not in st.listdir('/')
830
st.rename('rename_test_1/', 'rename_test_2/')
831
assert 'rename_test_1/' not in st.listdir('/')
832
assert 'rename_test_2/' in st.listdir('/')
833
assert st.is_file('rename_test_2/nest_1/nested_rename_test.sql')
834
assert st.is_file('rename_test_2/nest_1/nested_rename_test_2.sql')
835
836
# rename nested
837
assert 'rename_test_2/nest_1/nested_rename_test.sql' in st.listdir(
838
'/', recursive=True,
839
)
840
assert 'rename_test_2/nest_1/nested_rename_test_3.sql' not in st.listdir(
841
'/', recursive=True,
842
)
843
st.rename(
844
'rename_test_2/nest_1/nested_rename_test.sql',
845
'rename_test_2/nest_1/nested_rename_test_3.sql',
846
)
847
assert 'rename_test_2/nest_1/nested_rename_test.sql' not in st.listdir(
848
'/', recursive=True,
849
)
850
assert 'rename_test_2/nest_1/nested_rename_test_3.sql' in st.listdir(
851
'/', recursive=True,
852
)
853
assert not st.is_file('rename_test_2/nest_1/nested_rename_test.sql')
854
assert st.is_file('rename_test_2/nest_1/nested_rename_test_2.sql')
855
assert st.is_file('rename_test_2/nest_1/nested_rename_test_3.sql')
856
857
# non-existent file
858
with self.assertRaises(OSError):
859
st.rename('rename_foo.sql', 'rename_foo_2.sql')
860
861
# overwrite
862
with self.assertRaises(OSError):
863
st.rename(
864
'rename_test_2.sql',
865
'rename_test_2/nest_1/nested_rename_test_3.sql',
866
)
867
868
st.rename(
869
'rename_test_2.sql',
870
'rename_test_2/nest_1/nested_rename_test_3.sql', overwrite=True,
871
)
872
873
def test_file_object(self):
874
st = self.wg.stage
875
876
st.mkdir('obj_test')
877
st.mkdir('obj_test/nest_1')
878
879
f1 = st.upload_file(TEST_DIR / 'test.sql', 'obj_test.sql')
880
f2 = st.upload_file(TEST_DIR / 'test.sql', 'obj_test/nest_1/obj_test.sql')
881
d2 = st.info('obj_test/nest_1/')
882
883
# is_file / is_dir
884
assert not f1.is_dir()
885
assert f1.is_file()
886
assert not f2.is_dir()
887
assert f2.is_file()
888
assert d2.is_dir()
889
assert not d2.is_file()
890
891
# abspath / basename / dirname / exists
892
assert f1.abspath() == 'obj_test.sql'
893
assert f1.basename() == 'obj_test.sql'
894
assert f1.dirname() == '/'
895
assert f1.exists()
896
assert f2.abspath() == 'obj_test/nest_1/obj_test.sql'
897
assert f2.basename() == 'obj_test.sql'
898
assert f2.dirname() == 'obj_test/nest_1/'
899
assert f2.exists()
900
assert d2.abspath() == 'obj_test/nest_1/'
901
assert d2.basename() == 'nest_1'
902
assert d2.dirname() == 'obj_test/'
903
assert d2.exists()
904
905
# download
906
assert f1.download(encoding='utf-8') == open(TEST_DIR / 'test.sql', 'r').read()
907
assert f1.download() == open(TEST_DIR / 'test.sql', 'rb').read()
908
909
# remove
910
with self.assertRaises(IsADirectoryError):
911
d2.remove()
912
913
assert st.is_file('obj_test.sql')
914
f1.remove()
915
assert not st.is_file('obj_test.sql')
916
917
# removedirs
918
with self.assertRaises(NotADirectoryError):
919
f2.removedirs()
920
921
assert st.exists(d2.path)
922
d2.removedirs()
923
assert not st.exists(d2.path)
924
925
# rmdir
926
f1 = st.upload_file(TEST_DIR / 'test.sql', 'obj_test.sql')
927
d2 = st.mkdir('obj_test/nest_1')
928
929
assert st.exists(f1.path)
930
assert st.exists(d2.path)
931
932
with self.assertRaises(NotADirectoryError):
933
f1.rmdir()
934
935
assert st.exists(f1.path)
936
assert st.exists(d2.path)
937
938
d2.rmdir()
939
940
assert not st.exists('obj_test/nest_1/')
941
assert not st.exists('obj_test')
942
943
# mtime / ctime
944
assert f1.getmtime() > 0
945
assert f1.getctime() > 0
946
947
# rename
948
assert st.exists('obj_test.sql')
949
assert not st.exists('obj_test_2.sql')
950
f1.rename('obj_test_2.sql')
951
assert not st.exists('obj_test.sql')
952
assert st.exists('obj_test_2.sql')
953
assert f1.abspath() == 'obj_test_2.sql'
954
955
956
@pytest.mark.management
957
class TestSecrets(unittest.TestCase):
958
959
manager = None
960
wg = None
961
password = None
962
963
@classmethod
964
def setUpClass(cls):
965
cls.manager = s2.manage_workspaces()
966
967
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
968
cls.password = secrets.token_urlsafe(20) + '-x&$'
969
970
name = clean_name(secrets.token_urlsafe(20)[:20])
971
972
cls.wg = cls.manager.create_workspace_group(
973
f'wg-test-{name}',
974
region=random.choice(us_regions).id,
975
admin_password=cls.password,
976
firewall_ranges=['0.0.0.0/0'],
977
)
978
979
@classmethod
980
def tearDownClass(cls):
981
if cls.wg is not None:
982
cls.wg.terminate(force=True)
983
cls.wg = None
984
cls.manager = None
985
cls.password = None
986
987
def test_get_secret(self):
988
# manually create secret and then get secret
989
# try to delete the secret if it exists
990
try:
991
secret = self.manager.organizations.current.get_secret('secret_name')
992
993
secret_id = secret.id
994
995
self.manager._delete(f'secrets/{secret_id}')
996
except s2.ManagementError:
997
pass
998
999
self.manager._post(
1000
'secrets',
1001
json=dict(
1002
name='secret_name',
1003
value='secret_value',
1004
),
1005
)
1006
1007
secret = self.manager.organizations.current.get_secret('secret_name')
1008
1009
assert secret.name == 'secret_name'
1010
assert secret.value == 'secret_value'
1011
1012
1013
@pytest.mark.management
1014
class TestJob(unittest.TestCase):
1015
1016
manager = None
1017
workspace_group = None
1018
workspace = None
1019
password = None
1020
job_ids = []
1021
1022
@classmethod
1023
def setUpClass(cls):
1024
cls.manager = s2.manage_workspaces()
1025
1026
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
1027
cls.password = secrets.token_urlsafe(20) + '-x&$'
1028
1029
name = clean_name(secrets.token_urlsafe(20)[:20])
1030
1031
cls.workspace_group = cls.manager.create_workspace_group(
1032
f'wg-test-{name}',
1033
region=random.choice(us_regions).id,
1034
admin_password=cls.password,
1035
firewall_ranges=['0.0.0.0/0'],
1036
)
1037
1038
try:
1039
cls.workspace = cls.workspace_group.create_workspace(
1040
f'ws-test-{name}-x',
1041
wait_on_active=True,
1042
)
1043
except Exception:
1044
cls.workspace_group.terminate(force=True)
1045
raise
1046
1047
@classmethod
1048
def tearDownClass(cls):
1049
for job_id in cls.job_ids:
1050
try:
1051
cls.manager.organizations.current.jobs.delete(job_id)
1052
except Exception:
1053
pass
1054
if cls.workspace_group is not None:
1055
cls.workspace_group.terminate(force=True)
1056
cls.workspace_group = None
1057
cls.workspace = None
1058
cls.manager = None
1059
cls.password = None
1060
if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:
1061
del os.environ['SINGLESTOREDB_WORKSPACE']
1062
if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:
1063
del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']
1064
1065
def test_job_without_database_target(self):
1066
"""
1067
Creates job without target database on a specific runtime
1068
Waits for job to finish
1069
Gets the job
1070
Deletes the job
1071
"""
1072
if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:
1073
del os.environ['SINGLESTOREDB_WORKSPACE']
1074
if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:
1075
del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']
1076
1077
job_manager = self.manager.organizations.current.jobs
1078
job = job_manager.run(
1079
'Scheduling Test.ipynb',
1080
'notebooks-cpu-small',
1081
{'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
1082
)
1083
self.job_ids.append(job.job_id)
1084
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1085
assert job.schedule.mode == job_manager.modes().ONCE
1086
assert not job.execution_config.create_snapshot
1087
assert job.completed_executions_count == 0
1088
assert job.name is None
1089
assert job.description is None
1090
assert job.job_metadata == []
1091
assert job.terminated_at is None
1092
assert job.target_config is None
1093
job.wait()
1094
job = job_manager.get(job.job_id)
1095
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1096
assert job.schedule.mode == job_manager.modes().ONCE
1097
assert not job.execution_config.create_snapshot
1098
assert job.completed_executions_count == 1
1099
assert job.name is None
1100
assert job.description is None
1101
assert job.job_metadata != []
1102
assert len(job.job_metadata) == 1
1103
assert job.job_metadata[0].count == 1
1104
assert job.job_metadata[0].status == Status.COMPLETED
1105
assert job.terminated_at is None
1106
assert job.target_config is None
1107
deleted = job.delete()
1108
assert deleted
1109
job = job_manager.get(job.job_id)
1110
assert job.terminated_at is not None
1111
1112
def test_job_with_database_target(self):
1113
"""
1114
Creates job with target database on a specific runtime
1115
Waits for job to finish
1116
Gets the job
1117
Deletes the job
1118
"""
1119
os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'
1120
os.environ['SINGLESTOREDB_WORKSPACE'] = self.workspace.id
1121
1122
job_manager = self.manager.organizations.current.jobs
1123
job = job_manager.run(
1124
'Scheduling Test.ipynb',
1125
'notebooks-cpu-small',
1126
{'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
1127
)
1128
self.job_ids.append(job.job_id)
1129
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1130
assert job.schedule.mode == job_manager.modes().ONCE
1131
assert not job.execution_config.create_snapshot
1132
assert job.completed_executions_count == 0
1133
assert job.name is None
1134
assert job.description is None
1135
assert job.job_metadata == []
1136
assert job.terminated_at is None
1137
assert job.target_config is not None
1138
assert job.target_config.database_name == 'information_schema'
1139
assert job.target_config.target_id == self.workspace.id
1140
assert job.target_config.target_type == TargetType.WORKSPACE
1141
assert not job.target_config.resume_target
1142
job.wait()
1143
job = job_manager.get(job.job_id)
1144
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1145
assert job.schedule.mode == job_manager.modes().ONCE
1146
assert not job.execution_config.create_snapshot
1147
assert job.completed_executions_count == 1
1148
assert job.name is None
1149
assert job.description is None
1150
assert job.job_metadata != []
1151
assert len(job.job_metadata) == 1
1152
assert job.job_metadata[0].count == 1
1153
assert job.job_metadata[0].status == Status.COMPLETED
1154
assert job.terminated_at is None
1155
assert job.target_config is not None
1156
assert job.target_config.database_name == 'information_schema'
1157
assert job.target_config.target_id == self.workspace.id
1158
assert job.target_config.target_type == TargetType.WORKSPACE
1159
assert not job.target_config.resume_target
1160
deleted = job.delete()
1161
assert deleted
1162
job = job_manager.get(job.job_id)
1163
assert job.terminated_at is not None
1164
1165
1166
@pytest.mark.management
1167
class TestFileSpaces(unittest.TestCase):
1168
1169
manager = None
1170
personal_space = None
1171
shared_space = None
1172
1173
@classmethod
1174
def setUpClass(cls):
1175
cls.manager = s2.manage_files()
1176
cls.personal_space = cls.manager.personal_space
1177
cls.shared_space = cls.manager.shared_space
1178
1179
@classmethod
1180
def tearDownClass(cls):
1181
cls.manager = None
1182
cls.personal_space = None
1183
cls.shared_space = None
1184
1185
def test_upload_file(self):
1186
upload_test_ipynb = f'upload_test_{id(self)}.ipynb'
1187
1188
for space in [self.personal_space, self.shared_space]:
1189
root = space.info('/')
1190
assert str(root.path) == '/'
1191
assert root.type == 'directory'
1192
1193
# Upload files
1194
f = space.upload_file(
1195
TEST_DIR / 'test.ipynb',
1196
upload_test_ipynb,
1197
)
1198
assert str(f.path) == upload_test_ipynb
1199
assert f.type == 'notebook'
1200
1201
# Download and compare to original
1202
txt = f.download(encoding='utf-8')
1203
assert txt == open(TEST_DIR / 'test.ipynb').read()
1204
1205
# Make sure we can't overwrite
1206
with self.assertRaises(OSError):
1207
space.upload_file(
1208
TEST_DIR / 'test.ipynb',
1209
upload_test_ipynb,
1210
)
1211
1212
# Force overwrite with new content
1213
f = space.upload_file(
1214
TEST_DIR / 'test2.ipynb',
1215
upload_test_ipynb, overwrite=True,
1216
)
1217
assert str(f.path) == upload_test_ipynb
1218
assert f.type == 'notebook'
1219
1220
# Verify new content
1221
txt = f.download(encoding='utf-8')
1222
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1223
1224
# Make sure we can't upload a folder
1225
with self.assertRaises(s2.ManagementError):
1226
space.upload_folder(TEST_DIR, 'test')
1227
1228
# Cleanup
1229
space.remove(upload_test_ipynb)
1230
1231
def test_upload_file_io(self):
1232
upload_test_ipynb = f'upload_test_{id(self)}.ipynb'
1233
1234
for space in [self.personal_space, self.shared_space]:
1235
root = space.info('/')
1236
assert str(root.path) == '/'
1237
assert root.type == 'directory'
1238
1239
# Upload files
1240
f = space.upload_file(
1241
open(TEST_DIR / 'test.ipynb', 'r'),
1242
upload_test_ipynb,
1243
)
1244
assert str(f.path) == upload_test_ipynb
1245
assert f.type == 'notebook'
1246
1247
# Download and compare to original
1248
txt = f.download(encoding='utf-8')
1249
assert txt == open(TEST_DIR / 'test.ipynb').read()
1250
1251
# Make sure we can't overwrite
1252
with self.assertRaises(OSError):
1253
space.upload_file(
1254
open(TEST_DIR / 'test.ipynb', 'r'),
1255
upload_test_ipynb,
1256
)
1257
1258
# Force overwrite with new content
1259
f = space.upload_file(
1260
open(TEST_DIR / 'test2.ipynb', 'r'),
1261
upload_test_ipynb, overwrite=True,
1262
)
1263
assert str(f.path) == upload_test_ipynb
1264
assert f.type == 'notebook'
1265
1266
# Verify new content
1267
txt = f.download(encoding='utf-8')
1268
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1269
1270
# Make sure we can't upload a folder
1271
with self.assertRaises(s2.ManagementError):
1272
space.upload_folder(TEST_DIR, 'test')
1273
1274
# Cleanup
1275
space.remove(upload_test_ipynb)
1276
1277
def test_open(self):
1278
for space in [self.personal_space, self.shared_space]:
1279
open_test_ipynb = f'open_test_ipynb_{id(self)}.ipynb'
1280
1281
# See if error is raised for non-existent file
1282
with self.assertRaises(s2.ManagementError):
1283
space.open(open_test_ipynb, 'r')
1284
1285
# Load test file
1286
space.upload_file(TEST_DIR / 'test.ipynb', open_test_ipynb)
1287
1288
# Read file using `open`
1289
with space.open(open_test_ipynb, 'r') as rfile:
1290
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1291
1292
# Read file using `open` with 'rt' mode
1293
with space.open(open_test_ipynb, 'rt') as rfile:
1294
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1295
1296
# Read file using `open` with 'rb' mode
1297
with space.open(open_test_ipynb, 'rb') as rfile:
1298
assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()
1299
1300
# Read file using `open` with 'rb' mode
1301
with self.assertRaises(ValueError):
1302
with space.open(open_test_ipynb, 'b') as rfile:
1303
pass
1304
1305
# Attempt overwrite file using `open` with mode 'x'
1306
with self.assertRaises(OSError):
1307
with space.open(open_test_ipynb, 'x') as wfile:
1308
pass
1309
1310
# Attempt overwrite file using `open` with mode 'w'
1311
with space.open(open_test_ipynb, 'w') as wfile:
1312
wfile.write(open(TEST_DIR / 'test2.ipynb').read())
1313
1314
txt = space.download_file(open_test_ipynb, encoding='utf-8')
1315
1316
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1317
1318
open_raw_test_ipynb = f'open_raw_test_{id(self)}.ipynb'
1319
1320
# Test writer without context manager
1321
wfile = space.open(open_raw_test_ipynb, 'w')
1322
for line in open(TEST_DIR / 'test.ipynb'):
1323
wfile.write(line)
1324
wfile.close()
1325
1326
txt = space.download_file(
1327
open_raw_test_ipynb,
1328
encoding='utf-8',
1329
)
1330
1331
assert txt == open(TEST_DIR / 'test.ipynb').read()
1332
1333
# Test reader without context manager
1334
rfile = space.open(open_raw_test_ipynb, 'r')
1335
txt = ''
1336
for line in rfile:
1337
txt += line
1338
rfile.close()
1339
1340
assert txt == open(TEST_DIR / 'test.ipynb').read()
1341
1342
# Cleanup
1343
space.remove(open_test_ipynb)
1344
space.remove(open_raw_test_ipynb)
1345
1346
def test_obj_open(self):
1347
for space in [self.personal_space, self.shared_space]:
1348
obj_open_test_ipynb = f'obj_open_test_{id(self)}.ipynb'
1349
obj_open_dir = f'obj_open_dir_{id(self)}'
1350
1351
# Load test file
1352
f = space.upload_file(
1353
TEST_DIR / 'test.ipynb',
1354
obj_open_test_ipynb,
1355
)
1356
1357
# Read file using `open`
1358
with f.open() as rfile:
1359
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1360
1361
# Make sure directories error out
1362
with self.assertRaises(s2.ManagementError):
1363
space.mkdir(obj_open_dir)
1364
1365
# Write file using `open`
1366
with f.open('w', encoding='utf-8') as wfile:
1367
wfile.write(open(TEST_DIR / 'test2.ipynb').read())
1368
1369
assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.ipynb').read()
1370
1371
# Test writer without context manager
1372
wfile = f.open('w')
1373
for line in open(TEST_DIR / 'test.ipynb'):
1374
wfile.write(line)
1375
wfile.close()
1376
1377
txt = space.download_file(f.path, encoding='utf-8')
1378
1379
assert txt == open(TEST_DIR / 'test.ipynb').read()
1380
1381
# Test reader without context manager
1382
rfile = f.open('r')
1383
txt = ''
1384
for line in rfile:
1385
txt += line
1386
rfile.close()
1387
1388
assert txt == open(TEST_DIR / 'test.ipynb').read()
1389
1390
# Cleanup
1391
space.remove(obj_open_test_ipynb)
1392
1393
def test_os_directories(self):
1394
for space in [self.personal_space, self.shared_space]:
1395
# Make sure directories error out
1396
with self.assertRaises(s2.ManagementError):
1397
space.mkdir('mkdir_test_1')
1398
1399
with self.assertRaises(s2.ManagementError):
1400
space.exists('mkdir_test_1/')
1401
1402
out = space.listdir('/')
1403
assert 'mkdir_test_1/' not in out
1404
1405
with self.assertRaises(s2.ManagementError):
1406
space.rmdir('mkdir_test_1/')
1407
1408
def test_os_rename(self):
1409
for space in [self.personal_space, self.shared_space]:
1410
space.upload_file(
1411
TEST_DIR / 'test.ipynb',
1412
'rename_test.ipynb',
1413
)
1414
assert 'rename_test.ipynb' in space.listdir('/')
1415
assert 'rename_test_2.ipynb' not in space.listdir('/')
1416
1417
space.rename(
1418
'rename_test.ipynb',
1419
'rename_test_2.ipynb',
1420
)
1421
assert 'rename_test.ipynb' not in space.listdir('/')
1422
assert 'rename_test_2.ipynb' in space.listdir('/')
1423
1424
# non-existent file
1425
with self.assertRaises(OSError):
1426
space.rename('rename_foo.ipynb', 'rename_foo_2.ipynb')
1427
1428
space.upload_file(
1429
TEST_DIR / 'test.ipynb',
1430
'rename_test_3.ipynb',
1431
)
1432
1433
# overwrite
1434
with self.assertRaises(OSError):
1435
space.rename(
1436
'rename_test_2.ipynb',
1437
'rename_test_3.ipynb',
1438
)
1439
1440
space.rename(
1441
'rename_test_2.ipynb',
1442
'rename_test_3.ipynb', overwrite=True,
1443
)
1444
1445
# Cleanup
1446
space.remove('rename_test_3.ipynb')
1447
1448
def test_file_object(self):
1449
for space in [self.personal_space, self.shared_space]:
1450
f = space.upload_file(
1451
TEST_DIR / 'test.ipynb',
1452
'obj_test.ipynb',
1453
)
1454
1455
assert not f.is_dir()
1456
assert f.is_file()
1457
1458
# abspath / basename / dirname / exists
1459
assert f.abspath() == 'obj_test.ipynb'
1460
assert f.basename() == 'obj_test.ipynb'
1461
assert f.dirname() == '/'
1462
assert f.exists()
1463
1464
# download
1465
assert f.download(encoding='utf-8') == \
1466
open(TEST_DIR / 'test.ipynb', 'r').read()
1467
assert f.download() == open(TEST_DIR / 'test.ipynb', 'rb').read()
1468
1469
assert space.is_file('obj_test.ipynb')
1470
f.remove()
1471
assert not space.is_file('obj_test.ipynb')
1472
1473
# mtime / ctime
1474
assert f.getmtime() > 0
1475
assert f.getctime() > 0
1476
1477
# rename
1478
f = space.upload_file(
1479
TEST_DIR / 'test.ipynb',
1480
'obj_test.ipynb',
1481
)
1482
assert space.exists('obj_test.ipynb')
1483
assert not space.exists('obj_test_2.ipynb')
1484
f.rename('obj_test_2.ipynb')
1485
assert not space.exists('obj_test.ipynb')
1486
assert space.exists('obj_test_2.ipynb')
1487
assert f.abspath() == 'obj_test_2.ipynb'
1488
1489
# Cleanup
1490
space.remove('obj_test_2.ipynb')
1491
1492
1493
@pytest.mark.management
1494
class TestRegions(unittest.TestCase):
1495
"""Test cases for region management."""
1496
1497
manager = None
1498
1499
@classmethod
1500
def setUpClass(cls):
1501
"""Set up the test environment."""
1502
cls.manager = s2.manage_regions()
1503
1504
@classmethod
1505
def tearDownClass(cls):
1506
"""Clean up the test environment."""
1507
cls.manager = None
1508
1509
def test_list_regions(self):
1510
"""Test listing all regions."""
1511
regions = self.manager.list_regions()
1512
1513
# Verify we get a NamedList
1514
assert isinstance(regions, NamedList)
1515
1516
# Verify we have at least one region
1517
assert len(regions) > 0
1518
1519
# Verify region properties
1520
region = regions[0]
1521
assert isinstance(region, Region)
1522
assert hasattr(region, 'id')
1523
assert hasattr(region, 'name')
1524
assert hasattr(region, 'provider')
1525
1526
# Verify provider values
1527
providers = {x.provider for x in regions}
1528
assert 'Azure' in providers or 'GCP' in providers or 'AWS' in providers
1529
1530
def test_list_shared_tier_regions(self):
1531
"""Test listing shared tier regions."""
1532
regions = self.manager.list_shared_tier_regions()
1533
1534
# Verify we get a NamedList
1535
assert isinstance(regions, NamedList)
1536
1537
# Verify region properties if we have any shared tier regions
1538
if regions:
1539
region = regions[0]
1540
assert isinstance(region, Region)
1541
assert hasattr(region, 'name')
1542
assert hasattr(region, 'provider')
1543
assert hasattr(region, 'region_name')
1544
1545
# Verify provider values
1546
providers = {x.provider for x in regions}
1547
assert any(p in providers for p in ['Azure', 'GCP', 'AWS'])
1548
1549
def test_str_repr(self):
1550
"""Test string representation of regions."""
1551
regions = self.manager.list_regions()
1552
if not regions:
1553
self.skipTest('No regions available for testing')
1554
1555
region = regions[0]
1556
1557
# Test __str__
1558
s = str(region)
1559
assert region.id in s
1560
assert region.name in s
1561
assert region.provider in s
1562
1563
# Test __repr__
1564
assert repr(region) == str(region)
1565
1566