Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/tests/test_fusion.py
469 views
1
#!/usr/bin/env python
2
# type: ignore
3
"""SingleStoreDB Fusion testing."""
4
import os
5
import random
6
import secrets
7
import tempfile
8
import time
9
import unittest
10
from typing import Any
11
from typing import List
12
13
import pytest
14
15
import singlestoredb as s2
16
from singlestoredb.tests import utils
17
18
19
class TestFusion(unittest.TestCase):
    """Fusion SQL tests that only need a database connection.

    Each test runs with ``SINGLESTOREDB_FUSION_ENABLED`` forced to ``'1'``;
    the previous value of the variable is restored afterwards.
    """

    dbname: str = ''
    dbexisted: bool = False

    @classmethod
    def setUpClass(cls):
        """Load ``test.sql`` (located next to this file) via ``utils.load_sql``."""
        sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')
        cls.dbname, cls.dbexisted = utils.load_sql(sql_file)

    @classmethod
    def tearDownClass(cls):
        """Drop the test database unless ``dbexisted`` says it was already there."""
        if not cls.dbexisted:
            utils.drop_database(cls.dbname)

    def _restore_fusion_flag(self):
        """Put ``SINGLESTOREDB_FUSION_ENABLED`` back to its pre-test state."""
        if self.enabled is None:
            # The test itself may already have removed the variable
            # (see test_env_var), so a plain ``del`` could raise KeyError.
            os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)
        else:
            os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled

    def setUp(self):
        """Enable Fusion and open a connection and cursor for the test."""
        self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')
        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'
        try:
            self.conn = s2.connect(database=type(self).dbname, local_infile=True)
            self.cur = self.conn.cursor()
        except Exception:
            # unittest does not run tearDown() when setUp() raises, so the
            # environment has to be restored here.
            self._restore_fusion_flag()
            raise

    def tearDown(self):
        """Restore the environment and close the cursor and connection."""
        self._restore_fusion_flag()

        # Closing is best-effort: a failure here must not hide the outcome
        # of the test that just ran.
        for resource in (self.cur, self.conn):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass

    def test_env_var(self):
        """Fusion commands are only accepted while the env var enables them."""
        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '0'

        with self.assertRaises(s2.ProgrammingError):
            self.cur.execute('show fusion commands')

        del os.environ['SINGLESTOREDB_FUSION_ENABLED']

        with self.assertRaises(s2.ProgrammingError):
            self.cur.execute('show fusion commands')

        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = 'yes'

        self.cur.execute('show fusion commands')
        assert list(self.cur)

    def test_show_commands(self):
        """SHOW FUSION COMMANDS lists commands and honours a LIKE filter."""
        self.cur.execute('show fusion commands')
        cmds = [x[0] for x in self.cur.fetchall()]
        assert cmds
        assert any(
            x.strip().startswith('SHOW FUSION GRAMMAR') for x in cmds
        ), cmds

        self.cur.execute('show fusion commands like "create%"')
        cmds = [x[0] for x in self.cur.fetchall()]
        assert cmds
        assert all(x.strip().startswith('CREATE') for x in cmds), cmds

    def test_show_grammar(self):
        """SHOW FUSION GRAMMAR returns the grammar of the requested command."""
        self.cur.execute('show fusion grammar for "create workspace"')
        cmds = [x[0] for x in self.cur.fetchall()]
        assert cmds
        assert any(
            x.strip().startswith('CREATE WORKSPACE') for x in cmds
        ), cmds
92
93
94
@pytest.mark.management
class TestWorkspaceFusion(unittest.TestCase):
    """Fusion SQL tests for regions, workspace groups and workspaces.

    ``setUpClass`` creates three workspace groups named
    ``A|B|C Fusion Testing <id>``; A and B are placed in regions whose name
    starts with ``US`` and C in one whose name does not.
    """

    id: str = secrets.token_hex(8)
    dbname: str = ''
    dbexisted: bool = False
    workspace_groups: List[Any] = []

    @classmethod
    def setUpClass(cls):
        """Load ``test.sql`` and create the three workspace groups."""
        sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')
        cls.dbname, cls.dbexisted = utils.load_sql(sql_file)
        try:
            mgr = s2.manage_workspaces()
            us_regions = [x for x in mgr.regions if x.name.startswith('US')]
            non_us_regions = [
                x for x in mgr.regions if not x.name.startswith('US')
            ]
            layout = (
                ('A', us_regions),
                ('B', us_regions),
                ('C', non_us_regions),
            )
            for prefix, regions in layout:
                wg = mgr.create_workspace_group(
                    f'{prefix} Fusion Testing {cls.id}',
                    region=random.choice(regions),
                    firewall_ranges=[],
                )
                cls.workspace_groups.append(wg)
        except Exception:
            # unittest skips tearDownClass() when setUpClass() raises, which
            # would leak the database and any group created so far.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Drop the database (if created here) and terminate all groups."""
        try:
            if not cls.dbexisted:
                utils.drop_database(cls.dbname)
        finally:
            # Terminate the groups even when dropping the database failed.
            while cls.workspace_groups:
                cls.workspace_groups.pop().terminate(force=True)

    def _restore_fusion_flag(self):
        """Put ``SINGLESTOREDB_FUSION_ENABLED`` back to its pre-test state."""
        if self.enabled is None:
            os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)
        else:
            os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled

    def setUp(self):
        """Enable Fusion and open a connection and cursor for the test."""
        self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')
        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'
        try:
            self.conn = s2.connect(database=type(self).dbname, local_infile=True)
            self.cur = self.conn.cursor()
        except Exception:
            # tearDown() is not run when setUp() raises.
            self._restore_fusion_flag()
            raise

    def tearDown(self):
        """Restore the environment and close the cursor and connection."""
        self._restore_fusion_flag()

        # Best-effort close; errors here must not mask the test result.
        for resource in (self.cur, self.conn):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass

    def test_show_regions(self):
        """SHOW REGIONS supports LIKE, LIMIT and ORDER BY."""
        self.cur.execute('show regions')
        regs = list(self.cur)
        desc = self.cur.description

        us_regs = [x for x in regs if x[0].startswith('US')]

        assert len(desc) == 3
        assert len(regs) > 5
        assert len(us_regs) > 5

        # LIKE
        self.cur.execute('show regions like "US%"')
        regs = list(self.cur)
        assert regs == us_regs

        # LIMIT
        self.cur.execute('show regions like "US%" limit 3')
        regs = list(self.cur)
        assert len(regs) == 3

        # ORDER BY
        self.cur.execute('show regions like "US%" limit 3 order by name')
        regs = list(self.cur)
        assert len(regs) == 3
        assert regs == sorted(regs, key=lambda x: x[0])

        # Wrong column
        with self.assertRaises(KeyError):
            self.cur.execute('show regions like "US%" limit 3 order by foo')

    def test_show_workspace_groups(self):
        """SHOW WORKSPACE GROUPS supports LIKE, LIMIT, EXTENDED and ORDER BY."""
        self.cur.execute('show workspace groups')
        wgs = list(self.cur)
        desc = self.cur.description

        assert len(desc) == 4
        assert desc[0].name == 'Name'
        assert desc[1].name == 'ID'
        assert desc[2].name == 'Region'
        assert desc[3].name == 'FirewallRanges'
        assert len(wgs) >= 3

        names = [x[0] for x in wgs]
        assert f'A Fusion Testing {self.id}' in names
        assert f'B Fusion Testing {self.id}' in names
        assert f'C Fusion Testing {self.id}' in names

        # LIKE clause
        self.cur.execute(f'show workspace groups like "A%sion Testing {self.id}"')
        wgs = list(self.cur)

        names = [x[0] for x in wgs]
        assert f'A Fusion Testing {self.id}' in names
        assert f'B Fusion Testing {self.id}' not in names
        assert f'C Fusion Testing {self.id}' not in names

        # LIMIT clause
        self.cur.execute('show workspace groups limit 2')
        wgs = list(self.cur)
        assert len(wgs) == 2

        # EXTENDED attributes
        self.cur.execute('show workspace groups extended')
        wgs = list(self.cur)
        desc = self.cur.description

        assert len(desc) == 6
        assert desc[4].name == 'CreatedAt'
        assert desc[5].name == 'TerminatedAt'

        # ORDER BY
        self.cur.execute(
            f'show workspace groups like "% Fusion Testing {self.id}" order by name desc',
        )
        wgs = list(self.cur)

        names = [x[0] for x in wgs]
        assert names == [
            f'C Fusion Testing {self.id}',
            f'B Fusion Testing {self.id}',
            f'A Fusion Testing {self.id}',
        ]

        # All options
        self.cur.execute(
            f'show workspace groups like "% Fusion Testing {self.id}" '
            'extended order by name desc limit 2',
        )
        wgs = list(self.cur)
        desc = self.cur.description
        names = [x[0] for x in wgs]

        assert len(desc) == 6
        assert names == [f'C Fusion Testing {self.id}', f'B Fusion Testing {self.id}']

    def test_show_workspaces(self):
        """SHOW WORKSPACES supports group name/ID, LIKE, EXTENDED, ORDER BY, LIMIT."""
        mgr = s2.manage_workspaces()
        wg = mgr.workspace_groups[f'B Fusion Testing {self.id}']

        for ws_name in ('show-ws-1', 'show-ws-2', 'show-ws-3'):
            self.cur.execute(
                f'create workspace {ws_name} in group '
                f'"B Fusion Testing {self.id}" with size S-00',
            )

        # Poll every 30 seconds (at most 20 checks) until all three
        # workspaces report the ACTIVE state.
        time.sleep(30)
        iterations = 20
        while True:
            wgs = wg.workspaces
            states = [
                x.state for x in wgs
                if x.name in ('show-ws-1', 'show-ws-2', 'show-ws-3')
            ]
            if len(states) == 3 and states.count('ACTIVE') == 3:
                break
            iterations -= 1
            if not iterations:
                raise RuntimeError('timed out waiting for workspaces to start')
            time.sleep(30)

        # SHOW
        self.cur.execute(f'show workspaces in group "B Fusion Testing {self.id}"')
        desc = self.cur.description
        out = list(self.cur)
        names = [x[0] for x in out]
        assert len(desc) == 4
        assert [x[0] for x in desc] == ['Name', 'ID', 'Size', 'State']
        assert len(out) >= 3
        assert 'show-ws-1' in names
        assert 'show-ws-2' in names
        assert 'show-ws-3' in names

        # SHOW ID
        self.cur.execute(f'show workspaces in group id {wg.id}')
        desc = self.cur.description
        out = list(self.cur)
        names = [x[0] for x in out]
        assert len(desc) == 4
        assert [x[0] for x in desc] == ['Name', 'ID', 'Size', 'State']
        assert len(out) >= 3
        assert 'show-ws-1' in names
        assert 'show-ws-2' in names
        assert 'show-ws-3' in names

        # LIKE clause
        self.cur.execute(
            'show workspaces in group '
            f'"B Fusion Testing {self.id}" like "%2"',
        )
        out = list(self.cur)
        names = [x[0] for x in out]
        assert len(out) >= 1
        assert [x for x in names if x.endswith('2')]
        assert 'show-ws-1' not in names
        assert 'show-ws-2' in names
        assert 'show-ws-3' not in names

        # Extended attributes
        self.cur.execute(
            'show workspaces in group '
            f'"B Fusion Testing {self.id}" extended',
        )
        desc = self.cur.description
        out = list(self.cur)
        assert len(desc) == 7
        assert [x[0] for x in desc] == [
            'Name', 'ID', 'Size', 'State',
            'Endpoint', 'CreatedAt', 'TerminatedAt',
        ]

        # ORDER BY
        self.cur.execute(
            'show workspaces in group '
            f'"B Fusion Testing {self.id}" order by name desc',
        )
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 4
        names = [x[0] for x in out]
        assert names == ['show-ws-3', 'show-ws-2', 'show-ws-1']

        # LIMIT clause
        self.cur.execute(
            'show workspaces in group '
            f'"B Fusion Testing {self.id}" order by name desc limit 2',
        )
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 4
        names = [x[0] for x in out]
        assert names == ['show-ws-3', 'show-ws-2']

        # All options
        self.cur.execute(
            f'show workspaces in group "B Fusion Testing {self.id}" '
            'like "show-ws%" extended order by name desc limit 2',
        )
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 7
        names = [x[0] for x in out]
        assert names == ['show-ws-3', 'show-ws-2']

    def test_create_drop_workspace(self):
        """CREATE/DROP WORKSPACE by name and by ID, including IF EXISTS."""
        mgr = s2.manage_workspaces()
        wg = mgr.workspace_groups[f'A Fusion Testing {self.id}']

        self.cur.execute(
            f'create workspace foobar-1 in group "A Fusion Testing {self.id}" '
            'with size S-00 wait on active',
        )
        foobar_1 = [x for x in wg.workspaces if x.name == 'foobar-1']
        assert len(foobar_1) == 1

        self.cur.execute(
            f'create workspace foobar-2 in group "A Fusion Testing {self.id}" '
            'with size S-00 wait on active',
        )
        foobar_2 = [x for x in wg.workspaces if x.name == 'foobar-2']
        assert len(foobar_2) == 1

        # Drop by name
        self.cur.execute(
            f'drop workspace "foobar-1" in group "A Fusion Testing {self.id}" '
            'wait on terminated',
        )
        foobar_1 = [x for x in wg.workspaces if x.name == 'foobar-1']
        assert len(foobar_1) == 0

        # Drop by ID
        foobar_2_id = foobar_2[0].id
        self.cur.execute(
            f'drop workspace id {foobar_2_id} in group '
            f'"A Fusion Testing {self.id}" wait on terminated',
        )
        foobar_2 = [x for x in wg.workspaces if x.name == 'foobar-2']
        assert len(foobar_2) == 0

        # Drop non-existent by ID
        with self.assertRaises(KeyError):
            self.cur.execute(
                f'drop workspace id {foobar_2_id} '
                f'in group "A Fusion Testing {self.id}"',
            )

        # Drop non-existent by ID with IF EXISTS
        self.cur.execute(
            f'drop workspace IF EXISTS id {foobar_2_id} '
            f'in group "A Fusion Testing {self.id}"',
        )

    def test_create_drop_workspace_group(self):
        """CREATE/DROP WORKSPACE GROUP by name and by ID, including IF EXISTS."""
        mgr = s2.manage_workspaces()

        reg = [x for x in mgr.regions if x.name.startswith('US')][0]
        # ``id`` here is the builtin (class attributes are not in method
        # scope), so the name embeds the identity of this test instance.
        wg_name = f'Create WG Test {id(self)}'

        try:
            self.cur.execute(
                f'create workspace group "{wg_name}" '
                f'in region "{reg.name}"',
            )
            wg = [x for x in mgr.workspace_groups if x.name == wg_name]
            assert len(wg) == 1

            # Drop it by name
            self.cur.execute(
                f'drop workspace group "{wg_name}" '
                'wait on terminated',
            )
            wg = [x for x in mgr.workspace_groups if x.name == wg_name]
            assert len(wg) == 0

            # Create it again
            self.cur.execute(
                f'create workspace group "{wg_name}" in region "{reg.name}"',
            )
            wg = [x for x in mgr.workspace_groups if x.name == wg_name]
            assert len(wg) == 1

            # Drop it by ID
            wg_id = wg[0].id
            self.cur.execute(f'drop workspace group id {wg_id} wait on terminated')
            wg = [x for x in mgr.workspace_groups if x.name == wg_name]
            assert len(wg) == 0

            # Drop non-existent
            with self.assertRaises(KeyError):
                self.cur.execute(f'drop workspace group id {wg_id}')

            # Drop non-existent with IF EXISTS
            self.cur.execute(f'drop workspace group if exists id {wg_id}')

        finally:
            # Best-effort cleanup in case the test failed while the group
            # still existed; a missing group is expected on success.
            try:
                mgr.workspace_groups[wg_name].terminate(force=True)
            except Exception:
                pass
467
468
469
@pytest.mark.management
class TestJobsFusion(unittest.TestCase):
    """Fusion SQL tests for scheduling, running, listing and dropping jobs.

    ``setUpClass`` creates one workspace group with one workspace and points
    ``SINGLESTOREDB_DEFAULT_DATABASE`` / ``SINGLESTOREDB_WORKSPACE`` at the
    test database and that workspace.
    """

    id: str = secrets.token_hex(8)
    notebook_name: str = 'Scheduling Test.ipynb'
    dbname: str = ''
    dbexisted: bool = False
    # Given real defaults (instead of bare annotations) so tearDownClass()
    # can run safely even when setUpClass() failed part-way through.
    manager: Any = None
    workspace_group: Any = None
    workspace: Any = None
    job_ids: List[Any] = []

    @classmethod
    def setUpClass(cls):
        """Load ``test.sql`` and create the workspace group and workspace."""
        sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')
        cls.dbname, cls.dbexisted = utils.load_sql(sql_file)
        try:
            cls.manager = s2.manage_workspaces()
            us_regions = [
                x for x in cls.manager.regions if x.name.startswith('US')
            ]
            cls.workspace_group = cls.manager.create_workspace_group(
                f'Jobs Fusion Testing {cls.id}',
                region=random.choice(us_regions),
                firewall_ranges=[],
            )
            cls.workspace = cls.workspace_group.create_workspace(
                f'jobs-test-{cls.id}',
                wait_on_active=True,
            )
            os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = cls.dbname
            os.environ['SINGLESTOREDB_WORKSPACE'] = cls.workspace.id
        except Exception:
            # unittest skips tearDownClass() when setUpClass() raises, which
            # would leak the workspace group and the database.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delete leftover jobs, terminate the group, and undo the env/database."""
        try:
            try:
                # Jobs already dropped by a test fail to delete here; that
                # is expected, so deletion is best-effort.
                for job_id in cls.job_ids:
                    try:
                        cls.manager.organizations.current.jobs.delete(job_id)
                    except Exception:
                        pass
                cls.job_ids.clear()
                if cls.workspace_group is not None:
                    cls.workspace_group.terminate(force=True)
            finally:
                cls.manager = None
                cls.workspace_group = None
                cls.workspace = None
                os.environ.pop('SINGLESTOREDB_WORKSPACE', None)
                os.environ.pop('SINGLESTOREDB_DEFAULT_DATABASE', None)
        finally:
            # Matches the sibling test classes: remove the database that
            # setUpClass() loaded unless it existed beforehand.
            if not cls.dbexisted:
                utils.drop_database(cls.dbname)

    def _restore_fusion_flag(self):
        """Put ``SINGLESTOREDB_FUSION_ENABLED`` back to its pre-test state."""
        if self.enabled is None:
            os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)
        else:
            os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled

    def setUp(self):
        """Enable Fusion and open a connection and cursor for the test."""
        self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')
        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'
        try:
            self.conn = s2.connect(database=type(self).dbname, local_infile=True)
            self.cur = self.conn.cursor()
        except Exception:
            # tearDown() is not run when setUp() raises.
            self._restore_fusion_flag()
            raise

    def tearDown(self):
        """Restore the environment and close the cursor and connection."""
        self._restore_fusion_flag()

        # Best-effort close; errors here must not mask the test result.
        for resource in (self.cur, self.conn):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass

    def _read_new_job_id(self):
        """Check the single-row ``JobID`` result of the last command.

        The job ID is recorded in ``job_ids`` for class-level cleanup and
        returned to the caller.
        """
        out = list(self.cur)
        job_id = out[0][0]
        self.job_ids.append(job_id)
        desc = self.cur.description
        assert len(desc) == 1
        assert desc[0][0] == 'JobID'
        assert len(out) == 1
        assert out[0][0] == job_id
        return job_id

    def _drop_job_and_verify(self, job_id):
        """Run ``drop jobs <job_id>`` and assert it reports success."""
        self.cur.execute(f'drop jobs {job_id}')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 2
        assert [x[0] for x in desc] == [
            'JobID', 'Success',
        ]
        assert len(out) == 1
        res = out[0]
        assert res[0] == job_id
        assert res[1] == 1

    def test_schedule_drop_job(self):
        """SCHEDULE JOB returns a job ID and DROP JOBS removes that job."""
        # schedule recurring job
        self.cur.execute(
            f'schedule job using notebook "{self.notebook_name}" '
            'with mode "recurring" '
            'execute every 5 minutes '
            'with name "recurring-job" '
            'create snapshot '
            'resume target '
            'with runtime "notebooks-cpu-small" '
            'with parameters '
            '{"strParam": "string", "intParam": 1, '
            '"floatParam": 1.0, "boolParam": true}',
        )
        job_id = self._read_new_job_id()

        # drop job
        self._drop_job_and_verify(job_id)

    def test_run_wait_drop_job(self):
        """RUN JOB, WAIT ON JOBS and DROP JOBS work in sequence."""
        # run job
        self.cur.execute(
            f'run job using notebook "{self.notebook_name}" '
            'with runtime "notebooks-cpu-small" '
            'with parameters '
            '{"strParam": "string", "intParam": 1, '
            '"floatParam": 1.0, "boolParam": true}',
        )
        job_id = self._read_new_job_id()

        # wait on job
        self.cur.execute(f'wait on jobs {job_id}')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 1
        assert desc[0][0] == 'Success'
        assert out[0][0] == 1

        # drop job
        self._drop_job_and_verify(job_id)

    def test_show_jobs_and_executions(self):
        """SHOW JOBS / SHOW JOB EXECUTIONS return the expected columns and rows."""
        # schedule recurring job
        self.cur.execute(
            f'schedule job using notebook "{self.notebook_name}" '
            'with mode "recurring" '
            'execute every 5 minutes '
            'with name "show-job" '
            'with runtime "notebooks-cpu-small" '
            'with parameters '
            '{"strParam": "string", "intParam": 1, '
            '"floatParam": 1.0, "boolParam": true}',
        )
        job_id = self._read_new_job_id()

        # show jobs with name like "show-job"
        self.cur.execute(f'show jobs {job_id} like "show-job"')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 9
        assert [x[0] for x in desc] == [
            'JobID', 'Name', 'CreatedAt', 'EnqueuedBy',
            'CompletedExecutions', 'NotebookPath', 'DatabaseName', 'TargetID',
            'TargetType',
        ]
        assert len(out) == 1
        job = out[0]
        assert job[0] == job_id
        assert job[1] == 'show-job'
        assert job[5] == self.notebook_name
        assert job[6] == self.dbname
        assert job[7] == self.workspace.id
        assert job[8] == 'Workspace'

        # show jobs with name like "show-job" extended
        self.cur.execute(f'show jobs {job_id} like "show-job" extended')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 17
        assert [x[0] for x in desc] == [
            'JobID', 'Name', 'CreatedAt', 'EnqueuedBy',
            'CompletedExecutions', 'NotebookPath', 'DatabaseName', 'TargetID',
            'TargetType', 'Description', 'TerminatedAt', 'CreateSnapshot',
            'MaxDurationInMins', 'ExecutionIntervalInMins', 'Mode', 'StartAt',
            'ResumeTarget',
        ]
        assert len(out) == 1
        job = out[0]
        assert job[0] == job_id
        assert job[1] == 'show-job'
        assert job[5] == self.notebook_name
        assert job[6] == self.dbname
        assert job[7] == self.workspace.id
        assert job[8] == 'Workspace'
        assert not job[11]  # CreateSnapshot
        assert job[13] == 5  # ExecutionIntervalInMins
        assert job[14] == 'Recurring'  # Mode
        assert not job[16]  # ResumeTarget

        # show executions for job with id job_id from 1 to 5
        self.cur.execute(f'show job executions for {job_id} from 1 to 5')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 7
        assert [x[0] for x in desc] == [
            'ExecutionID', 'ExecutionNumber', 'JobID',
            'Status', 'ScheduledStartTime', 'StartedAt', 'FinishedAt',
        ]
        exec_job_ids = [x[2] for x in out]
        for x in exec_job_ids:
            assert x == job_id

        # show executions for job with id job_id from 1 to 5 extended
        self.cur.execute(f'show job executions for {job_id} from 1 to 5 extended')
        out = list(self.cur)
        desc = self.cur.description
        assert len(desc) == 8
        assert [x[0] for x in desc] == [
            'ExecutionID', 'ExecutionNumber', 'JobID',
            'Status', 'ScheduledStartTime', 'StartedAt', 'FinishedAt',
            'SnapshotNotebookPath',
        ]
        exec_job_ids = [x[2] for x in out]
        for x in exec_job_ids:
            assert x == job_id

        # drop job
        self._drop_job_and_verify(job_id)
721
722
723
@pytest.mark.management
724
class TestStageFusion(unittest.TestCase):
725
726
id: str = secrets.token_hex(8)
727
dbname: str = 'information_schema'
728
manager: None
729
workspace_group: None
730
workspace_group_2: None
731
732
@classmethod
def setUpClass(cls):
    """Create the two workspace groups used by the Stage tests.

    Both groups are placed in a randomly chosen region whose name starts
    with ``US``.  Afterwards ``SINGLESTOREDB_DEFAULT_DATABASE`` is set to
    ``information_schema`` and ``SINGLESTOREDB_WORKSPACE_GROUP`` to the ID
    of the first group.
    """
    cls.manager = s2.manage_workspaces()
    us_regions = [x for x in cls.manager.regions if x.name.startswith('US')]

    cls.workspace_group = None
    cls.workspace_group_2 = None
    try:
        cls.workspace_group = cls.manager.create_workspace_group(
            f'Stage Fusion Testing 1 {cls.id}',
            region=random.choice(us_regions),
            firewall_ranges=[],
        )
        cls.workspace_group_2 = cls.manager.create_workspace_group(
            f'Stage Fusion Testing 2 {cls.id}',
            region=random.choice(us_regions),
            firewall_ranges=[],
        )
    except Exception:
        # unittest skips tearDownClass() when setUpClass() raises, so a
        # group that was already created must be terminated here.
        for group in (cls.workspace_group, cls.workspace_group_2):
            if group is not None:
                group.terminate(force=True)
        cls.workspace_group = None
        cls.workspace_group_2 = None
        raise

    # Wait for both workspace groups to start
    time.sleep(5)

    os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'
    os.environ['SINGLESTOREDB_WORKSPACE_GROUP'] = cls.workspace_group.id
751
752
@classmethod
def tearDownClass(cls):
    """Terminate both workspace groups and undo the class-level state.

    The class attributes are reset to ``None`` and the environment
    variables are removed even when terminating a group raises; the
    exception still propagates afterwards.
    """
    try:
        try:
            if cls.workspace_group is not None:
                cls.workspace_group.terminate(force=True)
        finally:
            # A failure on the first group must not leak the second one.
            if cls.workspace_group_2 is not None:
                cls.workspace_group_2.terminate(force=True)
    finally:
        cls.manager = None
        cls.workspace_group = None
        cls.workspace_group_2 = None
        cls.workspace = None
        cls.workspace_2 = None
        for var in (
            'SINGLESTOREDB_WORKSPACE',
            'SINGLESTOREDB_WORKSPACE_GROUP',
            'SINGLESTOREDB_DEFAULT_DATABASE',
        ):
            os.environ.pop(var, None)
769
770
def setUp(self):
    """Enable Fusion and open a connection and cursor for the test.

    The previous value of ``SINGLESTOREDB_FUSION_ENABLED`` is remembered in
    ``self.enabled`` so that it can be restored after the test.
    """
    self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')
    os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'
    try:
        self.conn = s2.connect(database=type(self).dbname, local_infile=True)
        self.cur = self.conn.cursor()
    except Exception:
        # unittest does not run tearDown() when setUp() raises, so undo
        # the environment change here before re-raising.
        if self.enabled is None:
            os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)
        else:
            os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled
        raise
775
776
def tearDown(self):
    """Empty the Stage, restore the environment and close the connection.

    The environment variable is restored and the cursor/connection are
    closed even when clearing the Stage raises; that exception still
    propagates afterwards.
    """
    try:
        self._clear_stage()
    finally:
        if self.enabled is None:
            # ``pop`` rather than ``del``: the variable may already be gone.
            os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)
        else:
            os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled

        # Closing is best-effort: a failure here must not hide the outcome
        # of the test that just ran.
        for resource in (self.cur, self.conn):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass
797
798
def _clear_stage(self):
    """Remove every file and folder from the Stage of both workspace groups.

    For each group that is not ``None`` the Stage is listed recursively;
    plain files are dropped first, then the entries ending in ``/``
    (folders), each in listing order.
    """
    for group in (self.workspace_group, self.workspace_group_2):
        if group is None:
            continue

        self.cur.execute(f'''
            show stage files
            in group id '{group.id}' recursive
        ''')
        paths = [row[0] for row in self.cur]

        for path in paths:
            if not path.endswith('/'):
                self.cur.execute(f'''
                    drop stage file '{path}'
                    in group id '{group.id}'
                ''')

        for path in paths:
            if path.endswith('/'):
                self.cur.execute(f'''
                    drop stage folder '{path}'
                    in group id '{group.id}'
                ''')
840
841
def test_show_stage(self):
    """SHOW STAGE FILES: recursion, group selection, LIMIT, ORDER BY, AT, LIKE."""
    test2_sql = os.path.join(os.path.dirname(__file__), 'test2.sql')

    def listing(sql):
        # Run one statement and return the first column of every row.
        self.cur.execute(sql)
        return [row[0] for row in self.cur]

    top_level = ['new_test_1.sql', 'subdir1/', 'subdir2/']
    everything = [
        'new_test_1.sql',
        'subdir1/',
        'subdir1/new_test_2.sql',
        'subdir1/new_test_3.sql',
        'subdir2/',
        'subdir2/new_test_4.sql',
        'subdir2/new_test_5.sql',
    ]

    # Should be empty
    names = listing('''
        show stage files
    ''')
    assert len(names) == 0

    # Copy files to stage
    population = (
        ('file', 'new_test_1.sql'),
        ('folder', 'subdir1'),
        ('file', 'subdir1/new_test_2.sql'),
        ('file', 'subdir1/new_test_3.sql'),
        ('folder', 'subdir2'),
        ('file', 'subdir2/new_test_4.sql'),
        ('file', 'subdir2/new_test_5.sql'),
    )
    for kind, stage_path in population:
        if kind == 'folder':
            self.cur.execute(f'create stage folder "{stage_path}"')
        else:
            self.cur.execute(
                f'upload file to stage "{stage_path}" from "{test2_sql}"',
            )

    # Make sure files are there
    names = listing('''
        show stage files recursive
    ''')
    assert len(names) == 7
    assert sorted(names) == everything

    # Do non-recursive listing
    names = listing('''
        show stage files
    ''')
    assert len(names) == 3
    assert sorted(names) == top_level

    # List files in specific workspace group, addressed by ID and by name,
    # with and without the GROUP keyword
    group = self.workspace_group
    scoped_queries = (
        f'''
        show stage files in group id '{group.id}'
        ''',
        f'''
        show stage files in id '{group.id}'
        ''',
        f'''
        show stage files in group '{group.name}'
        ''',
        f'''
        show stage files in '{group.name}'
        ''',
    )
    for query in scoped_queries:
        names = listing(query)
        assert len(names) == 3
        assert sorted(names) == top_level

    # Check other workspace group
    names = listing(f'''
        show stage files in group '{self.workspace_group_2.name}'
    ''')
    assert len(names) == 0

    # Limit results
    names = listing('''
        show stage files recursive limit 5
    ''')
    assert len(names) == 5
    assert sorted(names) == [
        'new_test_1.sql',
        'subdir1/',
        'subdir1/new_test_2.sql',
        'subdir1/new_test_3.sql',
        'subdir2/',
    ]

    # Order by type and name
    names = listing('''
        show stage files order by type, name recursive extended
    ''')
    assert len(names) == 7
    assert names == [
        'subdir1/',
        'subdir2/',
        'new_test_1.sql',
        'subdir1/new_test_2.sql',
        'subdir1/new_test_3.sql',
        'subdir2/new_test_4.sql',
        'subdir2/new_test_5.sql',
    ]

    # Order by type and name descending
    names = listing('''
        show stage files order by type desc, name desc recursive extended
    ''')
    assert len(names) == 7
    assert names == [
        'subdir2/new_test_5.sql',
        'subdir2/new_test_4.sql',
        'subdir1/new_test_3.sql',
        'subdir1/new_test_2.sql',
        'new_test_1.sql',
        'subdir2/',
        'subdir1/',
    ]

    # List at specific path
    names = listing('''
        show stage files at 'subdir2/' recursive
    ''')
    assert len(names) == 2
    assert sorted(names) == [
        'new_test_4.sql',
        'new_test_5.sql',
    ]

    # LIKE clause
    names = listing('''
        show stage files like '%_4.%' recursive
    ''')
    assert len(names) == 1
    assert sorted(names) == [
        'subdir2/new_test_4.sql',
    ]
1016
1017
def test_download_stage(self):
    """Files uploaded to either group's Stage download with identical content."""
    test2_sql = os.path.join(os.path.dirname(__file__), 'test2.sql')

    # Read the reference content once, through a context manager, so no
    # file handle is left open.
    with open(test2_sql, 'r') as src_file:
        expected = src_file.read()

    # Should be empty
    self.cur.execute('''
        show stage files
    ''')
    files = list(self.cur)
    assert len(files) == 0

    # Copy file to stage 1
    self.cur.execute(f'''
        upload file to stage 'dl_test.sql' from '{test2_sql}'
    ''')

    self.cur.execute('''
        show stage files
    ''')
    files = list(self.cur)
    assert len(files) == 1
    assert sorted(x[0] for x in files) == ['dl_test.sql']

    # Copy file to stage 2
    self.cur.execute(f'''
        upload file to stage 'dl_test2.sql'
        in group '{self.workspace_group_2.name}'
        from '{test2_sql}'
    ''')

    # Make sure only one file in stage 2
    self.cur.execute(f'''
        show stage files in group '{self.workspace_group_2.name}'
    ''')
    files = list(self.cur)
    assert len(files) == 1
    assert sorted(x[0] for x in files) == ['dl_test2.sql']

    # Download file from stage 1
    with tempfile.TemporaryDirectory() as tmpdir:
        self.cur.execute(f'''
            download stage file 'dl_test.sql' to '{tmpdir}/dl_test.sql'
        ''')
        with open(os.path.join(tmpdir, 'dl_test.sql'), 'r') as dl_file:
            assert dl_file.read() == expected

    # Download file from stage 2
    with tempfile.TemporaryDirectory() as tmpdir:
        self.cur.execute(f'''
            download stage file 'dl_test2.sql'
            in group '{self.workspace_group_2.name}'
            to '{tmpdir}/dl_test2.sql'
        ''')
        with open(os.path.join(tmpdir, 'dl_test2.sql'), 'r') as dl_file:
            assert dl_file.read() == expected
1071
1072
def test_stage_multi_wg_operations(self):
    """Run stage commands against the default stage and a second group's stage.

    Files are uploaded to both stages, listed by group name and by group ID,
    overwritten, filtered with LIKE, and finally dropped again; after every
    step the listing of each stage is checked.

    """
    base_dir = os.path.dirname(__file__)
    test_sql = os.path.join(base_dir, 'test.sql')
    test2_sql = os.path.join(base_dir, 'test2.sql')
    wg2 = self.workspace_group_2

    def rows_of(query):
        # Execute a listing statement and collect every row it returns
        self.cur.execute(query)
        return list(self.cur)

    def names_of(rows):
        # Sorted first-column values of the given rows
        return sorted(row[0] for row in rows)

    def default_stage_rows():
        # Listing of the default stage (no group specified)
        return rows_of('''
            show stage files
        ''')

    def wg2_rows_by_id():
        # Recursive listing of the second group's stage, addressed by group ID
        return rows_of(f'''
            show stage files in group id '{wg2.id}' recursive
        ''')

    # Nothing in the default stage yet
    files = default_stage_rows()
    assert len(files) == 0

    # Put one file in the default stage
    self.cur.execute(f'''
        upload file to stage 'new_test.sql' from '{test_sql}'
    ''')

    files = default_stage_rows()
    assert len(files) == 1

    # Put one file in the second group's stage, addressed by group name
    self.cur.execute(f'''
        upload file to stage 'new_test2.sql'
            in group '{wg2.name}'
            from '{test2_sql}'
    ''')

    # The default stage still holds only its own file
    files = default_stage_rows()
    assert len(files) == 1
    assert files[0][0] == 'new_test.sql'

    # The second stage holds exactly one file (IN GROUP form)
    files = rows_of(f'''
        show stage files in group '{wg2.name}' recursive
    ''')
    assert len(files) == 1
    assert names_of(files) == ['new_test2.sql']

    # Same check using the bare IN form
    files = rows_of(f'''
        show stage files in '{wg2.name}' recursive
    ''')
    assert len(files) == 1
    assert names_of(files) == ['new_test2.sql']

    # Create a folder in the second stage
    self.cur.execute(f'''
        create stage folder 'data' in group '{wg2.name}'
    ''')

    # Upload into that folder, addressing the group by its ID
    self.cur.execute(f'''
        upload file to stage 'data/new_test2_sub.sql'
            in group id '{wg2.id}'
            from '{test2_sql}'
    ''')

    # The default stage is unaffected
    files = default_stage_rows()
    assert len(files) == 1
    assert files[0][0] == 'new_test.sql'

    # The second stage now lists the folder plus two files
    files = wg2_rows_by_id()
    assert len(files) == 3
    assert names_of(files) == \
        ['data/', 'data/new_test2_sub.sql', 'new_test2.sql']

    # Uploading over an existing file without OVERWRITE must fail
    with self.assertRaises(OSError):
        self.cur.execute(f'''
            upload file to stage 'data/new_test2_sub.sql'
                in group id '{wg2.id}'
                from '{test2_sql}'
        ''')

    # With OVERWRITE the same upload is accepted
    self.cur.execute(f'''
        upload file to stage 'data/new_test2_sub.sql'
            in group id '{wg2.id}'
            from '{test2_sql}' overwrite
    ''')

    # Overwriting did not add or remove entries
    files = wg2_rows_by_id()
    assert len(files) == 3
    assert names_of(files) == \
        ['data/', 'data/new_test2_sub.sql', 'new_test2.sql']

    # LIKE narrows the listing to the file in the subfolder
    files = rows_of(f'''
        show stage files
            in group id '{wg2.id}'
            like '%_sub%' recursive
    ''')
    assert len(files) == 1
    assert names_of(files) == ['data/new_test2_sub.sql']

    # Remove the file from the default stage
    self.cur.execute('''
        drop stage file 'new_test.sql'
    ''')

    # The default stage is empty again
    files = default_stage_rows()
    assert len(files) == 0

    # The second stage is untouched by that drop
    files = wg2_rows_by_id()
    assert len(files) == 3
    assert names_of(files) == \
        ['data/', 'data/new_test2_sub.sql', 'new_test2.sql']

    # Dropping the folder while it still contains a file must fail
    with self.assertRaises(OSError):
        self.cur.execute(f'''
            drop stage folder 'data'
                in group id '{wg2.id}'
        ''')

    # Remove the file inside the folder
    self.cur.execute(f'''
        drop stage file 'data/new_test2_sub.sql'
            in group id '{wg2.id}'
    ''')

    # One folder and one file remain in the second stage
    files = wg2_rows_by_id()
    assert len(files) == 2
    assert names_of(files) == ['data/', 'new_test2.sql']

    # The folder can be dropped now
    self.cur.execute(f'''
        drop stage folder 'data'
            in group id '{wg2.id}'
    ''')

    # Only the top-level file is left
    files = wg2_rows_by_id()
    assert len(files) == 1
    assert names_of(files) == ['new_test2.sql']

    # Remove the final file
    self.cur.execute(f'''
        drop stage file 'new_test2.sql'
            in group id '{wg2.id}'
    ''')

    # The second stage is empty
    files = wg2_rows_by_id()
    assert len(files) == 0
1254
1255
1256
@pytest.mark.management
class TestFilesFusion(unittest.TestCase):
    """Fusion SQL tests for personal and shared files.

    Covers the ``show``, ``upload``, ``download`` and ``drop`` commands for
    both the ``personal`` and ``shared`` file spaces. A workspace group is
    created once for the whole class and terminated afterwards.

    """

    # Random suffix embedded in every file name created by these tests.
    # NOTE(review): this attribute shadows ``unittest.TestCase.id()``; the name
    # is kept because the tests reference ``cls.id`` throughout.
    id: str = secrets.token_hex(8)
    dbname: str = 'information_schema'
    # Populated in setUpClass; default to None so tearDownClass can always
    # test them safely.
    manager: Any = None
    workspace_group: Any = None
    workspace: Any = None

    @classmethod
    def setUpClass(cls):
        """Create the workspace group and point the environment at it."""
        cls.manager = s2.manage_workspaces()
        us_regions = [x for x in cls.manager.regions if x.name.startswith('US')]
        cls.workspace_group = cls.manager.create_workspace_group(
            f'Files Fusion Testing {cls.id}',
            region=random.choice(us_regions),
            firewall_ranges=[],
        )
        # Give the workspace group a moment to start
        time.sleep(5)

        os.environ['SINGLESTOREDB_DEFAULT_DATABASE'] = 'information_schema'
        os.environ['SINGLESTOREDB_WORKSPACE_GROUP'] = cls.workspace_group.id

    @classmethod
    def tearDownClass(cls):
        """Terminate the workspace group and remove the environment overrides."""
        try:
            if cls.workspace_group is not None:
                cls.workspace_group.terminate(force=True)
        finally:
            # Reset state even if termination fails so later test classes
            # do not inherit this class's environment.
            cls.manager = None
            cls.workspace_group = None
            cls.workspace = None
            for name in (
                'SINGLESTOREDB_WORKSPACE',
                'SINGLESTOREDB_WORKSPACE_GROUP',
                'SINGLESTOREDB_DEFAULT_DATABASE',
            ):
                os.environ.pop(name, None)

    def setUp(self):
        """Enable Fusion (remembering the previous setting) and open a cursor."""
        self.enabled = os.environ.get('SINGLESTOREDB_FUSION_ENABLED')
        os.environ['SINGLESTOREDB_FUSION_ENABLED'] = '1'
        self.conn = s2.connect(database=type(self).dbname, local_infile=True)
        self.cur = self.conn.cursor()

    def tearDown(self):
        """Remove test files, restore the Fusion setting, and close the connection."""
        try:
            self._clear_files()
        finally:
            # Restore exactly what was there before setUp; an empty string is
            # a value too, and the variable may already have been removed.
            if self.enabled is not None:
                os.environ['SINGLESTOREDB_FUSION_ENABLED'] = self.enabled
            else:
                os.environ.pop('SINGLESTOREDB_FUSION_ENABLED', None)

            # Closing is best-effort: a failure here must not mask the
            # outcome of the test itself.
            try:
                if self.cur is not None:
                    self.cur.close()
            except Exception:
                pass

            try:
                if self.conn is not None:
                    self.conn.close()
            except Exception:
                pass

    def _clear_files(self):
        """Drop every personal and shared file these tests may have created."""
        cls = type(self)
        for prefix in ['show', 'dl', 'drop']:
            for ftype in ['personal', 'shared']:
                for i in range(1, 6):
                    try:
                        self.cur.execute(
                            f'''drop {ftype} file "{prefix}_test_{i}_{cls.id}.ipynb"''',
                        )
                    except (OSError, s2.ManagementError):
                        # Dropping is attempted for every possible name, so
                        # these errors are ignored.
                        pass

    def _list_names(self, query):
        """Execute ``query`` and return the first column of each result row."""
        self.cur.execute(query)
        return [row[0] for row in self.cur]

    def test_show_personal_files(self):
        return self._test_show_files('personal')

    def test_show_shared_files(self):
        return self._test_show_files('shared')

    def _test_show_files(self, ftype):
        """Check SHOW ... FILES with LIKE, ORDER BY, LIMIT and EXTENDED.

        Parameters
        ----------
        ftype : str
            File space keyword placed in the commands: 'personal' or 'shared'

        """
        cls = type(self)
        nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')

        # Should be empty
        names = self._list_names(f'''
            show {ftype} files like 'show_%{cls.id}%'
        ''')
        assert len(names) == 0

        # Upload files
        for i in range(1, 6):
            self.cur.execute(
                f'upload {ftype} file to "show_test_{i}_{cls.id}.ipynb" from "{nb}"',
            )

        # Make sure files are there
        names = self._list_names(f'''
            show {ftype} files like 'show_%{cls.id}%'
        ''')
        assert len(names) == 5
        assert sorted(names) == [
            f'show_test_{i}_{cls.id}.ipynb' for i in range(1, 6)
        ]

        # Test ORDER BY
        names = self._list_names(f'''
            show {ftype} files like 'show_%{cls.id}%' order by name desc
        ''')
        assert len(names) == 5
        assert names == [
            f'show_test_{i}_{cls.id}.ipynb' for i in range(5, 0, -1)
        ]

        # Test LIMIT
        names = self._list_names(f'''
            show {ftype} files like 'show_%{cls.id}%' order by name desc limit 3
        ''')
        assert len(names) == 3
        assert names == [
            f'show_test_{i}_{cls.id}.ipynb' for i in range(5, 2, -1)
        ]

        # Test EXTENDED
        self.cur.execute(f'''
            show {ftype} files like 'show_%{cls.id}%' extended
        ''')
        assert [x[0] for x in self.cur.description] == \
            ['Name', 'Type', 'Size', 'Writable', 'CreatedAt', 'LastModifiedAt']

    def test_download_personal_files(self):
        return self._test_download_files('personal')

    def test_download_shared_files(self):
        return self._test_download_files('shared')

    def _test_download_files(self, ftype):
        """Upload two files, download them, and compare with the local source.

        Parameters
        ----------
        ftype : str
            File space keyword placed in the commands: 'personal' or 'shared'

        """
        cls = type(self)
        nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')

        # Read the reference content once, closing the handle right away
        with open(nb, 'r') as nb_file:
            expected = nb_file.read()

        # Should be empty
        names = self._list_names(f'''
            show {ftype} files like 'dl_%{cls.id}%'
        ''')
        assert len(names) == 0

        # Upload files
        for i in (1, 2):
            self.cur.execute(
                f'upload {ftype} file to "dl_test_{i}_{cls.id}.ipynb" from "{nb}"',
            )

        # Make sure files are there
        names = self._list_names(f'''
            show {ftype} files like 'dl_%{cls.id}%'
        ''')
        assert len(names) == 2
        assert sorted(names) == [
            f'dl_test_1_{cls.id}.ipynb',
            f'dl_test_2_{cls.id}.ipynb',
        ]

        # Download files
        with tempfile.TemporaryDirectory() as tmpdir:
            for i in (1, 2):
                self.cur.execute(f'''
                    download {ftype} file 'dl_test_{i}_{cls.id}.ipynb'
                    to '{tmpdir}/dl_test_{i}.ipynb'
                ''')
                with open(os.path.join(tmpdir, f'dl_test_{i}.ipynb'), 'r') as dl_file:
                    assert dl_file.read() == expected

    def test_drop_personal_files(self):
        return self._test_drop_files('personal')

    def test_drop_shared_files(self):
        return self._test_drop_files('shared')

    def _test_drop_files(self, ftype):
        """Upload two files and drop them one at a time, checking the listing.

        Parameters
        ----------
        ftype : str
            File space keyword placed in the commands: 'personal' or 'shared'

        """
        cls = type(self)
        nb = os.path.join(os.path.dirname(__file__), 'test.ipynb')
        listing = f'''
            show {ftype} files like 'drop_%{cls.id}%'
        '''

        # Should be empty
        names = self._list_names(listing)
        assert len(names) == 0

        # Upload files
        for i in (1, 2):
            self.cur.execute(
                f'upload {ftype} file to "drop_test_{i}_{cls.id}.ipynb" from "{nb}"',
            )

        # Make sure files are there
        names = self._list_names(listing)
        assert len(names) == 2
        assert sorted(names) == [
            f'drop_test_1_{cls.id}.ipynb',
            f'drop_test_2_{cls.id}.ipynb',
        ]

        # Drop 1 file
        self.cur.execute(f'''
            drop {ftype} file 'drop_test_1_{cls.id}.ipynb'
        ''')

        # Make sure 1 file is there
        names = self._list_names(listing)
        assert len(names) == 1
        assert names == [f'drop_test_2_{cls.id}.ipynb']

        # Drop 2nd file
        self.cur.execute(f'''
            drop {ftype} file 'drop_test_2_{cls.id}.ipynb'
        ''')

        # Make sure no files are there
        names = self._list_names(listing)
        assert len(names) == 0
1528
1529