Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/utils.py
799 views
1
import dataclasses
2
import inspect
3
import struct
4
import sys
5
import types
6
import typing
7
from collections.abc import Iterable
8
from enum import Enum
9
from typing import Any
10
from typing import Dict
11
from typing import Tuple
12
from typing import Union
13
14
from .typing import Masked
15
16
# Origins that ``typing.get_origin`` can report for a union annotation.
# ``typing.Union`` is always present; the ``X | Y`` syntax introduced in
# Python 3.10 reports ``types.UnionType`` instead.
_UNION_TYPES = {typing.Union}
if sys.version_info >= (3, 10):
    _UNION_TYPES.add(types.UnionType)


# Re-exported so callers can use the same ``is_*`` naming as the other
# predicates in this module.
is_dataclass = dataclasses.is_dataclass
23
24
25
def is_masked(obj: Any) -> bool:
    """
    Check if an object is a parametrized Masked type.

    Parameters
    ----------
    obj : Any
        The object (typically a type annotation) to inspect.

    Returns
    -------
    bool
        True if ``typing.get_origin(obj)`` is ``Masked`` or a subclass
        of it; False otherwise, including when ``obj`` has no origin.

    """
    base = typing.get_origin(obj)
    # Objects without a generic origin can never be Masked[...]
    if base is None:
        return False
    if base is Masked:
        return True
    return inspect.isclass(base) and issubclass(base, Masked)
32
33
34
def is_union(x: Any) -> bool:
    """
    Check if the object is a Union annotation.

    Parameters
    ----------
    x : Any
        The object (typically a type annotation) to inspect.

    Returns
    -------
    bool
        True if the origin of ``x`` is one of the union origins
        collected in ``_UNION_TYPES``.

    """
    origin = typing.get_origin(x)
    return origin in _UNION_TYPES
37
38
39
def get_annotations(obj: Any) -> Dict[str, Any]:
    """
    Get the resolved annotations of an object.

    Parameters
    ----------
    obj : Any
        The object to inspect; passed straight to
        ``typing.get_type_hints``.

    Returns
    -------
    Dict[str, Any]
        Mapping of annotated names to their types.

    """
    hints = typing.get_type_hints(obj)
    return hints
42
43
44
def get_module(obj: Any) -> str:
    """
    Get the top-level module name of an object.

    Parameters
    ----------
    obj : Any
        The object to inspect.

    Returns
    -------
    str
        The first dotted component of ``obj.__module__``
        (e.g. ``'pandas'`` for ``'pandas.core.series'``), or an empty
        string if the attribute is missing or is not a string.

    """
    module = getattr(obj, '__module__', None)
    # ``__module__`` may legitimately be None (or any non-string) on some
    # objects; treat that the same as a missing attribute instead of
    # failing on the string operation below.
    if not isinstance(module, str):
        return ''
    return module.partition('.')[0]
50
51
52
def get_type_name(obj: Any) -> str:
    """
    Get the type name of an object.

    Parameters
    ----------
    obj : Any
        The object to inspect.

    Returns
    -------
    str
        ``obj.__name__`` when the object has one (classes, functions),
        otherwise the name of the object's class, otherwise an empty
        string.

    """
    if hasattr(obj, '__name__'):
        return obj.__name__
    # Instances generally lack ``__name__``; fall back to their class
    return obj.__class__.__name__ if hasattr(obj, '__class__') else ''
59
60
61
def is_numpy(obj: Any) -> bool:
    """
    Check if an object is a numpy array, array class or array annotation.

    The checks rely on module / type names only, so numpy itself is
    never imported here.

    Parameters
    ----------
    obj : Any
        The object, class or type annotation to inspect.

    Returns
    -------
    bool
        True if ``obj`` is recognized as a numpy ``ndarray``.

    """
    # Textual form of a parametrized annotation
    if str(obj).startswith('numpy.ndarray['):
        return True

    # A class defined in numpy is decided purely by its name
    if inspect.isclass(obj) and get_module(obj) == 'numpy':
        return get_type_name(obj) == 'ndarray'

    # Generic alias whose origin is the ndarray class
    base = typing.get_origin(obj)
    if get_module(base) == 'numpy' and get_type_name(base) == 'ndarray':
        return True

    # Finally, an actual array instance
    obj_type = type(obj)
    return (
        get_module(obj_type) == 'numpy' and
        get_type_name(obj_type) == 'ndarray'
    )
80
81
82
def is_dataframe(obj: Any) -> bool:
    """
    Check if an object is a DataFrame (or pyarrow Table).

    Parameters
    ----------
    obj : Any
        The object or class to inspect.

    Returns
    -------
    bool
        True for a pandas / polars ``DataFrame`` or a pyarrow ``Table``.

    """
    # Names are compared instead of using isinstance so that pandas /
    # polars / pyarrow are not imported unless we absolutely need to
    frame_names = {
        'pandas': 'DataFrame',
        'polars': 'DataFrame',
        'pyarrow': 'Table',
    }
    expected = frame_names.get(get_module(obj))
    if expected is None:
        return False
    return get_type_name(obj) == expected
93
94
95
def is_vector(obj: Any, include_masks: bool = False) -> bool:
    """
    Check if an object is a vector type.

    Parameters
    ----------
    obj : Any
        The object, class or type annotation to inspect.
    include_masks : bool
        Accepted for API compatibility. The current implementation does
        not consult it: masked types always count as vectors.

    Returns
    -------
    bool
        True if ``obj`` is a pandas Series, polars Series, pyarrow
        Array, numpy array or Masked type.

    """
    checks = (
        is_pandas_series,
        is_polars_series,
        is_pyarrow_array,
        is_numpy,
        is_masked,
    )
    return any(check(obj) for check in checks)
102
103
104
def get_data_format(obj: Any) -> str:
    """
    Return the data format of the DataFrame / Table / vector.

    Parameters
    ----------
    obj : Any
        The object or class to classify.

    Returns
    -------
    str
        One of ``'pandas'``, ``'polars'``, ``'arrow'``, ``'numpy'``,
        ``'list'`` or ``'scalar'``.

    """
    # Only module names are compared so that pandas / polars / pyarrow
    # are not imported unless we absolutely need to
    formats = {
        'pandas': 'pandas',
        'polars': 'polars',
        'pyarrow': 'arrow',
        'numpy': 'numpy',
    }
    fmt = formats.get(get_module(obj))
    if fmt is not None:
        return fmt
    return 'list' if isinstance(obj, list) else 'scalar'
119
120
121
def is_pandas_series(obj: Any) -> bool:
    """
    Check if an object is a pandas Series.

    Parameters
    ----------
    obj : Any
        The object, class or type annotation to inspect. For a union
        annotation only its first member is examined.

    Returns
    -------
    bool
        True if the inspected object comes from pandas and is named
        ``Series``.

    """
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'pandas' and \
        get_type_name(target) == 'Series'
129
130
131
def is_polars_series(obj: Any) -> bool:
    """
    Check if an object is a polars Series.

    Parameters
    ----------
    obj : Any
        The object, class or type annotation to inspect. For a union
        annotation only its first member is examined.

    Returns
    -------
    bool
        True if the inspected object comes from polars and is named
        ``Series``.

    """
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'polars' and \
        get_type_name(target) == 'Series'
139
140
141
def is_pyarrow_array(obj: Any) -> bool:
    """
    Check if an object is a pyarrow Array.

    Parameters
    ----------
    obj : Any
        The object, class or type annotation to inspect. For a union
        annotation only its first member is examined.

    Returns
    -------
    bool
        True if the inspected object comes from pyarrow and is named
        ``Array``.

    """
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'pyarrow' and \
        get_type_name(target) == 'Array'
149
150
151
def is_typeddict(obj: Any) -> bool:
    """
    Check if an object is a TypedDict.

    Parameters
    ----------
    obj : Any
        The object to inspect.

    Returns
    -------
    bool
        The result of ``typing.is_typeddict(obj)``; always False when
        the running Python's ``typing`` module lacks that function.

    """
    checker = getattr(typing, 'is_typeddict', None)
    if checker is None:
        return False
    return checker(obj)
156
157
158
def is_namedtuple(obj: Any) -> bool:
    """
    Check if an object is a named tuple class or instance.

    Parameters
    ----------
    obj : Any
        The class or instance to inspect.

    Returns
    -------
    bool
        True if ``obj`` is (or is an instance of) a ``tuple`` subclass
        exposing both ``_asdict`` and ``_fields``.

    """
    # Classes are tested by inheritance, everything else by instance
    if inspect.isclass(obj):
        tuple_like = issubclass(obj, tuple)
    else:
        tuple_like = isinstance(obj, tuple)
    return tuple_like and hasattr(obj, '_asdict') and hasattr(obj, '_fields')
171
172
173
def is_pydantic(obj: Any) -> bool:
    """
    Check if an object is a pydantic model class.

    Parameters
    ----------
    obj : Any
        The object to inspect. Only classes can match; instances
        always yield False.

    Returns
    -------
    bool
        True if a class named ``BaseModel`` from the ``pydantic``
        package appears in the method resolution order of ``obj``.

    """
    if not inspect.isclass(obj):
        return False
    # pydantic is deliberately not imported here, so ancestors are
    # recognized by module and class name instead
    return any(
        get_module(base) == 'pydantic' and get_type_name(base) == 'BaseModel'
        for base in inspect.getmro(obj)
    )
184
185
186
class VectorTypes(str, Enum):
    """
    Element types supported for packed vectors.

    Members are also ``str`` instances, so each compares equal to its
    plain string value (for example ``VectorTypes.F32 == 'f32'``).

    """

    # Floating point elements
    F16 = 'f16'
    F32 = 'f32'
    F64 = 'f64'

    # Integer elements
    I8 = 'i8'
    I16 = 'i16'
    I32 = 'i32'
    I64 = 'i64'
195
196
197
def _vector_type_to_numpy_type(
    vector_type: VectorTypes,
) -> str:
    """
    Convert a vector type to a numpy dtype string.

    Parameters
    ----------
    vector_type : VectorTypes
        The element type of the vector.

    Returns
    -------
    str
        The dtype string for the element type, e.g. ``'f4'`` for
        ``VectorTypes.F32``.

    Raises
    ------
    ValueError
        If ``vector_type`` is not one of the supported element types.

    """
    dtype_codes = (
        (VectorTypes.F16, 'f2'),
        (VectorTypes.F32, 'f4'),
        (VectorTypes.F64, 'f8'),
        (VectorTypes.I8, 'i1'),
        (VectorTypes.I16, 'i2'),
        (VectorTypes.I32, 'i4'),
        (VectorTypes.I64, 'i8'),
    )
    # Matching uses ``==`` so plain strings such as 'f32' are accepted too
    for member, code in dtype_codes:
        if vector_type == member:
            return code
    raise ValueError(f'unsupported element type: {vector_type}')
216
217
218
def _vector_type_to_struct_format(
    vec: Any,
    vector_type: VectorTypes,
) -> str:
    """
    Convert a vector type to a struct format string.

    Parameters
    ----------
    vec : Any
        Either the sequence of elements to pack, or the bytes /
        bytearray to unpack. Only its length is used.
    vector_type : VectorTypes
        The element type of the vector.

    Returns
    -------
    str
        A little-endian ``struct`` format such as ``'<3f'``, where the
        count is the number of elements in ``vec``.

    Raises
    ------
    ValueError
        If ``vector_type`` is not one of the supported element types.

    """
    n_items = len(vec)
    # For raw buffers the length is in bytes rather than in elements
    is_raw = isinstance(vec, (bytes, bytearray))

    # (element type, struct format character, element width in bytes)
    layouts = (
        (VectorTypes.F16, 'e', 2),
        (VectorTypes.F32, 'f', 4),
        (VectorTypes.F64, 'd', 8),
        (VectorTypes.I8, 'b', 1),
        (VectorTypes.I16, 'h', 2),
        (VectorTypes.I32, 'i', 4),
        (VectorTypes.I64, 'q', 8),
    )
    for member, code, width in layouts:
        if vector_type == member:
            if is_raw:
                n_items //= width
            return f'<{n_items}{code}'
    raise ValueError(f'unsupported element type: {vector_type}')
251
252
253
def unpack_vector(
    obj: Union[bytes, bytearray],
    vec_type: VectorTypes = VectorTypes.F32,
) -> Tuple[Any, ...]:
    """
    Unpack a vector from bytes.

    Parameters
    ----------
    obj : bytes or bytearray
        The object to unpack.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f16', 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.

    Returns
    -------
    Tuple[Any, ...]
        The unpacked vector, one item per element.

    Raises
    ------
    ValueError
        If ``vec_type`` is not a supported element type.
    struct.error
        If the size of ``obj`` does not match the derived format.

    """
    # The element count is derived from the buffer length and element width
    fmt = _vector_type_to_struct_format(obj, vec_type)
    return struct.unpack(fmt, obj)
276
277
278
def pack_vector(
    obj: Any,
    vec_type: VectorTypes = VectorTypes.F32,
) -> bytes:
    """
    Pack a vector into bytes.

    Parameters
    ----------
    obj : Any
        The object to pack: a list, tuple, numpy array, pandas Series,
        polars Series or pyarrow Array.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.
        NOTE(review): only consulted for list / tuple input; every other
        container is serialized with whatever dtype it already has.

    Returns
    -------
    bytes
        The packed vector.

    Raises
    ------
    ValueError
        If ``obj`` is not one of the supported container types.

    """
    if isinstance(obj, (list, tuple)):
        fmt = _vector_type_to_struct_format(obj, vec_type)
        return struct.pack(fmt, *obj)

    if is_numpy(obj):
        return obj.tobytes()

    # The dataframe libraries are imported lazily, only for the
    # container type that was actually passed in.
    if is_pandas_series(obj):
        import pandas as pd
        series = pd.Series(obj)
        return series.to_numpy().tobytes()

    if is_polars_series(obj):
        import polars as pl
        series = pl.Series(obj)
        return series.to_numpy().tobytes()

    if is_pyarrow_array(obj):
        import pyarrow as pa
        array = pa.array(obj)
        return array.to_numpy().tobytes()

    raise ValueError(f'unsupported object type: {type(obj)}')
321
322
323
def unpack_vectors(
    arr_of_vec: Any,
    vec_type: VectorTypes = VectorTypes.F32,
) -> Iterable[Any]:
    """
    Unpack each vector in an array of bytes.

    Parameters
    ----------
    arr_of_vec : Iterable[Any]
        The array of bytes to unpack: a list, tuple, numpy array,
        pandas Series, polars Series or pyarrow Array.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.

    Returns
    -------
    Iterable[Any]
        The unpacked vectors. A list of tuples for list / tuple input;
        otherwise a numpy array, wrapped in the same kind of container
        that was passed in.

    Raises
    ------
    ValueError
        If ``arr_of_vec`` is not one of the supported container types.

    """
    if isinstance(arr_of_vec, (list, tuple)):
        return [unpack_vector(item, vec_type) for item in arr_of_vec]

    import numpy as np

    np_dtype = _vector_type_to_numpy_type(vec_type)

    # Decode every buffer first, then stack the rows into a single array
    rows = [np.frombuffer(item, dtype=np_dtype) for item in arr_of_vec]
    unpacked = np.array(rows, dtype=np_dtype)

    if is_numpy(arr_of_vec):
        return unpacked

    if is_pandas_series(arr_of_vec):
        import pandas as pd
        return pd.Series(unpacked)

    if is_polars_series(arr_of_vec):
        import polars as pl
        return pl.Series(unpacked)

    if is_pyarrow_array(arr_of_vec):
        import pyarrow as pa
        return pa.array(unpacked)

    raise ValueError(f'unsupported object type: {type(arr_of_vec)}')
375
376
377
def pack_vectors(
    arr_of_arr: Iterable[Any],
    vec_type: VectorTypes = VectorTypes.F32,
) -> Iterable[Any]:
    """
    Pack each vector in an array of vectors into bytes.

    Parameters
    ----------
    arr_of_arr : Iterable[Any]
        The vectors to pack: a list or tuple of element sequences, or a
        numpy array / pandas Series / polars Series / pyarrow Array
        whose items provide ``tobytes()``.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f16', 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.
        NOTE(review): only consulted for list / tuple input; items of
        other containers are serialized with their own dtype.

    Returns
    -------
    Iterable[Any]
        The array of packed vectors. A list of bytes for list / tuple
        input; otherwise an object array of bytes, wrapped in the same
        kind of container that was passed in.

    Raises
    ------
    ValueError
        If ``arr_of_arr`` is not one of the supported container types,
        or ``vec_type`` is not a supported element type.
    struct.error
        If a vector's length differs from that of the first vector.

    """
    if isinstance(arr_of_arr, (list, tuple)):
        if not arr_of_arr:
            return []
        # The format is derived once from the first vector; all vectors
        # are expected to share its length.
        fmt = _vector_type_to_struct_format(arr_of_arr[0], vec_type)
        # struct.pack takes one positional argument per element, so each
        # vector must be unpacked with ``*``.
        return [struct.pack(fmt, *vec) for vec in arr_of_arr]

    import numpy as np

    # Use object type because numpy truncates nulls at the end of fixed binary
    np_arr = np.array([x.tobytes() for x in arr_of_arr], dtype=np.object_)

    if is_numpy(arr_of_arr):
        return np_arr

    if is_pandas_series(arr_of_arr):
        import pandas as pd
        return pd.Series(np_arr)

    if is_polars_series(arr_of_arr):
        import polars as pl
        return pl.Series(np_arr)

    if is_pyarrow_array(arr_of_arr):
        import pyarrow as pa
        return pa.array(np_arr)

    raise ValueError(
        f'unsupported object type: {type(arr_of_arr)}',
    )
428
429