Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/utils.py
469 views
1
import dataclasses
2
import inspect
3
import struct
4
import sys
5
import types
6
import typing
7
from enum import Enum
8
from typing import Any
9
from typing import Dict
10
from typing import Iterable
11
from typing import Tuple
12
from typing import Union
13
14
from .typing import Masked
15
16
# Origins that ``typing.get_origin`` may report for a union annotation.
# ``typing.Union`` is always included; ``types.UnionType`` is only added
# on Python 3.10 and later.
_UNION_TYPES = {typing.Union}
if sys.version_info >= (3, 10):
    _UNION_TYPES.add(types.UnionType)


# Alias of the standard-library dataclass check.
is_dataclass = dataclasses.is_dataclass
23
24
25
def is_masked(obj: Any) -> bool:
    """
    Check if an object is a parameterized Masked type.

    Only objects with a typing origin are considered; anything for which
    ``typing.get_origin`` returns None is reported as not masked.

    """
    origin = typing.get_origin(obj)
    if origin is None:
        return False
    if origin is Masked:
        return True
    # Subclasses of Masked count as masked types too.
    return inspect.isclass(origin) and issubclass(origin, Masked)
32
33
34
def is_union(x: Any) -> bool:
    """Return True when the typing origin of ``x`` is a known union type."""
    origin = typing.get_origin(x)
    return origin in _UNION_TYPES
37
38
39
def get_annotations(obj: Any) -> Dict[str, Any]:
    """Return the type hints of an object as reported by ``typing``."""
    hints = typing.get_type_hints(obj)
    return hints
42
43
44
def get_module(obj: Any) -> str:
    """
    Get the top-level module name of an object.

    Parameters
    ----------
    obj : Any
        The object (class, function, instance, ...) to inspect.

    Returns
    -------
    str
        The first dotted component of ``obj.__module__`` (for example
        ``'pandas'`` for ``'pandas.core.frame'``), or an empty string
        when the attribute is missing or is not a string.

    """
    module = getattr(obj, '__module__', None)
    # ``__module__`` may be None (or another non-string) on some objects;
    # treat that like a missing attribute rather than failing on ``split``.
    if not isinstance(module, str):
        return ''
    return module.split('.', 1)[0]
50
51
52
def get_type_name(obj: Any) -> str:
    """
    Get the type name of an object.

    The object's own ``__name__`` is preferred (classes, functions); if it
    has none, the name of its class is used.

    """
    missing = object()

    own_name = getattr(obj, '__name__', missing)
    if own_name is not missing:
        return own_name

    klass = getattr(obj, '__class__', missing)
    if klass is not missing:
        return klass.__name__

    return ''
59
60
61
def is_numpy(obj: Any) -> bool:
    """
    Check if an object is a numpy array or a numpy array type.

    The check is done on module and type names only, so numpy itself is
    never imported here.

    """
    # Objects whose string form starts with ``numpy.ndarray[`` are accepted
    # directly.
    text = str(obj)
    if text.startswith('numpy.ndarray['):
        return True

    # A class from numpy is only accepted when it is ``ndarray`` itself.
    if inspect.isclass(obj) and get_module(obj) == 'numpy':
        return get_type_name(obj) == 'ndarray'

    # Parameterized generics: look at the origin type.
    origin = typing.get_origin(obj)
    if get_module(origin) == 'numpy' and get_type_name(origin) == 'ndarray':
        return True

    # Otherwise fall back to the type of the object (instances).
    kind = type(obj)
    return get_module(kind) == 'numpy' and get_type_name(kind) == 'ndarray'
80
81
82
def is_dataframe(obj: Any) -> bool:
    """Check if an object is a pandas / polars DataFrame or a pyarrow Table."""
    # Match on module and type names so that pandas / polars / pyarrow
    # never have to be imported just to perform this check.
    frame_names = {
        'pandas': 'DataFrame',
        'polars': 'DataFrame',
        'pyarrow': 'Table',
    }
    expected = frame_names.get(get_module(obj))
    if expected is None:
        return False
    return get_type_name(obj) == expected
93
94
95
def is_vector(obj: Any, include_masks: bool = False) -> bool:
    """
    Check if an object is one of the recognized vector types.

    A pandas Series, polars Series, pyarrow Array, numpy array or Masked
    type all count as vectors. ``include_masks`` is accepted but is not
    consulted by this function: Masked types are always included.

    """
    checks = (
        is_pandas_series,
        is_polars_series,
        is_pyarrow_array,
        is_numpy,
        is_masked,
    )
    return any(check(obj) for check in checks)
102
103
104
def get_data_format(obj: Any) -> str:
    """
    Return the data format name of a DataFrame / Table / vector.

    The result is one of 'pandas', 'polars', 'arrow', 'numpy', 'list'
    or 'scalar'.

    """
    # Identify the library by module name so that pandas / polars /
    # pyarrow / numpy do not have to be imported here.
    formats = {
        'pandas': 'pandas',
        'polars': 'polars',
        'pyarrow': 'arrow',
        'numpy': 'numpy',
    }
    fmt = formats.get(get_module(obj))
    if fmt is not None:
        return fmt
    return 'list' if isinstance(obj, list) else 'scalar'
119
120
121
def is_pandas_series(obj: Any) -> bool:
    """
    Check if an object is a pandas Series.

    For a union, only the first member of the union is examined.

    """
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'pandas' and get_type_name(target) == 'Series'
129
130
131
def is_polars_series(obj: Any) -> bool:
    """
    Check if an object is a polars Series.

    For a union, only the first member of the union is examined.

    """
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'polars' and get_type_name(target) == 'Series'
139
140
141
def is_pyarrow_array(obj: Any) -> bool:
    """
    Check if an object is a pyarrow Array.

    For a union, only the first member of the union is examined. The type
    name must be exactly 'Array'.

    """
    # NOTE(review): instances of concrete array classes whose class name is
    # not literally 'Array' will not match here - confirm this is intended.
    target = typing.get_args(obj)[0] if is_union(obj) else obj
    return get_module(target) == 'pyarrow' and get_type_name(target) == 'Array'
149
150
151
def is_typeddict(obj: Any) -> bool:
    """
    Check if an object is a TypedDict.

    Always returns False when ``typing`` does not provide ``is_typeddict``.

    """
    checker = getattr(typing, 'is_typeddict', None)
    if checker is None:
        return False
    return checker(obj)
156
157
158
def is_namedtuple(obj: Any) -> bool:
    """
    Check if an object is a named tuple class or instance.

    A named tuple is recognized as a tuple (sub)class or instance that
    also exposes the ``_asdict`` and ``_fields`` attributes.

    """
    if inspect.isclass(obj):
        tuple_like = issubclass(obj, tuple)
    else:
        tuple_like = isinstance(obj, tuple)
    return tuple_like and hasattr(obj, '_asdict') and hasattr(obj, '_fields')
171
172
173
def is_pydantic(obj: Any) -> bool:
    """
    Check if an object is a pydantic model class.

    Only classes are considered; instances always yield False.

    """
    if not inspect.isclass(obj):
        return False
    # Walk the MRO looking for pydantic's BaseModel by name so that
    # pydantic itself does not need to be imported.
    return any(
        get_module(base) == 'pydantic' and get_type_name(base) == 'BaseModel'
        for base in inspect.getmro(obj)
    )
184
185
186
class VectorTypes(str, Enum):
    """Enum for vector element types."""

    F16 = 'f16'
    F32 = 'f32'
    F64 = 'f64'
    I8 = 'i8'
    I16 = 'i16'
    I32 = 'i32'
    I64 = 'i64'


# For each element type: (numpy dtype code, struct format character,
# size of one element in bytes).
_VECTOR_TYPE_INFO: Dict[VectorTypes, Tuple[str, str, int]] = {
    VectorTypes.F16: ('f2', 'e', 2),
    VectorTypes.F32: ('f4', 'f', 4),
    VectorTypes.F64: ('f8', 'd', 8),
    VectorTypes.I8: ('i1', 'b', 1),
    VectorTypes.I16: ('i2', 'h', 2),
    VectorTypes.I32: ('i4', 'i', 4),
    VectorTypes.I64: ('i8', 'q', 8),
}


def _get_vector_type_info(vector_type: Any) -> Tuple[str, str, int]:
    """Look up the numpy code, struct code and byte size of an element type."""
    try:
        # Accept both enum members and their plain string values.
        return _VECTOR_TYPE_INFO[VectorTypes(vector_type)]
    except ValueError:
        raise ValueError(
            f'unsupported element type: {vector_type}',
        ) from None


def _vector_type_to_numpy_type(
    vector_type: VectorTypes,
) -> str:
    """
    Convert a vector type to a numpy dtype code.

    Parameters
    ----------
    vector_type : VectorTypes
        The element type (enum member or its string value).

    Returns
    -------
    str
        The numpy dtype code, e.g. 'f4' for 'f32'.

    Raises
    ------
    ValueError
        If the element type is not supported.

    """
    return _get_vector_type_info(vector_type)[0]


def _vector_type_to_struct_format(
    vec: Any,
    vector_type: VectorTypes,
) -> str:
    """
    Convert a vector type to a little-endian struct format string.

    Parameters
    ----------
    vec : Any
        The vector the format is for. For ``bytes`` / ``bytearray`` the
        element count is the byte length divided by the element size;
        for any other sized object it is ``len(vec)``.
    vector_type : VectorTypes
        The element type (enum member or its string value).

    Returns
    -------
    str
        A format string such as '<3f'.

    Raises
    ------
    ValueError
        If the element type is not supported.

    """
    n = len(vec)
    _, code, size = _get_vector_type_info(vector_type)
    if isinstance(vec, (bytes, bytearray)):
        # Packed data: convert the byte length into an element count.
        n //= size
    return f'<{n}{code}'
245
246
247
def unpack_vector(
    obj: Union[bytes, bytearray],
    vec_type: VectorTypes = VectorTypes.F32,
) -> Tuple[Any, ...]:
    """
    Unpack a vector from bytes.

    Parameters
    ----------
    obj : bytes or bytearray
        The object to unpack.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.

    Returns
    -------
    Tuple[Any, ...]
        The unpacked vector, one item per element.

    Raises
    ------
    ValueError
        If the element type is not supported.

    """
    # The element count in the format is derived from the byte length.
    fmt = _vector_type_to_struct_format(obj, vec_type)
    return struct.unpack(fmt, obj)
270
271
272
def pack_vector(
    obj: Any,
    vec_type: VectorTypes = VectorTypes.F32,
) -> bytes:
    """
    Pack a vector into bytes.

    Parameters
    ----------
    obj : Any
        The vector to pack: a list, tuple, numpy array, pandas Series,
        polars Series or pyarrow Array.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'. Only used for list / tuple input.

    Returns
    -------
    bytes
        The packed vector.

    Raises
    ------
    ValueError
        If the object is not one of the supported vector types.

    """
    # Plain Python sequences are encoded element by element with struct.
    if isinstance(obj, (list, tuple)):
        fmt = _vector_type_to_struct_format(obj, vec_type)
        return struct.pack(fmt, *obj)

    # Array-like inputs are serialized from their own buffer; ``vec_type``
    # is not consulted on these paths.
    if is_numpy(obj):
        return obj.tobytes()

    if is_pandas_series(obj):
        import pandas as pd
        series = pd.Series(obj)
        return series.to_numpy().tobytes()

    if is_polars_series(obj):
        import polars as pl
        series = pl.Series(obj)
        return series.to_numpy().tobytes()

    if is_pyarrow_array(obj):
        import pyarrow as pa
        array = pa.array(obj)
        return array.to_numpy().tobytes()

    raise ValueError(f'unsupported object type: {type(obj)}')
315
316
317
def unpack_vectors(
    arr_of_vec: Any,
    vec_type: VectorTypes = VectorTypes.F32,
) -> Iterable[Any]:
    """
    Unpack vectors from an array of bytes.

    Parameters
    ----------
    arr_of_vec : Iterable[Any]
        The array of packed vectors (bytes) to unpack: a list, tuple,
        numpy array, pandas Series, polars Series or pyarrow Array.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'.

    Returns
    -------
    Iterable[Any]
        The unpacked vectors, in a container matching the input type.

    Raises
    ------
    ValueError
        If the element type or the container type is not supported.

    """
    if isinstance(arr_of_vec, (list, tuple)):
        return [unpack_vector(x, vec_type) for x in arr_of_vec]

    import numpy as np

    dtype = _vector_type_to_numpy_type(vec_type)

    # One row per vector; with equal-length vectors this is a 2-D array.
    np_arr = np.array(
        [np.frombuffer(x, dtype=dtype) for x in arr_of_vec],
        dtype=dtype,
    )

    if is_numpy(arr_of_vec):
        return np_arr

    if is_pandas_series(arr_of_vec):
        import pandas as pd
        # pandas.Series only accepts 1-dimensional data, so pass the rows
        # as a list of 1-D arrays instead of the 2-D array itself.
        return pd.Series(list(np_arr))

    if is_polars_series(arr_of_vec):
        import polars as pl
        return pl.Series(np_arr)

    if is_pyarrow_array(arr_of_vec):
        import pyarrow as pa
        # pyarrow.array likewise only handles 1-dimensional ndarrays, so
        # pass the rows as a list of 1-D arrays.
        return pa.array(list(np_arr))

    raise ValueError(
        f'unsupported object type: {type(arr_of_vec)}',
    )
369
370
371
def pack_vectors(
    arr_of_arr: Iterable[Any],
    vec_type: VectorTypes = VectorTypes.F32,
) -> Iterable[Any]:
    """
    Pack an array of vectors into an array of bytes.

    Parameters
    ----------
    arr_of_arr : Iterable[Any]
        The array of vectors to pack: a list, tuple, numpy array,
        pandas Series, polars Series or pyarrow Array. For containers
        other than list / tuple, each item must provide ``tobytes()``.
    vec_type : VectorTypes
        The type of the elements in the vector.
        Can be one of 'f32', 'f64', 'i8', 'i16', 'i32', or 'i64'.
        Default is 'f32'. Only used for list / tuple input.

    Returns
    -------
    Iterable[Any]
        The array of packed vectors, in a container matching the input
        type.

    Raises
    ------
    ValueError
        If the element type or the container type is not supported.

    """
    if isinstance(arr_of_arr, (list, tuple)):
        if not arr_of_arr:
            return []
        # The format (and so the vector length) is taken from the first
        # vector and reused for all of them.
        fmt = _vector_type_to_struct_format(arr_of_arr[0], vec_type)
        # struct.pack takes one positional argument per element, so each
        # vector must be unpacked into arguments.
        return [struct.pack(fmt, *x) for x in arr_of_arr]

    import numpy as np

    # Use object type because numpy truncates nulls at the end of fixed binary
    np_arr = np.array([x.tobytes() for x in arr_of_arr], dtype=np.object_)

    if is_numpy(arr_of_arr):
        return np_arr

    if is_pandas_series(arr_of_arr):
        import pandas as pd
        return pd.Series(np_arr)

    if is_polars_series(arr_of_arr):
        import polars as pl
        return pl.Series(np_arr)

    if is_pyarrow_array(arr_of_arr):
        import pyarrow as pa
        return pa.array(np_arr)

    raise ValueError(
        f'unsupported object type: {type(arr_of_arr)}',
    )
422
423