Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/fusion/handlers/utils.py
801 views
1
#!/usr/bin/env python
2
import datetime
3
import os
4
from typing import Any
5
from typing import Dict
6
from typing import Optional
7
from typing import Union
8
9
from ...exceptions import ManagementError
10
from ...management import files as mgmt_files
11
from ...management import manage_workspaces
12
from ...management.files import FilesManager
13
from ...management.files import FileSpace
14
from ...management.files import manage_files
15
from ...management.inference_api import InferenceAPIInfo
16
from ...management.inference_api import InferenceAPIManager
17
from ...management.workspace import StarterWorkspace
18
from ...management.workspace import Workspace
19
from ...management.workspace import WorkspaceGroup
20
from ...management.workspace import WorkspaceManager
21
22
23
def get_workspace_manager() -> WorkspaceManager:
    """
    Build a workspace manager.

    Returns
    -------
    WorkspaceManager
        The object returned by ``manage_workspaces()`` called with
        no arguments.

    """
    workspace_manager = manage_workspaces()
    return workspace_manager
26
27
28
def get_files_manager() -> FilesManager:
    """
    Build a files manager.

    Returns
    -------
    FilesManager
        The object returned by ``manage_files()`` called with
        no arguments.

    """
    files_manager = manage_files()
    return files_manager
31
32
33
def dt_isoformat(dt: Optional[datetime.datetime]) -> Optional[str]:
    """
    Format a datetime as an ISO 8601 string.

    Parameters
    ----------
    dt : datetime.datetime, optional
        Value to format

    Returns
    -------
    str or None
        ``dt.isoformat()``, or None when ``dt`` is None

    """
    return None if dt is None else dt.isoformat()
38
39
40
def get_workspace_group(params: Dict[str, Any]) -> WorkspaceGroup:
    """
    Find a workspace group matching group_id or group_name.

    A group name is looked for first, then a group ID. For each, the
    first non-empty value among the following parameters is used:

    * params['group_name'] / params['group_id']
    * params['in_group']['group_name'] / params['in_group']['group_id']
    * params['group']['group_name'] / params['group']['group_id']

    If neither is given, the ID in the SINGLESTOREDB_WORKSPACE_GROUP
    environment variable is used.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters to search for the group name or ID

    Returns
    -------
    WorkspaceGroup

    Raises
    ------
    KeyError
        If no workspace group was specified, or none matches the
        given name or ID
    ValueError
        If more than one workspace group has the given name, or if
        only SINGLESTOREDB_CLUSTER is set in the environment
    ManagementError
        If a lookup by ID fails with an error code other than 404

    """
    manager = get_workspace_manager()

    group_name = params.get('group_name') or \
        (params.get('in_group') or {}).get('group_name') or \
        (params.get('group') or {}).get('group_name')
    if group_name:
        workspace_groups = [
            x for x in manager.workspace_groups
            if x.name == group_name
        ]

        if not workspace_groups:
            raise KeyError(
                f'no workspace group found with name: {group_name}',
            )

        if len(workspace_groups) > 1:
            ids = ', '.join(x.id for x in workspace_groups)
            raise ValueError(
                f'more than one workspace group with given name was found: {ids}',
            )

        return workspace_groups[0]

    group_id = params.get('group_id') or \
        (params.get('in_group') or {}).get('group_id') or \
        (params.get('group') or {}).get('group_id')
    if group_id:
        try:
            return manager.get_workspace_group(group_id)
        except ManagementError as exc:
            # A 404 means the ID is unknown; anything else is re-raised as-is.
            if exc.errno == 404:
                raise KeyError(f'no workspace group found with ID: {group_id}')
            raise

    env_group_id = os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')
    if env_group_id:
        try:
            return manager.get_workspace_group(env_group_id)
        except ManagementError as exc:
            if exc.errno == 404:
                # This lookup is for a workspace *group*, so say so
                # (the message previously said "no workspace found").
                raise KeyError(
                    f'no workspace group found with ID: {env_group_id}',
                )
            raise

    if os.environ.get('SINGLESTOREDB_CLUSTER'):
        raise ValueError('clusters and shared workspaces are not currently supported')

    raise KeyError('no workspace group was specified')
109
110
111
def get_workspace(params: Dict[str, Any]) -> Workspace:
    """
    Retrieve the specified workspace.

    A workspace name is looked for first, then a workspace ID. For
    each, the first non-empty value among the following parameters
    is used:

    * params['workspace_name'] / params['workspace_id']
    * params['workspace']['workspace_name'] /
      params['workspace']['workspace_id']

    A lookup by name searches the workspaces of the group returned by
    ``get_workspace_group(params)``. If neither a name nor an ID is
    given, the ID in the SINGLESTOREDB_WORKSPACE environment variable
    is used.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters to search for the workspace name or ID

    Returns
    -------
    Workspace

    Raises
    ------
    KeyError
        If no workspace was specified, or none matches the given
        name or ID
    ValueError
        If more than one workspace has the given name, or if only
        SINGLESTOREDB_CLUSTER is set in the environment
    ManagementError
        If a lookup by ID fails with an error code other than 404

    """
    manager = get_workspace_manager()
    nested = params.get('workspace') or {}

    # Lookup by name within the workspace group
    wanted_name = params.get('workspace_name') or nested.get('workspace_name')
    if wanted_name:
        group = get_workspace_group(params)
        matches = [ws for ws in group.workspaces if ws.name == wanted_name]

        if len(matches) == 1:
            return matches[0]

        if not matches:
            raise KeyError(
                f'no workspace found with name: {wanted_name}',
            )

        ids = ', '.join(ws.id for ws in matches)
        raise ValueError(
            f'more than one workspace with given name was found: {ids}',
        )

    # Lookup by explicit ID
    wanted_id = params.get('workspace_id') or nested.get('workspace_id')
    if wanted_id:
        try:
            return manager.get_workspace(wanted_id)
        except ManagementError as exc:
            if exc.errno != 404:
                raise
            raise KeyError(f'no workspace found with ID: {wanted_id}')

    # Lookup by ID from the environment
    env_id = os.environ.get('SINGLESTOREDB_WORKSPACE')
    if env_id:
        try:
            return manager.get_workspace(env_id)
        except ManagementError as exc:
            if exc.errno != 404:
                raise
            raise KeyError(f'no workspace found with ID: {env_id}')

    if os.environ.get('SINGLESTOREDB_CLUSTER'):
        raise ValueError('clusters and shared workspaces are not currently supported')

    raise KeyError('no workspace was specified')
176
177
178
def get_deployment(
    params: Dict[str, Any],
) -> Union[WorkspaceGroup, StarterWorkspace]:
    """
    Find a deployment matching deployment_id or deployment_name.

    A deployment is either a workspace group or a starter workspace;
    workspace groups are tried first. A deployment name is looked for
    first, then a deployment ID. For each, the first non-empty value
    among the following parameters is used:

    * params['deployment_name'] / params['deployment_id']
    * params['in_deployment'][...]
    * params['group'][...]
    * params['in']['in_group'][...]
    * params['in']['in_deployment'][...]

    If neither is given, the workspace group ID in the
    SINGLESTOREDB_WORKSPACE_GROUP environment variable is used, then
    the starter workspace ID in SINGLESTOREDB_CLUSTER.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters to search for the deployment name or ID

    Returns
    -------
    WorkspaceGroup or StarterWorkspace

    Raises
    ------
    KeyError
        If no deployment was specified, or none matches the given
        name or ID
    ValueError
        If more than one workspace group, or more than one starter
        workspace, has the given name
    ManagementError
        If a lookup by ID fails with an error code other than 404

    """
    manager = get_workspace_manager()

    #
    # Search for deployment by name
    #
    deployment_name = params.get('deployment_name') or \
        (params.get('in_deployment') or {}).get('deployment_name') or \
        (params.get('group') or {}).get('deployment_name') or \
        ((params.get('in') or {}).get('in_group') or {}).get('deployment_name') or \
        ((params.get('in') or {}).get('in_deployment') or {}).get('deployment_name')

    if deployment_name:
        # Standard workspace group
        workspace_groups = [
            x for x in manager.workspace_groups
            if x.name == deployment_name
        ]

        if len(workspace_groups) == 1:
            return workspace_groups[0]

        elif len(workspace_groups) > 1:
            ids = ', '.join(x.id for x in workspace_groups)
            raise ValueError(
                f'more than one workspace group with given name was found: {ids}',
            )

        # Starter workspace
        starter_workspaces = [
            x for x in manager.starter_workspaces
            if x.name == deployment_name
        ]

        if len(starter_workspaces) == 1:
            return starter_workspaces[0]

        elif len(starter_workspaces) > 1:
            ids = ', '.join(x.id for x in starter_workspaces)
            raise ValueError(
                f'more than one starter workspace with given name was found: {ids}',
            )

        raise KeyError(f'no deployment found with name: {deployment_name}')

    #
    # Search for deployment by ID
    #
    deployment_id = params.get('deployment_id') or \
        (params.get('in_deployment') or {}).get('deployment_id') or \
        (params.get('group') or {}).get('deployment_id') or \
        ((params.get('in') or {}).get('in_group') or {}).get('deployment_id') or \
        ((params.get('in') or {}).get('in_deployment') or {}).get('deployment_id')

    if deployment_id:
        try:
            # Standard workspace group
            return manager.get_workspace_group(deployment_id)
        except ManagementError as exc:
            if exc.errno == 404:
                # Not a workspace group; fall back to a starter workspace.
                try:
                    return manager.get_starter_workspace(deployment_id)
                except ManagementError as starter_exc:
                    # Distinct name so the outer ``exc`` is not shadowed.
                    if starter_exc.errno == 404:
                        raise KeyError(f'no deployment found with ID: {deployment_id}')
                    raise
            else:
                raise

    # Use workspace group from environment
    env_group_id = os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')
    if env_group_id:
        try:
            return manager.get_workspace_group(env_group_id)
        except ManagementError as exc:
            if exc.errno == 404:
                # This lookup is for a workspace *group*, so say so
                # (the message previously said "no workspace found").
                raise KeyError(
                    f'no workspace group found with ID: {env_group_id}',
                )
            raise

    # Use cluster from environment
    env_cluster_id = os.environ.get('SINGLESTOREDB_CLUSTER')
    if env_cluster_id:
        try:
            return manager.get_starter_workspace(env_cluster_id)
        except ManagementError as exc:
            if exc.errno == 404:
                raise KeyError(
                    f'no starter workspace found with ID: {env_cluster_id}',
                )
            raise

    raise KeyError('no deployment was specified')
300
301
302
def get_file_space(params: Dict[str, Any]) -> FileSpace:
    """
    Retrieve the specified file space.

    The file space is selected by ``params['file_location']``, which
    is lower-cased and compared against the ``PERSONAL_SPACE``,
    ``SHARED_SPACE`` and ``MODELS_SPACE`` constants of the
    ``management.files`` module.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters containing the ``file_location`` entry

    Returns
    -------
    FileSpace

    Raises
    ------
    KeyError
        If ``file_location`` is missing or empty
    ValueError
        If ``file_location`` matches none of the known file spaces

    """
    manager = get_files_manager()

    location = params.get('file_location')
    if not location:
        raise KeyError('no file space was specified')

    normalized = location.lower()

    if normalized == mgmt_files.PERSONAL_SPACE:
        return manager.personal_space

    if normalized == mgmt_files.SHARED_SPACE:
        return manager.shared_space

    if normalized == mgmt_files.MODELS_SPACE:
        return manager.models_space

    raise ValueError(f'invalid file location: {location}')
327
328
329
def get_inference_api_manager() -> InferenceAPIManager:
    """
    Return the inference API manager.

    Returns
    -------
    InferenceAPIManager
        The ``inference_apis`` attribute of the organization attached
        to the workspace manager from ``get_workspace_manager()``.

    """
    organization = get_workspace_manager().organization
    return organization.inference_apis
333
334
335
def get_inference_api(params: Dict[str, Any]) -> InferenceAPIInfo:
    """
    Return the inference API for the model named in ``params``.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters; must contain a ``model_name`` entry

    Returns
    -------
    InferenceAPIInfo
        Result of ``get(model_name)`` on the inference API manager

    Raises
    ------
    KeyError
        If ``params`` has no ``model_name`` entry

    """
    # The manager is obtained before the parameter lookup.
    api_manager = get_inference_api_manager()
    name = params['model_name']
    return api_manager.get(name)
340
341