Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/utils/results.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB package utilities."""
3
import collections
4
import warnings
5
from typing import Any
6
from typing import Callable
7
from typing import Dict
8
from typing import List
9
from typing import NamedTuple
10
from typing import Optional
11
from typing import Tuple
12
from typing import Union
13
14
from .dtypes import NUMPY_TYPE_MAP
15
from .dtypes import POLARS_TYPE_MAP
16
from .dtypes import PYARROW_TYPE_MAP
17
18
# Bit masks for column flags.  ``UNSIGNED_FLAG`` is tested against
# ``Description.flags`` in ``signed``; ``BINARY_FLAG`` is not referenced
# in this part of the module.
UNSIGNED_FLAG = 1 << 5  # 32
BINARY_FLAG = 1 << 7  # 128
20
21
# Optional third-party backends.  Each ``has_*`` flag records whether the
# corresponding package could be imported; the converters below check
# these flags before touching the package.
try:
    import numpy as np
except ImportError:
    has_numpy = False
else:
    has_numpy = True

try:
    import pandas as pd
except ImportError:
    has_pandas = False
else:
    has_pandas = True

try:
    import polars as pl
except ImportError:
    has_polars = False
else:
    has_polars = True

try:
    import pyarrow as pa
except ImportError:
    has_pyarrow = False
else:
    has_pyarrow = True
44
45
# Raw rows as handed to the converters: a list of row tuples, or one row.
DBAPIResult = Union[
    List[Tuple[Any, ...]],
    Tuple[Any, ...],
]

# A single converted row.  The third-party types are quoted so the alias
# can be built even when the package is not installed.
OneResult = Union[
    Tuple[Any, ...],
    Dict[str, Any],
    'np.ndarray',
    'pd.DataFrame',
    'pl.DataFrame',
    'pa.Table',
]

# A converted collection of rows.
ManyResult = Union[
    List[Tuple[Any, ...]],
    List[Dict[str, Any]],
    'np.ndarray',
    'pd.DataFrame',
    'pl.DataFrame',
    'pa.Table',
]

# Anything a converter may return.
Result = Union[OneResult, ManyResult]
55
56
57
class Description(NamedTuple):
    """
    Column definition.

    One instance describes one column of a result set.  Fields are
    positional, so ``desc[0]`` is the column name.

    """

    # Column name; used as the field / column label by the converters
    name: str
    # Integer data type code; key into the ``*_TYPE_MAP`` tables
    type_code: int
    # Not used in this module
    display_size: Optional[int]
    # Not used in this module
    internal_size: Optional[int]
    # Decimal precision; used when building decimal types
    precision: Optional[int]
    # Decimal scale; also inspected for date/time columns in polars
    scale: Optional[int]
    # Whether the column is nullable; selects float casts for numpy
    null_ok: Optional[bool]
    # Bit flags; tested against ``UNSIGNED_FLAG``
    flags: Optional[int]
    # Character set id; ``signed`` compares it to 63
    charset: Optional[int]
68
69
70
if has_numpy:
    # Nullable integer columns cannot hold a missing value in an integer
    # numpy / pandas array, so they are mapped to floats instead.
    # Negative codes are the unsigned variants (see ``signed``).
    NUMPY_TYPE_MAP_CAST_FLOAT = NUMPY_TYPE_MAP.copy()
    # Tiny, Unsigned Tiny, Short, Unsigned Short
    NUMPY_TYPE_MAP_CAST_FLOAT.update({
        code: np.float32 for code in (1, -1, 2, -2)
    })
    # Long, LongLong, Int24 (each signed and unsigned), and Year
    NUMPY_TYPE_MAP_CAST_FLOAT.update({
        code: np.float64 for code in (3, -3, 8, -8, 9, -9, 13)
    })
87
88
if has_polars:
    # Date/time columns (type codes 7, 10 and 12) are loaded as strings;
    # polars parses them afterwards (see ``_description_to_polars_schema``).
    # A copy is rebound so the imported mapping itself is not mutated.
    POLARS_TYPE_MAP = POLARS_TYPE_MAP.copy()
    POLARS_TYPE_MAP.update(dict.fromkeys((7, 10, 12), pl.Utf8))
96
97
98
# Type-code groups used by ``signed`` and the schema builders.
INT_TYPES = {1, 2, 3, 8, 9}
CHAR_TYPES = {15, 249, 250, 251, 252, 253, 254}
DECIMAL_TYPES = {0, 246}
101
102
103
def signed(desc: Description) -> int:
    """
    Return the type-map key for a column.

    The key is the negated type code for integer columns carrying the
    unsigned flag and for character columns whose charset is 63;
    otherwise it is the type code itself.

    Parameters
    ----------
    desc : Description
        The column metadata

    Returns
    -------
    int

    """
    code = desc.type_code
    # ``flags`` may be None; treat that as no flags set
    if (desc.flags or 0) & UNSIGNED_FLAG and code in INT_TYPES:
        return -code
    # NOTE(review): charset 63 is presumably the binary charset - confirm
    if desc.charset == 63 and code in CHAR_TYPES:
        return -code
    return code
108
109
110
def _description_to_numpy_schema(desc: List[Description]) -> Dict[str, Any]:
    """
    Convert description to numpy array schema info.

    Returns keyword arguments for ``np.array``: a ``dtype`` list of
    ``(name, type)`` pairs, one per column.  Nullable columns are looked
    up in the float-casting map.  An empty dict is returned when numpy
    is not available.

    """
    if not has_numpy:
        return {}

    fields = []
    for col in desc:
        type_map = NUMPY_TYPE_MAP_CAST_FLOAT if col.null_ok else NUMPY_TYPE_MAP
        fields.append((col.name, type_map[signed(col)]))
    return {'dtype': fields}
124
125
126
def _description_to_pandas_schema(desc: List[Description]) -> Dict[str, Any]:
    """
    Convert description to pandas DataFrame schema info.

    Returns a dict with a ``columns`` list of column names, or an empty
    dict when pandas is not available.

    """
    if not has_pandas:
        return {}
    return {'columns': [col.name for col in desc]}
131
132
133
def _decimalize_polars(desc: Description) -> 'pl.Decimal':
    """Build a polars Decimal type; defaults to precision 10, scale 0."""
    precision = desc.precision or 10
    scale = desc.scale or 0
    return pl.Decimal(precision, scale)
135
136
137
def _description_to_polars_schema(desc: List[Description]) -> Dict[str, Any]:
    """
    Convert description to polars DataFrame schema info.

    Returns
    -------
    Dict[str, Any]
        ``schema`` holds keyword arguments for ``pl.DataFrame``;
        ``with_columns`` maps column names to expressions that parse
        string date/time columns.  Empty dict when polars is not
        available.

    """
    if not has_polars:
        return {}

    # Date/time columns are loaded as strings, then parsed by polars
    with_columns = {}
    for col in desc:
        if col.type_code in (7, 12):
            # A scale of 6 means the strings carry microseconds
            if col.scale == 6:
                fmt = '%Y-%m-%d %H:%M:%S.%6f'
            else:
                fmt = '%Y-%m-%d %H:%M:%S'
            with_columns[col.name] = pl.col(col.name).str.to_datetime(
                fmt, time_unit='us',
            )
        elif col.type_code == 10:
            with_columns[col.name] = pl.col(col.name).str.to_date('%Y-%m-%d')

    fields = []
    for col in desc:
        if col.type_code in DECIMAL_TYPES:
            dtype = _decimalize_polars(col)
        else:
            dtype = POLARS_TYPE_MAP[signed(col)]
        fields.append((col.name, dtype))

    return {
        'schema': {'schema': fields},
        'with_columns': with_columns,
    }
167
168
169
def _decimalize_arrow(desc: Description) -> 'pa.Decimal128':
    """Build an Arrow decimal128 type; defaults to precision 10, scale 0."""
    precision = desc.precision or 10
    scale = desc.scale or 0
    return pa.decimal128(precision, scale)
171
172
173
def _description_to_arrow_schema(desc: List[Description]) -> Dict[str, Any]:
    """
    Convert description to Arrow Table schema info.

    Returns a dict with a ``schema`` entry holding a ``pa.schema``, or an
    empty dict when pyarrow is not available.

    """
    if not has_pyarrow:
        return {}

    fields = []
    for col in desc:
        if col.type_code in DECIMAL_TYPES:
            dtype = _decimalize_arrow(col)
        else:
            dtype = PYARROW_TYPE_MAP[signed(col)]
        fields.append((col.name, dtype))
    return {'schema': pa.schema(fields)}
186
187
188
def results_to_numpy(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to numpy.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format; keyword arguments
        passed to ``np.array``. Built from `desc` when omitted.

    Returns
    -------
    numpy.array
        If `numpy` is available
    tuple or list of tuples
        If `numpy` is not available, or `res` is empty (returned as-is)

    """
    if not res:
        return res

    if not has_numpy:
        warnings.warn(
            'numpy is not available; unable to convert to array',
            RuntimeWarning,
        )
        return res

    if schema is None:
        schema = _description_to_numpy_schema(desc)
    # A single row is wrapped so the output is always an array of rows
    rows = [res] if single else list(res)
    return np.array(rows, **schema)
228
229
230
def results_to_pandas(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to pandas.

    The DataFrame is built from the array produced by `results_to_numpy`,
    so the schema used here is the numpy schema (the same one registered
    for the ``'pandas'`` format in ``_schema_converters``).

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format; keyword arguments
        passed to ``np.array``. Built from `desc` when omitted.

    Returns
    -------
    DataFrame
        If `pandas` is available
    tuple or list of tuples
        If `pandas` is not available, or `res` is empty (returned as-is)

    """
    if not res:
        return res
    if has_pandas:
        # The schema is forwarded to ``np.array``, so the fallback must be
        # the numpy schema; a pandas ``columns=`` schema would make
        # ``np.array`` raise a TypeError.
        if schema is None:
            schema = _description_to_numpy_schema(desc)
        return pd.DataFrame(results_to_numpy(desc, res, single=single, schema=schema))
    warnings.warn(
        'pandas is not available; unable to convert to DataFrame',
        RuntimeWarning,
    )
    return res
268
269
270
def results_to_polars(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to polars.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format, as produced by
        `_description_to_polars_schema`. Built from `desc` when omitted.

    Returns
    -------
    DataFrame
        If `polars` is available
    tuple or list of tuples
        If `polars` is not available, or `res` is empty (returned as-is)

    """
    if not res:
        return res

    if not has_polars:
        warnings.warn(
            'polars is not available; unable to convert to DataFrame',
            RuntimeWarning,
        )
        return res

    if schema is None:
        schema = _description_to_polars_schema(desc)
    rows = [res] if single else res
    frame = pl.DataFrame(rows, **schema.get('schema', {}))

    # Apply the post-load conversions (date/time parsing), if any
    conversions = schema.get('with_columns')
    return frame.with_columns(**conversions) if conversions else frame
315
316
317
def results_to_arrow(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to Arrow.

    Rows that are already dicts are passed through; other rows are
    zipped with the column names first.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format; keyword arguments
        passed to ``pa.Table.from_pylist``. Built from `desc` when omitted.

    Returns
    -------
    Table
        If `pyarrow` is available
    tuple or list of tuples
        If `pyarrow` is not available, or `res` is empty (returned as-is)

    """
    if not res:
        return res

    if not has_pyarrow:
        warnings.warn(
            'pyarrow is not available; unable to convert to Table',
            RuntimeWarning,
        )
        return res

    names = [col[0] for col in desc]
    if schema is None:
        schema = _description_to_arrow_schema(desc)

    if single:
        rows = [res if isinstance(res, dict) else dict(zip(names, res))]
    elif isinstance(res[0], dict):
        rows = res
    else:
        rows = [dict(zip(names, row)) for row in res]
    return pa.Table.from_pylist(rows, **schema)
364
365
366
def results_to_namedtuple(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to namedtuples.

    The tuple type is named ``Row``; its fields are the column names,
    with invalid names renamed by ``collections.namedtuple``.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format (unused here)

    Returns
    -------
    namedtuple
        If single is True
    list of namedtuples
        If single is False

    """
    if not res:
        return res

    field_names = [col[0] for col in desc]
    row_type = collections.namedtuple(  # type: ignore
        'Row', field_names, rename=True,
    )
    if single:
        return row_type(*res)
    return [row_type(*row) for row in res]
404
405
406
def results_to_dict(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to dicts.

    Each row is zipped with the column names taken from `desc`.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format (unused here)

    Returns
    -------
    dict
        If single is True
    list of dicts
        If single is False

    """
    if not res:
        return res

    keys = [col[0] for col in desc]
    if not single:
        return [dict(zip(keys, row)) for row in res]
    return dict(zip(keys, res))
440
441
442
def results_to_tuple(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to tuples.

    Rows whose exact type is already ``tuple`` are not converted again.
    For multiple rows only the first row's type is inspected.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata (unused here)
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format (unused here)

    Returns
    -------
    tuple
        If single is True
    list of tuples
        If single is False

    """
    if not res:
        return res

    if single:
        return res if type(res) is tuple else tuple(res)

    # Exact type check: tuple subclasses are converted to plain tuples
    if type(res[0]) is tuple:
        return list(res)
    return list(map(tuple, res))
479
480
481
def _no_schema(desc: List[Description]) -> Optional[Dict[str, Any]]:
    """Return an empty schema, for formats that need no schema info."""
    return dict()
483
484
485
# Result converters keyed by format name; singular and plural spellings
# (and ``arrow`` / ``pyarrow``) share one converter.
_converters: Dict[
    str, Callable[
        [
            List[Description], Optional[DBAPIResult],
            Optional[bool], Optional[Dict[str, Any]],
        ],
        Optional[Result],
    ],
] = dict(
    tuple=results_to_tuple,
    tuples=results_to_tuple,
    namedtuple=results_to_namedtuple,
    namedtuples=results_to_namedtuple,
    dict=results_to_dict,
    dicts=results_to_dict,
    numpy=results_to_numpy,
    pandas=results_to_pandas,
    polars=results_to_polars,
    arrow=results_to_arrow,
    pyarrow=results_to_arrow,
)
506
507
# Schema builders keyed by format name.  ``pandas`` uses the numpy schema
# because pandas results are built from a numpy array.
_schema_converters: Dict[
    str, Callable[
        [List[Description]], Optional[Dict[str, Any]],
    ],
] = dict(
    tuple=_no_schema,
    tuples=_no_schema,
    namedtuple=_no_schema,
    namedtuples=_no_schema,
    dict=_no_schema,
    dicts=_no_schema,
    structsequence=_no_schema,
    structsequences=_no_schema,
    numpy=_description_to_numpy_schema,
    pandas=_description_to_numpy_schema,
    polars=_description_to_polars_schema,
    arrow=_description_to_arrow_schema,
    pyarrow=_description_to_arrow_schema,
)
526
527
528
def format_results(
    format: str,
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: bool = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to format specified in the package options.

    Parameters
    ----------
    format : str
        Name of the format type; must be a key of ``_converters``
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    list of (named)tuples, list of dicts or DataFrame
        If single is False
    (named)tuple, dict, or DataFrame
        If single is True

    Raises
    ------
    KeyError
        If `format` is not a registered format name

    """
    converter = _converters[format]
    return converter(desc, res, single, schema)
560
561
562
def get_schema(
    format: str,
    desc: List[Description],
) -> Dict[str, Any]:
    """
    Convert a DB-API description to a format schema.

    Parameters
    ----------
    format : str
        Name of the format type
    desc : list of Descriptions
        The column metadata

    Returns
    -------
    Dict[str, Any]
        A dictionary of function parameters containing schema information
        for the given format type; empty for unknown formats

    """
    builder = _schema_converters.get(format)
    if builder is None:
        return {}
    return builder(desc) or {}
586
587