GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/ext/rowdat_1.py

#!/usr/bin/env python3
import struct
import warnings
from io import BytesIO
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING

from ...config import get_option
from ...mysql.constants import FIELD_TYPE as ft
from ..dtypes import DEFAULT_VALUES
from ..dtypes import NUMPY_TYPE_MAP
from ..dtypes import PANDAS_TYPE_MAP
from ..dtypes import POLARS_TYPE_MAP
from ..dtypes import PYARROW_TYPE_MAP

if TYPE_CHECKING:
    try:
        import numpy as np
    except ImportError:
        pass
    try:
        import polars as pl
    except ImportError:
        pass
    try:
        import pandas as pd
    except ImportError:
        pass
    try:
        import pyarrow as pa
    except ImportError:
        pass
    try:
        import pyarrow.compute as pc  # noqa: F401
    except ImportError:
        pass

has_accel = False
try:
    if not get_option('pure_python'):
        import _singlestoredb_accel
        has_accel = True
except ImportError:
    warnings.warn(
        'could not load accelerated data reader for external functions; '
        'using pure Python implementation.',
        RuntimeWarning,
    )
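
# Struct format codes and byte sizes for each numeric field type code; a
# negated key denotes the unsigned variant of that type. The codes in
# `string_types` are the MySQL field types decoded as UTF-8 text; their
# negated counterparts in `binary_types` are returned as raw bytes.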
numeric_formats = {
    ft.TINY: '<b',
    -ft.TINY: '<B',
    ft.SHORT: '<h',
    -ft.SHORT: '<H',
    ft.INT24: '<i',
    -ft.INT24: '<I',
    ft.LONG: '<i',
    -ft.LONG: '<I',
    ft.LONGLONG: '<q',
    -ft.LONGLONG: '<Q',
    ft.FLOAT: '<f',
    ft.DOUBLE: '<d',
}
numeric_sizes = {
    ft.TINY: 1,
    -ft.TINY: 1,
    ft.SHORT: 2,
    -ft.SHORT: 2,
    ft.INT24: 4,
    -ft.INT24: 4,
    ft.LONG: 4,
    -ft.LONG: 4,
    ft.LONGLONG: 8,
    -ft.LONGLONG: 8,
    ft.FLOAT: 4,
    ft.DOUBLE: 8,
}
medium_int_types = set([ft.INT24, -ft.INT24])
int_types = set([
    ft.TINY, -ft.TINY, ft.SHORT, -ft.SHORT, ft.INT24, -ft.INT24,
    ft.LONG, -ft.LONG, ft.LONGLONG, -ft.LONGLONG,
])
string_types = set([15, 245, 247, 248, 249, 250, 251, 252, 253, 254])
binary_types = set([-x for x in string_types])


def _load(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in rowdat_1 format into rows of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[List[int], List[Any]]

    '''
    data_len = len(data)
    data_io = BytesIO(data)
    row_ids = []
    rows = []
    val = None
    while data_io.tell() < data_len:
        row_ids.append(struct.unpack('<q', data_io.read(8))[0])
        row = []
        for _, ctype in colspec:
            is_null = data_io.read(1) == b'\x01'
            if ctype in numeric_formats:
                val = struct.unpack(
                    numeric_formats[ctype],
                    data_io.read(numeric_sizes[ctype]),
                )[0]
            elif ctype in string_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen).decode('utf-8')
            elif ctype in binary_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen)
            else:
                raise TypeError(f'unrecognized column type: {ctype}')
            row.append(None if is_null else val)
        rows.append(row)
    return row_ids, rows
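
# Each record is an 8-byte little-endian row ID followed by, per column, a
# 1-byte NULL flag and then either the fixed-size numeric value or an 8-byte
# length prefix plus the string/binary payload. A minimal round-trip sketch
# (hypothetical column spec; `ft.TINY` is the TINYINT type code and 253 is
# the VAR_STRING code in `string_types`):
#
#     >>> spec = [('a', ft.TINY), ('b', 253)]
#     >>> payload = _dump([ft.TINY, 253], [1], [[42, 'hi']])
#     >>> _load(spec, bytes(payload))
#     ([1], [[42, 'hi']])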


def _load_vectors(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Tuple[Sequence[Any], Optional[Sequence[Any]]]]]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[List[int], List[Tuple[Any, Any]]]

    '''
    data_len = len(data)
    data_io = BytesIO(data)
    row_ids = []
    cols: List[Any] = [[] for _ in colspec]
    masks: List[Any] = [[] for _ in colspec]
    val = None
    while data_io.tell() < data_len:
        row_ids.append(struct.unpack('<q', data_io.read(8))[0])
        for i, (_, ctype) in enumerate(colspec):
            default = DEFAULT_VALUES[ctype]
            is_null = data_io.read(1) == b'\x01'
            if ctype in numeric_formats:
                val = struct.unpack(
                    numeric_formats[ctype],
                    data_io.read(numeric_sizes[ctype]),
                )[0]
            elif ctype in string_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen).decode('utf-8')
            elif ctype in binary_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen)
            else:
                raise TypeError(f'unrecognized column type: {ctype}')
            cols[i].append(default if is_null else val)
            masks[i].append(is_null)
    return row_ids, list(zip(cols, masks))
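
# NULL values come back as (default, mask=True) pairs so the columns can be
# fed to vector libraries without falling back to object dtypes. A sketch,
# assuming the DOUBLE entry in DEFAULT_VALUES is 0.0:
#
#     >>> ids, cols = _load_vectors(
#     ...     [('x', ft.DOUBLE)],
#     ...     bytes(_dump([ft.DOUBLE], [1, 2], [[1.5], [None]])),
#     ... )
#     >>> cols[0]
#     ([1.5, 0.0], [False, True])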


def _load_pandas(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pd.Series[np.int64]',
    List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into pandas Series.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pd.Series[int], List[Tuple[pd.Series[Any], pd.Series[bool]]]]

    '''
    import numpy as np
    import pandas as pd

    row_ids, cols = _load_vectors(colspec, data)
    index = pd.Series(row_ids)
    return pd.Series(row_ids, dtype=np.int64), [
        (
            pd.Series(data, index=index, name=name, dtype=PANDAS_TYPE_MAP[dtype]),
            pd.Series(mask, index=index, dtype=np.bool_),
        )
        for (data, mask), (name, dtype) in zip(cols, colspec)
    ]
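
# The row IDs come back as an int64 Series and each column as a pair of
# (values Series indexed by row ID, boolean NULL-mask Series); the polars,
# numpy, and arrow loaders below follow the same (values, mask) pattern.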


def _load_polars(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pl.Series[pl.Int64]',
    List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
]:
    '''
    Convert bytes in rowdat_1 format into polars Series.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pl.Series[int], List[Tuple[pl.Series[Any], pl.Series[bool]]]]

    '''
    import polars as pl

    row_ids, cols = _load_vectors(colspec, data)
    return pl.Series(None, row_ids, dtype=pl.Int64), \
        [
            (
                pl.Series(name=name, values=data, dtype=POLARS_TYPE_MAP[dtype]),
                pl.Series(values=mask, dtype=pl.Boolean),
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]


def _load_numpy(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'np.typing.NDArray[np.int64]',
    List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into numpy arrays.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[np.ndarray[int], List[Tuple[np.ndarray[Any], np.ndarray[bool]]]]

    '''
    import numpy as np

    row_ids, cols = _load_vectors(colspec, data)
    return np.asarray(row_ids, dtype=np.int64), \
        [
            (
                np.asarray(data, dtype=NUMPY_TYPE_MAP[dtype]),  # type: ignore
                np.asarray(mask, dtype=np.bool_),  # type: ignore
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]


def _load_arrow(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pa.Array[pa.int64]',
    List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into pyarrow Arrays.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of (column name, column type code) pairs
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pa.Array[int], List[Tuple[pa.Array[Any], pa.Array[bool]]]]

    '''
    import pyarrow as pa

    row_ids, cols = _load_vectors(colspec, data)
    return pa.array(row_ids, type=pa.int64()), \
        [
            (
                pa.array(
                    data, type=PYARROW_TYPE_MAP[dtype],
                    mask=pa.array(mask, type=pa.bool_()),
                ),
                pa.array(mask, type=pa.bool_()),
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]
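
# The arrow loader hands the NULL mask to `pa.array` so the placeholder
# defaults substituted by `_load_vectors` come back as proper nulls; the same
# mask is also returned alongside each array.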


def _dump(
    returns: List[int],
    row_ids: List[int],
    rows: List[List[Any]],
) -> bytes:
    '''
    Convert a list of rows of data into rowdat_1 format.

    Parameters
    ----------
    returns : List[int]
        The return data types, one per column
    row_ids : List[int]
        The row IDs
    rows : List[List[Any]]
        The rows of data to serialize

    Returns
    -------
    bytes

    '''
    out = BytesIO()

    if len(rows) == 0 or len(row_ids) == 0:
        return out.getbuffer()

    for row_id, *values in zip(row_ids, *list(zip(*rows))):
        out.write(struct.pack('<q', row_id))
        for rtype, value in zip(returns, values):
            out.write(b'\x01' if value is None else b'\x00')
            default = DEFAULT_VALUES[rtype]
            if rtype in numeric_formats:
                if value is None:
                    out.write(struct.pack(numeric_formats[rtype], default))
                else:
                    if rtype in int_types:
                        if rtype == ft.INT24:
                            if int(value) > 8388607 or int(value) < -8388608:
                                raise ValueError(
                                    'value is outside range of MEDIUMINT',
                                )
                        elif rtype == -ft.INT24:
                            if int(value) > 16777215 or int(value) < 0:
                                raise ValueError(
                                    'value is outside range of UNSIGNED MEDIUMINT',
                                )
                        out.write(struct.pack(numeric_formats[rtype], int(value)))
                    else:
                        out.write(struct.pack(numeric_formats[rtype], float(value)))
            elif rtype in string_types:
                if value is None:
                    out.write(struct.pack('<q', 0))
                else:
                    sval = value.encode('utf-8')
                    out.write(struct.pack('<q', len(sval)))
                    out.write(sval)
            elif rtype in binary_types:
                if value is None:
                    out.write(struct.pack('<q', 0))
                else:
                    out.write(struct.pack('<q', len(value)))
                    out.write(value)
            else:
                raise TypeError(f'unrecognized column type: {rtype}')

    return out.getbuffer()
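
# Byte-level sketch of one serialized record (hypothetical single TINYINT
# column, row ID 1, value 42): an 8-byte row ID, a NULL flag byte, then the
# packed value.
#
#     >>> bytes(_dump([ft.TINY], [1], [[42]])).hex()
#     '0100000000000000002a'
#
# MEDIUMINT gets explicit range checks because it is packed into 4 bytes
# ('<i'/'<I'), so struct alone would not reject out-of-range values.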


def _dump_vectors(
    returns: List[int],
    row_ids: List[int],
    cols: List[Tuple[Sequence[Any], Optional[Sequence[Any]]]],
) -> bytes:
    '''
    Convert a list of columns of data into rowdat_1 format.

    Parameters
    ----------
    returns : List[int]
        The return data types, one per column
    row_ids : List[int]
        The row IDs
    cols : List[Tuple[Any, Any]]
        The columns of data and NULL masks to serialize

    Returns
    -------
    bytes

    '''
    out = BytesIO()

    if len(cols) == 0 or len(row_ids) == 0:
        return out.getbuffer()

    for j, row_id in enumerate(row_ids):

        out.write(struct.pack('<q', row_id))

        for i, rtype in enumerate(returns):
            value = cols[i][0][j]
            if cols[i][1] is not None:
                is_null = cols[i][1][j]  # type: ignore
            else:
                is_null = False

            out.write(b'\x01' if is_null or value is None else b'\x00')
            default = DEFAULT_VALUES[rtype]
            try:
                if rtype in numeric_formats:
                    if value is None:
                        out.write(struct.pack(numeric_formats[rtype], default))
                    else:
                        if rtype in int_types:
                            if rtype == ft.INT24:
                                if int(value) > 8388607 or int(value) < -8388608:
                                    raise ValueError(
                                        'value is outside range of MEDIUMINT',
                                    )
                            elif rtype == -ft.INT24:
                                if int(value) > 16777215 or int(value) < 0:
                                    raise ValueError(
                                        'value is outside range of UNSIGNED MEDIUMINT',
                                    )
                            out.write(struct.pack(numeric_formats[rtype], int(value)))
                        else:
                            out.write(struct.pack(numeric_formats[rtype], float(value)))
                elif rtype in string_types:
                    if value is None:
                        out.write(struct.pack('<q', 0))
                    else:
                        sval = value.encode('utf-8')
                        out.write(struct.pack('<q', len(sval)))
                        out.write(sval)
                elif rtype in binary_types:
                    if value is None:
                        out.write(struct.pack('<q', 0))
                    else:
                        out.write(struct.pack('<q', len(value)))
                        out.write(value)
                else:
                    raise TypeError(f'unrecognized column type: {rtype}')

            except struct.error as exc:
                raise ValueError(str(exc))

    return out.getbuffer()
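
# A True entry in a column's mask forces NULL on output even when a value is
# present; the reader ignores the payload bytes whenever the NULL flag is set.
# Any struct packing failure is re-raised as a ValueError so callers see a
# consistent error type.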


def _dump_arrow(
    returns: List[int],
    row_ids: 'pa.Array[int]',
    cols: List[Tuple['pa.Array[Any]', 'pa.Array[bool]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x.tolist(), y.tolist() if y is not None else None) for x, y in cols],
    )


def _dump_numpy(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x.tolist(), y.tolist() if y is not None else None) for x, y in cols],
    )


def _dump_pandas(
    returns: List[int],
    row_ids: 'pd.Series[np.int64]',
    cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.to_list(),
        [(x.to_list(), y.to_list() if y is not None else None) for x, y in cols],
    )


def _dump_polars(
    returns: List[int],
    row_ids: 'pl.Series[pl.Int64]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.to_list(),
        [(x.to_list(), y.to_list() if y is not None else None) for x, y in cols],
    )


def _load_numpy_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'np.typing.NDArray[np.int64]',
    List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    return _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)


def _dump_numpy_accel(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, row_ids, cols)


def _load_pandas_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pd.Series[np.int64]',
    List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import numpy as np
    import pandas as pd

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pd.Series(data, name=name, dtype=PANDAS_TYPE_MAP[dtype]),
            pd.Series(mask, dtype=np.bool_),
        )
        for (name, dtype), (data, mask) in zip(colspec, numpy_cols)
    ]
    return pd.Series(numpy_ids, dtype=np.int64), cols


def _dump_pandas_accel(
    returns: List[int],
    row_ids: 'pd.Series[np.int64]',
    cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_ids = row_ids.to_numpy()
    numpy_cols = [
        (
            data.to_numpy(),
            mask.to_numpy() if mask is not None else None,
        )
        for data, mask in cols
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols)


def _load_polars_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pl.Series[pl.Int64]',
    List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import polars as pl

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pl.Series(
                name=name,
                values=data.tolist()
                if dtype in string_types or dtype in binary_types
                else data,
                dtype=POLARS_TYPE_MAP[dtype],
            ),
            pl.Series(values=mask, dtype=pl.Boolean),
        )
        for (name, dtype), (data, mask) in zip(colspec, numpy_cols)
    ]
    return pl.Series(values=numpy_ids, dtype=pl.Int64), cols
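
# String and binary columns are passed to polars via `.tolist()` here,
# presumably because the accelerated loader returns them as NumPy object
# arrays that polars does not ingest directly; numeric columns are handed
# over as NumPy arrays unchanged.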


def _dump_polars_accel(
    returns: List[int],
    row_ids: 'pl.Series[pl.Int64]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_ids = row_ids.to_numpy()
    numpy_cols = [
        (
            data.to_numpy(),
            mask.to_numpy() if mask is not None else None,
        )
        for data, mask in cols
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols)


def _load_arrow_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pa.Array[pa.int64]',
    List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import pyarrow as pa

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pa.array(data, type=PYARROW_TYPE_MAP[dtype], mask=mask),
            pa.array(mask, type=pa.bool_()),
        )
        for (data, mask), (name, dtype) in zip(numpy_cols, colspec)
    ]
    return pa.array(numpy_ids, type=pa.int64()), cols


def _create_arrow_mask(
    data: 'pa.Array[Any]',
    mask: 'pa.Array[pa.bool_]',
) -> 'np.typing.NDArray[np.bool_]':
    import pyarrow.compute as pc  # noqa: F811

    if mask is None:
        return data.is_null().to_numpy(zero_copy_only=False)

    return pc.or_(data.is_null(), mask.is_null()).to_numpy(zero_copy_only=False)


def _dump_arrow_accel(
    returns: List[int],
    row_ids: 'pa.Array[pa.int64]',
    cols: List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_cols = [
        (
            data.fill_null(DEFAULT_VALUES[dtype]).to_numpy(zero_copy_only=False),
            _create_arrow_mask(data, mask),
        )
        for (data, mask), dtype in zip(cols, returns)
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(
        returns, row_ids.to_numpy(), numpy_cols,
    )
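
# Public entry points: `load`/`dump` operate on rows, `load_list`/`dump_list`
# on (values, mask) column vectors, and the pandas/numpy/arrow/polars pairs
# on the corresponding container types. When the C extension is available,
# the row-based and container-based paths dispatch to it; `load_list` and
# `dump_list` always use the pure Python implementations.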


if not has_accel:
    load = _load_accel = _load
    dump = _dump_accel = _dump
    load_list = _load_vectors  # noqa: F811
    dump_list = _dump_vectors  # noqa: F811
    load_pandas = _load_pandas_accel = _load_pandas  # noqa: F811
    dump_pandas = _dump_pandas_accel = _dump_pandas  # noqa: F811
    load_numpy = _load_numpy_accel = _load_numpy  # noqa: F811
    dump_numpy = _dump_numpy_accel = _dump_numpy  # noqa: F811
    load_arrow = _load_arrow_accel = _load_arrow  # noqa: F811
    dump_arrow = _dump_arrow_accel = _dump_arrow  # noqa: F811
    load_polars = _load_polars_accel = _load_polars  # noqa: F811
    dump_polars = _dump_polars_accel = _dump_polars  # noqa: F811

else:
    _load_accel = _singlestoredb_accel.load_rowdat_1
    _dump_accel = _singlestoredb_accel.dump_rowdat_1
    load = _load_accel
    dump = _dump_accel
    load_list = _load_vectors
    dump_list = _dump_vectors
    load_pandas = _load_pandas_accel
    dump_pandas = _dump_pandas_accel
    load_numpy = _load_numpy_accel
    dump_numpy = _dump_numpy_accel
    load_arrow = _load_arrow_accel
    dump_arrow = _dump_arrow_accel
    load_polars = _load_polars_accel
    dump_polars = _dump_polars_accel