Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/tests/test_management.py
801 views
1
#!/usr/bin/env python
2
# type: ignore
3
"""SingleStoreDB Management API testing."""
4
import os
5
import pathlib
6
import random
7
import re
8
import secrets
9
import unittest
10
11
import pytest
12
13
import singlestoredb as s2
14
from singlestoredb.management.job import Status
15
from singlestoredb.management.job import TargetType
16
from singlestoredb.management.region import Region
17
from singlestoredb.management.utils import NamedList
18
19
20
TEST_DIR = pathlib.Path(os.path.dirname(__file__))
21
22
23
def clean_name(s):
24
"""Change all non-word characters to -."""
25
return re.sub(r'[^\w]', r'-', s).replace('_', '-').lower()
26
27
28
def shared_database_name(s):
29
"""Return a shared database name. Cannot contain special characters except -"""
30
return re.sub(r'[^\w]', '', s).replace('-', '_').lower()
31
32
33
@pytest.mark.skip(reason='Legacy cluster Management API is going away')
34
@pytest.mark.management
35
class TestCluster(unittest.TestCase):
36
37
manager = None
38
cluster = None
39
password = None
40
41
@classmethod
42
def setUpClass(cls):
43
cls.manager = s2.manage_cluster()
44
45
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
46
cls.password = secrets.token_urlsafe(20) + '-x&$'
47
48
cls.cluster = cls.manager.create_cluster(
49
clean_name('cm-test-{}'.format(secrets.token_urlsafe(20)[:20])),
50
region=random.choice(us_regions).id,
51
admin_password=cls.password,
52
firewall_ranges=['0.0.0.0/0'],
53
expires_at='1h',
54
size='S-00',
55
wait_on_active=True,
56
)
57
58
@classmethod
59
def tearDownClass(cls):
60
if cls.cluster is not None:
61
cls.cluster.terminate()
62
cls.cluster = None
63
cls.manager = None
64
cls.password = None
65
66
def test_str(self):
67
assert self.cluster.name in str(self.cluster.name)
68
69
def test_repr(self):
70
assert repr(self.cluster) == str(self.cluster)
71
72
def test_region_str(self):
73
s = str(self.cluster.region)
74
assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s
75
76
def test_region_repr(self):
77
assert repr(self.cluster.region) == str(self.cluster.region)
78
79
def test_regions(self):
80
out = self.manager.regions
81
providers = {x.provider for x in out}
82
names = [x.name for x in out]
83
assert 'Azure' in providers, providers
84
assert 'GCP' in providers, providers
85
assert 'AWS' in providers, providers
86
87
objs = {}
88
ids = []
89
for item in out:
90
ids.append(item.id)
91
objs[item.id] = item
92
if item.name not in objs:
93
objs[item.name] = item
94
95
name = random.choice(names)
96
assert out[name] == objs[name]
97
id = random.choice(ids)
98
assert out[id] == objs[id]
99
100
def test_clusters(self):
101
clusters = self.manager.clusters
102
ids = [x.id for x in clusters]
103
assert self.cluster.id in ids, ids
104
105
def test_get_cluster(self):
106
clus = self.manager.get_cluster(self.cluster.id)
107
assert clus.id == self.cluster.id, clus.id
108
109
with self.assertRaises(s2.ManagementError) as cm:
110
clus = self.manager.get_cluster('bad id')
111
112
assert 'UUID' in cm.exception.msg, cm.exception.msg
113
114
def test_update(self):
115
assert self.cluster.name.startswith('cm-test-')
116
117
name = self.cluster.name.replace('cm-test-', 'cm-foo-')
118
self.cluster.update(name=name)
119
120
clus = self.manager.get_cluster(self.cluster.id)
121
assert clus.name == name, clus.name
122
123
def test_suspend_resume(self):
124
trues = ['1', 'on', 'true']
125
do_test = os.environ.get('SINGLESTOREDB_TEST_SUSPEND', '0').lower() in trues
126
127
if not do_test:
128
self.skipTest(
129
'Suspend / resume tests skipped by default due to '
130
'being time consuming; set SINGLESTOREDB_TEST_SUSPEND=1 '
131
'to enable',
132
)
133
134
assert self.cluster.state != 'Suspended', self.cluster.state
135
136
self.cluster.suspend(wait_on_suspended=True)
137
assert self.cluster.state == 'Suspended', self.cluster.state
138
139
self.cluster.resume(wait_on_resumed=True)
140
assert self.cluster.state == 'Active', self.cluster.state
141
142
def test_no_manager(self):
143
clus = self.manager.get_cluster(self.cluster.id)
144
clus._manager = None
145
146
with self.assertRaises(s2.ManagementError) as cm:
147
clus.refresh()
148
149
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
150
151
with self.assertRaises(s2.ManagementError) as cm:
152
clus.update()
153
154
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
155
156
with self.assertRaises(s2.ManagementError) as cm:
157
clus.suspend()
158
159
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
160
161
with self.assertRaises(s2.ManagementError) as cm:
162
clus.resume()
163
164
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
165
166
with self.assertRaises(s2.ManagementError) as cm:
167
clus.terminate()
168
169
assert 'No cluster manager' in cm.exception.msg, cm.exception.msg
170
171
def test_connect(self):
172
trues = ['1', 'on', 'true']
173
pure_python = os.environ.get('SINGLESTOREDB_PURE_PYTHON', '0').lower() in trues
174
175
self.skipTest('Connection test is disable due to flakey server')
176
177
if pure_python:
178
self.skipTest('Connections through managed service are disabled')
179
180
try:
181
with self.cluster.connect(user='admin', password=self.password) as conn:
182
with conn.cursor() as cur:
183
cur.execute('show databases')
184
assert 'cluster' in [x[0] for x in list(cur)]
185
except s2.ManagementError as exc:
186
if 'endpoint has not been set' not in str(exc):
187
self.skipTest('No endpoint in response. Skipping connection test.')
188
189
# Test missing endpoint
190
clus = self.manager.get_cluster(self.cluster.id)
191
clus.endpoint = None
192
193
with self.assertRaises(s2.ManagementError) as cm:
194
clus.connect(user='admin', password=self.password)
195
196
assert 'endpoint' in cm.exception.msg, cm.exception.msg
197
198
199
@pytest.mark.management
200
class TestWorkspace(unittest.TestCase):
201
202
manager = None
203
workspace_group = None
204
workspace = None
205
password = None
206
207
@classmethod
208
def setUpClass(cls):
209
cls.manager = s2.manage_workspaces()
210
211
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
212
cls.password = secrets.token_urlsafe(20) + '-x&$'
213
214
name = clean_name(secrets.token_urlsafe(20)[:20])
215
216
cls.workspace_group = cls.manager.create_workspace_group(
217
f'wg-test-{name}',
218
region=random.choice(us_regions).id,
219
admin_password=cls.password,
220
firewall_ranges=['0.0.0.0/0'],
221
)
222
223
try:
224
cls.workspace = cls.workspace_group.create_workspace(
225
f'ws-test-{name}-x',
226
wait_on_active=True,
227
)
228
except Exception:
229
cls.workspace_group.terminate(force=True)
230
raise
231
232
@classmethod
233
def tearDownClass(cls):
234
if cls.workspace_group is not None:
235
cls.workspace_group.terminate(force=True)
236
cls.workspace_group = None
237
cls.workspace = None
238
cls.manager = None
239
cls.password = None
240
241
def test_str(self):
242
assert self.workspace.name in str(self.workspace.name)
243
assert self.workspace_group.name in str(self.workspace_group.name)
244
245
def test_repr(self):
246
assert repr(self.workspace) == str(self.workspace)
247
assert repr(self.workspace_group) == str(self.workspace_group)
248
249
def test_region_str(self):
250
s = str(self.workspace_group.region)
251
assert 'Azure' in s or 'GCP' in s or 'AWS' in s, s
252
253
def test_region_repr(self):
254
assert repr(self.workspace_group.region) == str(self.workspace_group.region)
255
256
def test_regions(self):
257
out = self.manager.regions
258
providers = {x.provider for x in out}
259
names = [x.name for x in out]
260
assert 'Azure' in providers, providers
261
assert 'GCP' in providers, providers
262
assert 'AWS' in providers, providers
263
264
objs = {}
265
ids = []
266
for item in out:
267
ids.append(item.id)
268
objs[item.id] = item
269
if item.name not in objs:
270
objs[item.name] = item
271
272
name = random.choice(names)
273
assert out[name] == objs[name]
274
id = random.choice(ids)
275
assert out[id] == objs[id]
276
277
def test_workspace_groups(self):
278
workspace_groups = self.manager.workspace_groups
279
ids = [x.id for x in workspace_groups]
280
names = [x.name for x in workspace_groups]
281
assert self.workspace_group.id in ids
282
assert self.workspace_group.name in names
283
284
assert workspace_groups.ids() == ids
285
assert workspace_groups.names() == names
286
287
objs = {}
288
for item in workspace_groups:
289
objs[item.id] = item
290
objs[item.name] = item
291
292
name = random.choice(names)
293
assert workspace_groups[name] == objs[name]
294
id = random.choice(ids)
295
assert workspace_groups[id] == objs[id]
296
297
def test_workspaces(self):
298
spaces = self.workspace_group.workspaces
299
ids = [x.id for x in spaces]
300
names = [x.name for x in spaces]
301
assert self.workspace.id in ids
302
assert self.workspace.name in names
303
304
assert spaces.ids() == ids
305
assert spaces.names() == names
306
307
objs = {}
308
for item in spaces:
309
objs[item.id] = item
310
objs[item.name] = item
311
312
name = random.choice(names)
313
assert spaces[name] == objs[name]
314
id = random.choice(ids)
315
assert spaces[id] == objs[id]
316
317
def test_get_workspace_group(self):
318
group = self.manager.get_workspace_group(self.workspace_group.id)
319
assert group.id == self.workspace_group.id, group.id
320
321
with self.assertRaises(s2.ManagementError) as cm:
322
group = self.manager.get_workspace_group('bad id')
323
324
assert 'UUID' in cm.exception.msg, cm.exception.msg
325
326
def test_get_workspace(self):
327
space = self.manager.get_workspace(self.workspace.id)
328
assert space.id == self.workspace.id, space.id
329
330
with self.assertRaises(s2.ManagementError) as cm:
331
space = self.manager.get_workspace('bad id')
332
333
assert 'UUID' in cm.exception.msg, cm.exception.msg
334
335
def test_update(self):
336
assert self.workspace_group.name.startswith('wg-test-')
337
338
name = self.workspace_group.name.replace('wg-test-', 'wg-foo-')
339
self.workspace_group.update(name=name)
340
341
group = self.manager.get_workspace_group(self.workspace_group.id)
342
assert group.name == name, group.name
343
344
def test_no_manager(self):
345
space = self.manager.get_workspace(self.workspace.id)
346
space._manager = None
347
348
with self.assertRaises(s2.ManagementError) as cm:
349
space.refresh()
350
351
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
352
353
with self.assertRaises(s2.ManagementError) as cm:
354
space.terminate()
355
356
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
357
358
def test_connect(self):
359
with self.workspace.connect(user='admin', password=self.password) as conn:
360
with conn.cursor() as cur:
361
cur.execute('show databases')
362
assert 'cluster' in [x[0] for x in list(cur)]
363
364
# Test missing endpoint
365
space = self.manager.get_workspace(self.workspace.id)
366
space.endpoint = None
367
368
with self.assertRaises(s2.ManagementError) as cm:
369
space.connect(user='admin', password=self.password)
370
371
assert 'endpoint' in cm.exception.msg, cm.exception.msg
372
373
374
@pytest.mark.management
375
class TestStarterWorkspace(unittest.TestCase):
376
377
manager = None
378
starter_workspace = None
379
380
@classmethod
381
def setUpClass(cls):
382
cls.manager = s2.manage_workspaces()
383
384
shared_tier_regions: NamedList[Region] = [
385
x for x in cls.manager.shared_tier_regions if 'US' in x.name
386
]
387
cls.starter_username = 'starter_user'
388
cls.password = secrets.token_urlsafe(20)
389
390
name = shared_database_name(secrets.token_urlsafe(20)[:20])
391
392
cls.database_name = f'starter_db_{name}'
393
394
shared_tier_region: Region = random.choice(shared_tier_regions)
395
396
if not shared_tier_region:
397
raise ValueError('No shared tier regions found')
398
399
cls.starter_workspace = cls.manager.create_starter_workspace(
400
f'starter-ws-test-{name}',
401
database_name=cls.database_name,
402
provider=shared_tier_region.provider,
403
region_name=shared_tier_region.region_name,
404
)
405
406
cls.starter_workspace.create_user(
407
username=cls.starter_username,
408
password=cls.password,
409
)
410
411
@classmethod
412
def tearDownClass(cls):
413
if cls.starter_workspace is not None:
414
cls.starter_workspace.terminate()
415
cls.manager = None
416
cls.password = None
417
418
def test_str(self):
419
assert self.starter_workspace.name in str(self.starter_workspace.name)
420
421
def test_repr(self):
422
assert repr(self.starter_workspace) == str(self.starter_workspace)
423
424
def test_get_starter_workspace(self):
425
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
426
assert workspace.id == self.starter_workspace.id, workspace.id
427
428
with self.assertRaises(s2.ManagementError) as cm:
429
workspace = self.manager.get_starter_workspace('bad id')
430
431
assert 'UUID' in cm.exception.msg, cm.exception.msg
432
433
def test_starter_workspaces(self):
434
workspaces = self.manager.starter_workspaces
435
ids = [x.id for x in workspaces]
436
names = [x.name for x in workspaces]
437
assert self.starter_workspace.id in ids
438
assert self.starter_workspace.name in names
439
440
objs = {}
441
for item in workspaces:
442
objs[item.id] = item
443
objs[item.name] = item
444
445
name = random.choice(names)
446
assert workspaces[name] == objs[name]
447
id = random.choice(ids)
448
assert workspaces[id] == objs[id]
449
450
def test_no_manager(self):
451
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
452
workspace._manager = None
453
454
with self.assertRaises(s2.ManagementError) as cm:
455
workspace.refresh()
456
457
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
458
459
with self.assertRaises(s2.ManagementError) as cm:
460
workspace.terminate()
461
462
assert 'No workspace manager' in cm.exception.msg, cm.exception.msg
463
464
def test_connect(self):
465
with self.starter_workspace.connect(
466
user=self.starter_username,
467
password=self.password,
468
) as conn:
469
with conn.cursor() as cur:
470
cur.execute('show databases')
471
assert self.database_name in [x[0] for x in list(cur)]
472
473
# Test missing endpoint
474
workspace = self.manager.get_starter_workspace(self.starter_workspace.id)
475
workspace.endpoint = None
476
477
with self.assertRaises(s2.ManagementError) as cm:
478
workspace.connect(user=self.starter_username, password=self.password)
479
480
assert 'endpoint' in cm.exception.msg, cm.exception.msg
481
482
483
@pytest.mark.management
484
class TestStage(unittest.TestCase):
485
486
manager = None
487
wg = None
488
password = None
489
490
@classmethod
491
def setUpClass(cls):
492
cls.manager = s2.manage_workspaces()
493
494
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
495
cls.password = secrets.token_urlsafe(20) + '-x&$'
496
497
name = clean_name(secrets.token_urlsafe(20)[:20])
498
499
cls.wg = cls.manager.create_workspace_group(
500
f'wg-test-{name}',
501
region=random.choice(us_regions).id,
502
admin_password=cls.password,
503
firewall_ranges=['0.0.0.0/0'],
504
)
505
506
@classmethod
507
def tearDownClass(cls):
508
if cls.wg is not None:
509
cls.wg.terminate(force=True)
510
cls.wg = None
511
cls.manager = None
512
cls.password = None
513
514
def test_upload_file(self):
515
st = self.wg.stage
516
517
upload_test_sql = f'upload_test_{id(self)}.sql'
518
upload_test2_sql = f'upload_test2_{id(self)}.sql'
519
520
root = st.info('/')
521
assert str(root.path) == '/'
522
assert root.type == 'directory'
523
524
# Upload file
525
f = st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)
526
assert str(f.path) == upload_test_sql
527
assert f.type == 'file'
528
529
# Download and compare to original
530
txt = f.download(encoding='utf-8')
531
assert txt == open(TEST_DIR / 'test.sql').read()
532
533
# Make sure we can't overwrite
534
with self.assertRaises(OSError):
535
st.upload_file(TEST_DIR / 'test.sql', upload_test_sql)
536
537
# Force overwrite with new content; use file object this time
538
f = st.upload_file(
539
open(TEST_DIR / 'test2.sql', 'r'),
540
upload_test_sql,
541
overwrite=True,
542
)
543
assert str(f.path) == upload_test_sql
544
assert f.type == 'file'
545
546
# Verify new content
547
txt = f.download(encoding='utf-8')
548
assert txt == open(TEST_DIR / 'test2.sql').read()
549
550
# Try to upload folder
551
with self.assertRaises(IsADirectoryError):
552
st.upload_file(TEST_DIR, 'test3.sql')
553
554
lib = st.mkdir('/lib/')
555
assert str(lib.path) == 'lib/'
556
assert lib.type == 'directory'
557
558
# Try to overwrite stage folder with file
559
with self.assertRaises(IsADirectoryError):
560
st.upload_file(TEST_DIR / 'test2.sql', lib.path, overwrite=True)
561
562
# Write file into folder
563
f = st.upload_file(
564
TEST_DIR / 'test2.sql',
565
os.path.join(lib.path, upload_test2_sql),
566
)
567
assert str(f.path) == 'lib/' + upload_test2_sql
568
assert f.type == 'file'
569
570
def test_open(self):
571
st = self.wg.stage
572
573
open_test_sql = f'open_test_{id(self)}.sql'
574
575
# See if error is raised for non-existent file
576
with self.assertRaises(s2.ManagementError):
577
st.open(open_test_sql, 'r')
578
579
# Load test file
580
st.upload_file(TEST_DIR / 'test.sql', open_test_sql)
581
582
# Read file using `open`
583
with st.open(open_test_sql, 'r') as rfile:
584
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
585
586
# Read file using `open` with 'rt' mode
587
with st.open(open_test_sql, 'rt') as rfile:
588
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
589
590
# Read file using `open` with 'rb' mode
591
with st.open(open_test_sql, 'rb') as rfile:
592
assert rfile.read() == open(TEST_DIR / 'test.sql', 'rb').read()
593
594
# Read file using `open` with 'rb' mode
595
with self.assertRaises(ValueError):
596
with st.open(open_test_sql, 'b') as rfile:
597
pass
598
599
# Attempt overwrite file using `open` with mode 'x'
600
with self.assertRaises(OSError):
601
with st.open(open_test_sql, 'x') as wfile:
602
pass
603
604
# Attempt overwrite file using `open` with mode 'w'
605
with st.open(open_test_sql, 'w') as wfile:
606
wfile.write(open(TEST_DIR / 'test2.sql').read())
607
608
txt = st.download_file(open_test_sql, encoding='utf-8')
609
610
assert txt == open(TEST_DIR / 'test2.sql').read()
611
612
open_raw_test_sql = f'open_raw_test_{id(self)}.sql'
613
614
# Test writer without context manager
615
wfile = st.open(open_raw_test_sql, 'w')
616
for line in open(TEST_DIR / 'test.sql'):
617
wfile.write(line)
618
wfile.close()
619
620
txt = st.download_file(open_raw_test_sql, encoding='utf-8')
621
622
assert txt == open(TEST_DIR / 'test.sql').read()
623
624
# Test reader without context manager
625
rfile = st.open(open_raw_test_sql, 'r')
626
txt = ''
627
for line in rfile:
628
txt += line
629
rfile.close()
630
631
assert txt == open(TEST_DIR / 'test.sql').read()
632
633
def test_obj_open(self):
634
st = self.wg.stage
635
636
obj_open_test_sql = f'obj_open_test_{id(self)}.sql'
637
obj_open_dir = f'obj_open_dir_{id(self)}'
638
639
# Load test file
640
f = st.upload_file(TEST_DIR / 'test.sql', obj_open_test_sql)
641
642
# Read file using `open`
643
with f.open() as rfile:
644
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
645
646
# Make sure directories error out
647
d = st.mkdir(obj_open_dir)
648
with self.assertRaises(IsADirectoryError):
649
d.open()
650
651
# Write file using `open`
652
with f.open('w', encoding='utf-8') as wfile:
653
wfile.write(open(TEST_DIR / 'test2.sql').read())
654
655
assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.sql').read()
656
657
# Test writer without context manager
658
wfile = f.open('w')
659
for line in open(TEST_DIR / 'test.sql'):
660
wfile.write(line)
661
wfile.close()
662
663
txt = st.download_file(f.path, encoding='utf-8')
664
665
assert txt == open(TEST_DIR / 'test.sql').read()
666
667
# Test reader without context manager
668
rfile = f.open('r')
669
txt = ''
670
for line in rfile:
671
txt += line
672
rfile.close()
673
674
assert txt == open(TEST_DIR / 'test.sql').read()
675
676
def test_os_directories(self):
677
st = self.wg.stage
678
679
mkdir_test_1 = f'mkdir_test_1_{id(self)}'
680
mkdir_test_2 = f'mkdir_test_2_{id(self)}'
681
mkdir_test_3 = f'mkdir_test_3_{id(self)}'
682
683
# mkdir
684
st.mkdir(mkdir_test_1)
685
st.mkdir(mkdir_test_2)
686
with self.assertRaises(s2.ManagementError):
687
st.mkdir(f'{mkdir_test_2}/nest_1/nest_2')
688
st.mkdir(f'{mkdir_test_2}/nest_1')
689
st.mkdir(f'{mkdir_test_2}/nest_1/nest_2')
690
st.mkdir(f'{mkdir_test_3}')
691
692
assert st.exists(f'{mkdir_test_1}/')
693
assert st.exists(f'{mkdir_test_2}/')
694
assert st.exists(f'{mkdir_test_2}/nest_1/')
695
assert st.exists(f'{mkdir_test_2}/nest_1/nest_2/')
696
assert not st.exists('foo/')
697
assert not st.exists('foo/bar/')
698
699
assert st.is_dir(f'{mkdir_test_1}/')
700
assert st.is_dir(f'{mkdir_test_2}/')
701
assert st.is_dir(f'{mkdir_test_2}/nest_1/')
702
assert st.is_dir(f'{mkdir_test_2}/nest_1/nest_2/')
703
704
assert not st.is_file(f'{mkdir_test_1}/')
705
assert not st.is_file(f'{mkdir_test_2}/')
706
assert not st.is_file(f'{mkdir_test_2}/nest_1/')
707
assert not st.is_file(f'{mkdir_test_2}/nest_1/nest_2/')
708
709
out = st.listdir('/')
710
assert f'{mkdir_test_1}/' in out
711
assert f'{mkdir_test_2}/' in out
712
assert f'{mkdir_test_2}/nest_1/nest_2/' not in out
713
714
out = st.listdir('/', recursive=True)
715
assert f'{mkdir_test_1}/' in out
716
assert f'{mkdir_test_2}/' in out
717
assert f'{mkdir_test_2}/nest_1/nest_2/' in out
718
719
out = st.listdir(mkdir_test_2)
720
assert f'{mkdir_test_1}/' not in out
721
assert 'nest_1/' in out
722
assert 'nest_2/' not in out
723
assert 'nest_1/nest_2/' not in out
724
725
out = st.listdir(mkdir_test_2, recursive=True)
726
assert f'{mkdir_test_1}/' not in out
727
assert 'nest_1/' in out
728
assert 'nest_2/' not in out
729
assert 'nest_1/nest_2/' in out
730
731
# rmdir
732
before = st.listdir('/', recursive=True)
733
st.rmdir(f'{mkdir_test_1}/')
734
after = st.listdir('/', recursive=True)
735
assert f'{mkdir_test_1}/' in before
736
assert f'{mkdir_test_1}/' not in after
737
assert list(sorted(before)) == list(sorted(after + [f'{mkdir_test_1}/']))
738
739
with self.assertRaises(OSError):
740
st.rmdir(f'{mkdir_test_2}/')
741
742
mkdir_test_sql = f'mkdir_test_{id(self)}.sql'
743
744
st.upload_file(TEST_DIR / 'test.sql', mkdir_test_sql)
745
746
with self.assertRaises(NotADirectoryError):
747
st.rmdir(mkdir_test_sql)
748
749
# removedirs
750
before = st.listdir('/')
751
st.removedirs(f'{mkdir_test_2}/')
752
after = st.listdir('/')
753
assert f'{mkdir_test_2}/' in before
754
assert f'{mkdir_test_2}/' not in after
755
assert list(sorted(before)) == list(sorted(after + [f'{mkdir_test_2}/']))
756
757
with self.assertRaises(s2.ManagementError):
758
st.removedirs(mkdir_test_sql)
759
760
def test_listdir_return_objects(self):
761
st = self.wg.stage
762
763
listdir_test_dir = f'listdir_test_{id(self)}'
764
listdir_test_sql = f'listdir_test_{id(self)}.sql'
765
766
# Create test directory structure
767
st.mkdir(listdir_test_dir)
768
st.mkdir(f'{listdir_test_dir}/nest_1')
769
st.upload_file(TEST_DIR / 'test.sql', listdir_test_sql)
770
st.upload_file(
771
TEST_DIR / 'test.sql',
772
f'{listdir_test_dir}/nested_test.sql',
773
)
774
775
# Test return_objects=False (default behavior)
776
out = st.listdir('/')
777
assert isinstance(out, list)
778
assert all(isinstance(item, str) for item in out)
779
assert f'{listdir_test_dir}/' in out
780
assert listdir_test_sql in out
781
782
# Test return_objects=True
783
out_objs = st.listdir('/', return_objects=True)
784
assert isinstance(out_objs, list)
785
assert all(hasattr(item, 'path') for item in out_objs)
786
assert all(hasattr(item, 'type') for item in out_objs)
787
788
# Verify we have the expected items
789
obj_paths = [obj.path for obj in out_objs]
790
assert f'{listdir_test_dir}/' in obj_paths
791
assert listdir_test_sql in obj_paths
792
793
# Verify object types
794
for obj in out_objs:
795
if obj.path == f'{listdir_test_dir}/':
796
assert obj.type == 'directory'
797
elif obj.path == listdir_test_sql:
798
assert obj.type == 'file'
799
800
# Test with subdirectory and return_objects=True
801
out_objs_sub = st.listdir(listdir_test_dir, return_objects=True)
802
assert isinstance(out_objs_sub, list)
803
obj_paths_sub = [obj.path for obj in out_objs_sub]
804
assert 'nest_1/' in obj_paths_sub
805
assert 'nested_test.sql' in obj_paths_sub
806
807
# Test recursive with return_objects=True
808
out_objs_rec = st.listdir('/', recursive=True, return_objects=True)
809
obj_paths_rec = [obj.path for obj in out_objs_rec]
810
assert f'{listdir_test_dir}/' in obj_paths_rec
811
assert f'{listdir_test_dir}/nest_1/' in obj_paths_rec
812
assert f'{listdir_test_dir}/nested_test.sql' in obj_paths_rec
813
814
def test_os_files(self):
815
st = self.wg.stage
816
817
files_test_sql = f'files_test_{id(self)}.sql'
818
files_test_1_dir = f'files_test_1_{id(self)}'
819
820
st.mkdir(files_test_1_dir)
821
st.mkdir(f'{files_test_1_dir}/nest_1')
822
823
st.upload_file(TEST_DIR / 'test.sql', files_test_sql)
824
st.upload_file(
825
TEST_DIR / 'test.sql',
826
f'{files_test_1_dir}/nest_1/nested_files_test.sql',
827
)
828
st.upload_file(
829
TEST_DIR / 'test.sql',
830
f'{files_test_1_dir}/nest_1/nested_files_test_2.sql',
831
)
832
833
# remove
834
with self.assertRaises(IsADirectoryError):
835
st.remove(f'{files_test_1_dir}/')
836
837
before = st.listdir('/')
838
st.remove(files_test_sql)
839
after = st.listdir('/')
840
assert files_test_sql in before
841
assert files_test_sql not in after
842
assert list(sorted(before)) == list(sorted(after + [files_test_sql]))
843
844
before = st.listdir(f'{files_test_1_dir}/nest_1/')
845
st.remove(f'{files_test_1_dir}/nest_1/nested_files_test.sql')
846
after = st.listdir(f'{files_test_1_dir}/nest_1/')
847
assert 'nested_files_test.sql' in before
848
assert 'nested_files_test.sql' not in after
849
assert st.is_dir(f'{files_test_1_dir}/nest_1/')
850
851
# Removing the last file does not remove empty directories
852
st.remove(f'{files_test_1_dir}/nest_1/nested_files_test_2.sql')
853
assert not st.is_file(f'{files_test_1_dir}/nest_1/nested_files_test_2.sql')
854
assert st.is_dir(f'{files_test_1_dir}/nest_1/')
855
assert st.is_dir(f'{files_test_1_dir}/')
856
857
st.removedirs(files_test_1_dir)
858
assert not st.is_dir(f'{files_test_1_dir}/nest_1/')
859
assert not st.is_dir(f'{files_test_1_dir}/')
860
861
def test_os_rename(self):
862
st = self.wg.stage
863
864
rename_test_sql = f'rename_test_{id(self)}.sql'
865
rename_test_2_sql = f'rename_test_2_{id(self)}.sql'
866
rename_test_1_dir = f'rename_test_1_{id(self)}'
867
rename_test_2_dir = f'rename_test_2_{id(self)}'
868
869
st.upload_file(TEST_DIR / 'test.sql', rename_test_sql)
870
871
with self.assertRaises(s2.ManagementError):
872
st.upload_file(
873
TEST_DIR / 'test.sql',
874
f'{rename_test_1_dir}/nest_1/nested_rename_test.sql',
875
)
876
877
st.mkdir(rename_test_1_dir)
878
st.mkdir(f'{rename_test_1_dir}/nest_1')
879
880
assert st.exists(f'/{rename_test_1_dir}/nest_1/')
881
882
st.upload_file(
883
TEST_DIR / 'test.sql',
884
f'{rename_test_1_dir}/nest_1/nested_rename_test.sql',
885
)
886
887
st.upload_file(
888
TEST_DIR / 'test.sql',
889
f'{rename_test_1_dir}/nest_1/nested_rename_test_2.sql',
890
)
891
892
# rename file
893
assert rename_test_sql in st.listdir('/')
894
assert rename_test_2_sql not in st.listdir('/')
895
st.rename(rename_test_sql, rename_test_2_sql)
896
assert rename_test_sql not in st.listdir('/')
897
assert rename_test_2_sql in st.listdir('/')
898
899
# rename directory
900
assert f'{rename_test_1_dir}/' in st.listdir('/')
901
assert f'{rename_test_2_dir}/' not in st.listdir('/')
902
st.rename(f'{rename_test_1_dir}/', f'{rename_test_2_dir}/')
903
assert f'{rename_test_1_dir}/' not in st.listdir('/')
904
assert f'{rename_test_2_dir}/' in st.listdir('/')
905
assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test.sql')
906
assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_2.sql')
907
908
# rename nested
909
assert f'{rename_test_2_dir}/nest_1/nested_rename_test.sql' in st.listdir(
910
'/', recursive=True,
911
)
912
assert f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql' not in st.listdir(
913
'/', recursive=True,
914
)
915
st.rename(
916
f'{rename_test_2_dir}/nest_1/nested_rename_test.sql',
917
f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql',
918
)
919
assert f'{rename_test_2_dir}/nest_1/nested_rename_test.sql' not in st.listdir(
920
'/', recursive=True,
921
)
922
assert f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql' in st.listdir(
923
'/', recursive=True,
924
)
925
assert not st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test.sql')
926
assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_2.sql')
927
assert st.is_file(f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql')
928
929
# non-existent file
930
with self.assertRaises(OSError):
931
st.rename('rename_foo.sql', 'rename_foo_2.sql')
932
933
# overwrite
934
with self.assertRaises(OSError):
935
st.rename(
936
rename_test_2_sql,
937
f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql',
938
)
939
940
st.rename(
941
rename_test_2_sql,
942
f'{rename_test_2_dir}/nest_1/nested_rename_test_3.sql', overwrite=True,
943
)
944
945
def test_file_object(self):
946
st = self.wg.stage
947
948
obj_test_dir = f'obj_test_{id(self)}'
949
950
st.mkdir(obj_test_dir)
951
st.mkdir(f'{obj_test_dir}/nest_1')
952
953
obj_test_sql = f'obj_test_{id(self)}.sql'
954
obj_test_2_sql = f'obj_test_2_{id(self)}.sql'
955
956
f1 = st.upload_file(TEST_DIR / 'test.sql', obj_test_sql)
957
f2 = st.upload_file(
958
TEST_DIR / 'test.sql',
959
f'{obj_test_dir}/nest_1/{obj_test_sql}',
960
)
961
d2 = st.info(f'{obj_test_dir}/nest_1/')
962
963
# is_file / is_dir
964
assert not f1.is_dir()
965
assert f1.is_file()
966
assert not f2.is_dir()
967
assert f2.is_file()
968
assert d2.is_dir()
969
assert not d2.is_file()
970
971
# abspath / basename / dirname / exists
972
assert f1.abspath() == obj_test_sql
973
assert f1.basename() == obj_test_sql
974
assert f1.dirname() == '/'
975
assert f1.exists()
976
assert f2.abspath() == f'{obj_test_dir}/nest_1/{obj_test_sql}'
977
assert f2.basename() == obj_test_sql
978
assert f2.dirname() == f'{obj_test_dir}/nest_1/'
979
assert f2.exists()
980
assert d2.abspath() == f'{obj_test_dir}/nest_1/'
981
assert d2.basename() == 'nest_1'
982
assert d2.dirname() == f'{obj_test_dir}/'
983
assert d2.exists()
984
985
# download
986
assert f1.download(encoding='utf-8') == open(TEST_DIR / 'test.sql', 'r').read()
987
assert f1.download() == open(TEST_DIR / 'test.sql', 'rb').read()
988
989
# remove
990
with self.assertRaises(IsADirectoryError):
991
d2.remove()
992
993
assert st.is_file(obj_test_sql)
994
f1.remove()
995
assert not st.is_file(obj_test_sql)
996
997
# removedirs
998
with self.assertRaises(NotADirectoryError):
999
f2.removedirs()
1000
1001
assert st.exists(d2.path)
1002
d2.removedirs()
1003
assert not st.exists(d2.path)
1004
1005
# rmdir
1006
f1 = st.upload_file(TEST_DIR / 'test.sql', obj_test_sql)
1007
d2 = st.mkdir(f'{obj_test_dir}/nest_1')
1008
1009
assert st.exists(f1.path)
1010
assert st.exists(d2.path)
1011
1012
with self.assertRaises(NotADirectoryError):
1013
f1.rmdir()
1014
1015
assert st.exists(f1.path)
1016
assert st.exists(d2.path)
1017
1018
d2.rmdir()
1019
1020
assert not st.exists(f'{obj_test_dir}/nest_1/')
1021
assert not st.exists(obj_test_dir)
1022
1023
# mtime / ctime
1024
assert f1.getmtime() > 0
1025
assert f1.getctime() > 0
1026
1027
# rename
1028
assert st.exists(obj_test_sql)
1029
assert not st.exists(obj_test_2_sql)
1030
f1.rename(obj_test_2_sql)
1031
assert not st.exists(obj_test_sql)
1032
assert st.exists(obj_test_2_sql)
1033
assert f1.abspath() == obj_test_2_sql
1034
1035
1036
@pytest.mark.management
1037
class TestSecrets(unittest.TestCase):
1038
1039
manager = None
1040
wg = None
1041
password = None
1042
1043
@classmethod
1044
def setUpClass(cls):
1045
cls.manager = s2.manage_workspaces()
1046
1047
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
1048
cls.password = secrets.token_urlsafe(20) + '-x&$'
1049
1050
name = clean_name(secrets.token_urlsafe(20)[:20])
1051
1052
cls.wg = cls.manager.create_workspace_group(
1053
f'wg-test-{name}',
1054
region=random.choice(us_regions).id,
1055
admin_password=cls.password,
1056
firewall_ranges=['0.0.0.0/0'],
1057
)
1058
1059
@classmethod
1060
def tearDownClass(cls):
1061
if cls.wg is not None:
1062
cls.wg.terminate(force=True)
1063
cls.wg = None
1064
cls.manager = None
1065
cls.password = None
1066
1067
def test_get_secret(self):
1068
# manually create secret and then get secret
1069
# try to delete the secret if it exists
1070
try:
1071
secret = self.manager.organizations.current.get_secret('secret_name')
1072
1073
secret_id = secret.id
1074
1075
self.manager._delete(f'secrets/{secret_id}')
1076
except s2.ManagementError:
1077
pass
1078
1079
self.manager._post(
1080
'secrets',
1081
json=dict(
1082
name='secret_name',
1083
value='secret_value',
1084
),
1085
)
1086
1087
secret = self.manager.organizations.current.get_secret('secret_name')
1088
1089
assert secret.name == 'secret_name'
1090
assert secret.value == 'secret_value'
1091
1092
1093
@pytest.mark.management
1094
class TestJob(unittest.TestCase):
1095
1096
manager = None
1097
workspace_group = None
1098
workspace = None
1099
password = None
1100
job_ids = []
1101
1102
@classmethod
1103
def setUpClass(cls):
1104
cls.manager = s2.manage_workspaces()
1105
1106
us_regions = [x for x in cls.manager.regions if 'US' in x.name]
1107
cls.password = secrets.token_urlsafe(20) + '-x&$'
1108
1109
name = clean_name(secrets.token_urlsafe(20)[:20])
1110
1111
cls.workspace_group = cls.manager.create_workspace_group(
1112
f'wg-test-{name}',
1113
region=random.choice(us_regions).id,
1114
admin_password=cls.password,
1115
firewall_ranges=['0.0.0.0/0'],
1116
)
1117
1118
try:
1119
cls.workspace = cls.workspace_group.create_workspace(
1120
f'ws-test-{name}-x',
1121
wait_on_active=True,
1122
)
1123
except Exception:
1124
cls.workspace_group.terminate(force=True)
1125
raise
1126
1127
@classmethod
1128
def tearDownClass(cls):
1129
for job_id in cls.job_ids:
1130
try:
1131
cls.manager.organizations.current.jobs.delete(job_id)
1132
except Exception:
1133
pass
1134
if cls.workspace_group is not None:
1135
cls.workspace_group.terminate(force=True)
1136
cls.workspace_group = None
1137
cls.workspace = None
1138
cls.manager = None
1139
cls.password = None
1140
if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:
1141
del os.environ['SINGLESTOREDB_WORKSPACE']
1142
if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:
1143
del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']
1144
1145
def test_job_without_database_target(self):
1146
"""
1147
Creates job without target database on a specific runtime
1148
Waits for job to finish
1149
Gets the job
1150
Deletes the job
1151
"""
1152
if os.environ.get('SINGLESTOREDB_WORKSPACE', None) is not None:
1153
del os.environ['SINGLESTOREDB_WORKSPACE']
1154
if os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE', None) is not None:
1155
del os.environ['SINGLESTOREDB_DEFAULT_DATABASE']
1156
1157
job_manager = self.manager.organizations.current.jobs
1158
job = job_manager.run(
1159
'Scheduling Test.ipynb',
1160
'notebooks-cpu-small',
1161
{'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
1162
)
1163
self.job_ids.append(job.job_id)
1164
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1165
assert job.schedule.mode == job_manager.modes().ONCE
1166
assert not job.execution_config.create_snapshot
1167
assert job.completed_executions_count == 0
1168
assert job.name is None
1169
assert job.description is None
1170
assert job.job_metadata == []
1171
assert job.terminated_at is None
1172
assert job.target_config is None
1173
job.wait()
1174
job = job_manager.get(job.job_id)
1175
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1176
assert job.schedule.mode == job_manager.modes().ONCE
1177
assert not job.execution_config.create_snapshot
1178
assert job.completed_executions_count == 1
1179
assert job.name is None
1180
assert job.description is None
1181
assert job.job_metadata != []
1182
assert len(job.job_metadata) == 1
1183
assert job.job_metadata[0].count == 1
1184
assert job.job_metadata[0].status == Status.COMPLETED
1185
assert job.terminated_at is None
1186
assert job.target_config is None
1187
deleted = job.delete()
1188
assert deleted
1189
job = job_manager.get(job.job_id)
1190
assert job.terminated_at is not None
1191
1192
def test_job_with_database_target(self):
1193
"""
1194
Creates job with target database on a specific runtime
1195
Waits for job to finish
1196
Gets the job
1197
Deletes the job
1198
"""
1199
os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'
1200
os.environ['SINGLESTOREDB_WORKSPACE'] = self.workspace.id
1201
1202
job_manager = self.manager.organizations.current.jobs
1203
job = job_manager.run(
1204
'Scheduling Test.ipynb',
1205
'notebooks-cpu-small',
1206
{'strParam': 'string', 'intParam': 1, 'floatParam': 1.0, 'boolParam': True},
1207
)
1208
self.job_ids.append(job.job_id)
1209
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1210
assert job.schedule.mode == job_manager.modes().ONCE
1211
assert not job.execution_config.create_snapshot
1212
assert job.completed_executions_count == 0
1213
assert job.name is None
1214
assert job.description is None
1215
assert job.job_metadata == []
1216
assert job.terminated_at is None
1217
assert job.target_config is not None
1218
assert job.target_config.database_name == 'information_schema'
1219
assert job.target_config.target_id == self.workspace.id
1220
assert job.target_config.target_type == TargetType.WORKSPACE
1221
assert not job.target_config.resume_target
1222
job.wait()
1223
job = job_manager.get(job.job_id)
1224
assert job.execution_config.notebook_path == 'Scheduling Test.ipynb'
1225
assert job.schedule.mode == job_manager.modes().ONCE
1226
assert not job.execution_config.create_snapshot
1227
assert job.completed_executions_count == 1
1228
assert job.name is None
1229
assert job.description is None
1230
assert job.job_metadata != []
1231
assert len(job.job_metadata) == 1
1232
assert job.job_metadata[0].count == 1
1233
assert job.job_metadata[0].status == Status.COMPLETED
1234
assert job.terminated_at is None
1235
assert job.target_config is not None
1236
assert job.target_config.database_name == 'information_schema'
1237
assert job.target_config.target_id == self.workspace.id
1238
assert job.target_config.target_type == TargetType.WORKSPACE
1239
assert not job.target_config.resume_target
1240
deleted = job.delete()
1241
assert deleted
1242
job = job_manager.get(job.job_id)
1243
assert job.terminated_at is not None
1244
1245
1246
@pytest.mark.management
1247
class TestFileSpaces(unittest.TestCase):
1248
1249
manager = None
1250
personal_space = None
1251
shared_space = None
1252
1253
@classmethod
1254
def setUpClass(cls):
1255
cls.manager = s2.manage_files()
1256
cls.personal_space = cls.manager.personal_space
1257
cls.shared_space = cls.manager.shared_space
1258
1259
@classmethod
1260
def tearDownClass(cls):
1261
cls.manager = None
1262
cls.personal_space = None
1263
cls.shared_space = None
1264
1265
def test_upload_file(self):
1266
upload_test_ipynb = f'upload_test_{id(self)}.ipynb'
1267
1268
for space in [self.personal_space, self.shared_space]:
1269
root = space.info('/')
1270
assert str(root.path) == '/'
1271
assert root.type == 'directory'
1272
1273
# Upload files
1274
f = space.upload_file(
1275
TEST_DIR / 'test.ipynb',
1276
upload_test_ipynb,
1277
)
1278
assert str(f.path) == upload_test_ipynb
1279
assert f.type == 'notebook'
1280
1281
# Download and compare to original
1282
txt = f.download(encoding='utf-8')
1283
assert txt == open(TEST_DIR / 'test.ipynb').read()
1284
1285
# Make sure we can't overwrite
1286
with self.assertRaises(OSError):
1287
space.upload_file(
1288
TEST_DIR / 'test.ipynb',
1289
upload_test_ipynb,
1290
)
1291
1292
# Force overwrite with new content
1293
f = space.upload_file(
1294
TEST_DIR / 'test2.ipynb',
1295
upload_test_ipynb, overwrite=True,
1296
)
1297
assert str(f.path) == upload_test_ipynb
1298
assert f.type == 'notebook'
1299
1300
# Verify new content
1301
txt = f.download(encoding='utf-8')
1302
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1303
1304
# Make sure we can't upload a folder
1305
with self.assertRaises(s2.ManagementError):
1306
space.upload_folder(TEST_DIR, 'test')
1307
1308
# Cleanup
1309
space.remove(upload_test_ipynb)
1310
1311
def test_upload_file_io(self):
1312
upload_test_ipynb = f'upload_test_{id(self)}.ipynb'
1313
1314
for space in [self.personal_space, self.shared_space]:
1315
root = space.info('/')
1316
assert str(root.path) == '/'
1317
assert root.type == 'directory'
1318
1319
# Upload files
1320
f = space.upload_file(
1321
open(TEST_DIR / 'test.ipynb', 'r'),
1322
upload_test_ipynb,
1323
)
1324
assert str(f.path) == upload_test_ipynb
1325
assert f.type == 'notebook'
1326
1327
# Download and compare to original
1328
txt = f.download(encoding='utf-8')
1329
assert txt == open(TEST_DIR / 'test.ipynb').read()
1330
1331
# Make sure we can't overwrite
1332
with self.assertRaises(OSError):
1333
space.upload_file(
1334
open(TEST_DIR / 'test.ipynb', 'r'),
1335
upload_test_ipynb,
1336
)
1337
1338
# Force overwrite with new content
1339
f = space.upload_file(
1340
open(TEST_DIR / 'test2.ipynb', 'r'),
1341
upload_test_ipynb, overwrite=True,
1342
)
1343
assert str(f.path) == upload_test_ipynb
1344
assert f.type == 'notebook'
1345
1346
# Verify new content
1347
txt = f.download(encoding='utf-8')
1348
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1349
1350
# Make sure we can't upload a folder
1351
with self.assertRaises(s2.ManagementError):
1352
space.upload_folder(TEST_DIR, 'test')
1353
1354
# Cleanup
1355
space.remove(upload_test_ipynb)
1356
1357
def test_open(self):
1358
for space in [self.personal_space, self.shared_space]:
1359
open_test_ipynb = f'open_test_ipynb_{id(self)}.ipynb'
1360
1361
# See if error is raised for non-existent file
1362
with self.assertRaises(s2.ManagementError):
1363
space.open(open_test_ipynb, 'r')
1364
1365
# Load test file
1366
space.upload_file(TEST_DIR / 'test.ipynb', open_test_ipynb)
1367
1368
# Read file using `open`
1369
with space.open(open_test_ipynb, 'r') as rfile:
1370
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1371
1372
# Read file using `open` with 'rt' mode
1373
with space.open(open_test_ipynb, 'rt') as rfile:
1374
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1375
1376
# Read file using `open` with 'rb' mode
1377
with space.open(open_test_ipynb, 'rb') as rfile:
1378
assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()
1379
1380
# Read file using `open` with 'rb' mode
1381
with self.assertRaises(ValueError):
1382
with space.open(open_test_ipynb, 'b') as rfile:
1383
pass
1384
1385
# Attempt overwrite file using `open` with mode 'x'
1386
with self.assertRaises(OSError):
1387
with space.open(open_test_ipynb, 'x') as wfile:
1388
pass
1389
1390
# Attempt overwrite file using `open` with mode 'w'
1391
with space.open(open_test_ipynb, 'w') as wfile:
1392
wfile.write(open(TEST_DIR / 'test2.ipynb').read())
1393
1394
txt = space.download_file(open_test_ipynb, encoding='utf-8')
1395
1396
assert txt == open(TEST_DIR / 'test2.ipynb').read()
1397
1398
open_raw_test_ipynb = f'open_raw_test_{id(self)}.ipynb'
1399
1400
# Test writer without context manager
1401
wfile = space.open(open_raw_test_ipynb, 'w')
1402
for line in open(TEST_DIR / 'test.ipynb'):
1403
wfile.write(line)
1404
wfile.close()
1405
1406
txt = space.download_file(
1407
open_raw_test_ipynb,
1408
encoding='utf-8',
1409
)
1410
1411
assert txt == open(TEST_DIR / 'test.ipynb').read()
1412
1413
# Test reader without context manager
1414
rfile = space.open(open_raw_test_ipynb, 'r')
1415
txt = ''
1416
for line in rfile:
1417
txt += line
1418
rfile.close()
1419
1420
assert txt == open(TEST_DIR / 'test.ipynb').read()
1421
1422
# Cleanup
1423
space.remove(open_test_ipynb)
1424
space.remove(open_raw_test_ipynb)
1425
1426
def test_obj_open(self):
1427
for space in [self.personal_space, self.shared_space]:
1428
obj_open_test_ipynb = f'obj_open_test_{id(self)}.ipynb'
1429
obj_open_dir = f'obj_open_dir_{id(self)}'
1430
1431
# Load test file
1432
f = space.upload_file(
1433
TEST_DIR / 'test.ipynb',
1434
obj_open_test_ipynb,
1435
)
1436
1437
# Read file using `open`
1438
with f.open() as rfile:
1439
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
1440
1441
# Make sure directories error out
1442
with self.assertRaises(s2.ManagementError):
1443
space.mkdir(obj_open_dir)
1444
1445
# Write file using `open`
1446
with f.open('w', encoding='utf-8') as wfile:
1447
wfile.write(open(TEST_DIR / 'test2.ipynb').read())
1448
1449
assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.ipynb').read()
1450
1451
# Test writer without context manager
1452
wfile = f.open('w')
1453
for line in open(TEST_DIR / 'test.ipynb'):
1454
wfile.write(line)
1455
wfile.close()
1456
1457
txt = space.download_file(f.path, encoding='utf-8')
1458
1459
assert txt == open(TEST_DIR / 'test.ipynb').read()
1460
1461
# Test reader without context manager
1462
rfile = f.open('r')
1463
txt = ''
1464
for line in rfile:
1465
txt += line
1466
rfile.close()
1467
1468
assert txt == open(TEST_DIR / 'test.ipynb').read()
1469
1470
# Cleanup
1471
space.remove(obj_open_test_ipynb)
1472
1473
def test_os_directories(self):
1474
mkdir_test_1_dir = f'mkdir_test_1_{id(self)}'
1475
1476
for space in [self.personal_space, self.shared_space]:
1477
# Make sure directories error out
1478
with self.assertRaises(s2.ManagementError):
1479
space.mkdir(mkdir_test_1_dir)
1480
1481
with self.assertRaises(s2.ManagementError):
1482
space.exists(f'{mkdir_test_1_dir}/')
1483
1484
out = space.listdir('/')
1485
assert f'{mkdir_test_1_dir}/' not in out
1486
1487
with self.assertRaises(s2.ManagementError):
1488
space.rmdir(f'{mkdir_test_1_dir}/')
1489
1490
def test_os_rename(self):
1491
rename_test_ipynb = f'rename_test_{id(self)}.ipynb'
1492
rename_test_2_ipynb = f'rename_test_2_{id(self)}.ipynb'
1493
rename_test_3_ipynb = f'rename_test_3_{id(self)}.ipynb'
1494
1495
for space in [self.personal_space, self.shared_space]:
1496
space.upload_file(
1497
TEST_DIR / 'test.ipynb',
1498
rename_test_ipynb,
1499
)
1500
assert rename_test_ipynb in space.listdir('/')
1501
assert rename_test_2_ipynb not in space.listdir('/')
1502
1503
space.rename(
1504
rename_test_ipynb,
1505
rename_test_2_ipynb,
1506
)
1507
assert rename_test_ipynb not in space.listdir('/')
1508
assert rename_test_2_ipynb in space.listdir('/')
1509
1510
# non-existent file
1511
with self.assertRaises(OSError):
1512
space.rename('rename_foo.ipynb', 'rename_foo_2.ipynb')
1513
1514
space.upload_file(
1515
TEST_DIR / 'test.ipynb',
1516
rename_test_3_ipynb,
1517
)
1518
1519
# overwrite
1520
with self.assertRaises(OSError):
1521
space.rename(
1522
rename_test_2_ipynb,
1523
rename_test_3_ipynb,
1524
)
1525
1526
space.rename(
1527
rename_test_2_ipynb,
1528
rename_test_3_ipynb, overwrite=True,
1529
)
1530
1531
# Cleanup
1532
space.remove(rename_test_3_ipynb)
1533
1534
def test_file_object(self):
1535
obj_test_ipynb = f'obj_test_{id(self)}.ipynb'
1536
obj_test_2_ipynb = f'obj_test_2_{id(self)}.ipynb'
1537
1538
for space in [self.personal_space, self.shared_space]:
1539
f = space.upload_file(
1540
TEST_DIR / 'test.ipynb',
1541
obj_test_ipynb,
1542
)
1543
1544
assert not f.is_dir()
1545
assert f.is_file()
1546
1547
# abspath / basename / dirname / exists
1548
assert f.abspath() == obj_test_ipynb
1549
assert f.basename() == obj_test_ipynb
1550
assert f.dirname() == '/'
1551
assert f.exists()
1552
1553
# download
1554
assert f.download(encoding='utf-8') == \
1555
open(TEST_DIR / 'test.ipynb', 'r').read()
1556
assert f.download() == open(TEST_DIR / 'test.ipynb', 'rb').read()
1557
1558
assert space.is_file(obj_test_ipynb)
1559
f.remove()
1560
assert not space.is_file(obj_test_ipynb)
1561
1562
# mtime / ctime
1563
assert f.getmtime() > 0
1564
assert f.getctime() > 0
1565
1566
# rename
1567
f = space.upload_file(
1568
TEST_DIR / 'test.ipynb',
1569
obj_test_ipynb,
1570
)
1571
assert space.exists(obj_test_ipynb)
1572
assert not space.exists(obj_test_2_ipynb)
1573
f.rename(obj_test_2_ipynb)
1574
assert not space.exists(obj_test_ipynb)
1575
assert space.exists(obj_test_2_ipynb)
1576
assert f.abspath() == obj_test_2_ipynb
1577
1578
# Cleanup
1579
space.remove(obj_test_2_ipynb)
1580
1581
1582
@pytest.mark.management
1583
class TestRegions(unittest.TestCase):
1584
"""Test cases for region management."""
1585
1586
manager = None
1587
1588
@classmethod
1589
def setUpClass(cls):
1590
"""Set up the test environment."""
1591
cls.manager = s2.manage_regions()
1592
1593
@classmethod
1594
def tearDownClass(cls):
1595
"""Clean up the test environment."""
1596
cls.manager = None
1597
1598
def test_list_regions(self):
1599
"""Test listing all regions."""
1600
regions = self.manager.list_regions()
1601
1602
# Verify we get a NamedList
1603
assert isinstance(regions, NamedList)
1604
1605
# Verify we have at least one region
1606
assert len(regions) > 0
1607
1608
# Verify region properties
1609
region = regions[0]
1610
assert isinstance(region, Region)
1611
assert hasattr(region, 'id')
1612
assert hasattr(region, 'name')
1613
assert hasattr(region, 'provider')
1614
1615
# Verify provider values
1616
providers = {x.provider for x in regions}
1617
assert 'Azure' in providers or 'GCP' in providers or 'AWS' in providers
1618
1619
def test_list_shared_tier_regions(self):
1620
"""Test listing shared tier regions."""
1621
regions = self.manager.list_shared_tier_regions()
1622
1623
# Verify we get a NamedList
1624
assert isinstance(regions, NamedList)
1625
1626
# Verify region properties if we have any shared tier regions
1627
if regions:
1628
region = regions[0]
1629
assert isinstance(region, Region)
1630
assert hasattr(region, 'name')
1631
assert hasattr(region, 'provider')
1632
assert hasattr(region, 'region_name')
1633
1634
# Verify provider values
1635
providers = {x.provider for x in regions}
1636
assert any(p in providers for p in ['Azure', 'GCP', 'AWS'])
1637
1638
def test_str_repr(self):
1639
"""Test string representation of regions."""
1640
regions = self.manager.list_regions()
1641
if not regions:
1642
self.skipTest('No regions available for testing')
1643
1644
region = regions[0]
1645
1646
# Test __str__
1647
s = str(region)
1648
assert region.id in s
1649
assert region.name in s
1650
assert region.provider in s
1651
1652
# Test __repr__
1653
assert repr(region) == str(region)
1654
1655