Path: blob/main/singlestoredb/functions/ext/rowdat_1.py
#!/usr/bin/env python3
import struct
import warnings
from io import BytesIO
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING

from ...config import get_option
from ...mysql.constants import FIELD_TYPE as ft
from ..dtypes import DEFAULT_VALUES
from ..dtypes import NUMPY_TYPE_MAP
from ..dtypes import PANDAS_TYPE_MAP
from ..dtypes import POLARS_TYPE_MAP
from ..dtypes import PYARROW_TYPE_MAP

if TYPE_CHECKING:
    try:
        import numpy as np
    except ImportError:
        pass
    try:
        import polars as pl
    except ImportError:
        pass
    try:
        import pandas as pd
    except ImportError:
        pass
    try:
        import pyarrow as pa
    except ImportError:
        pass
    try:
        import pyarrow.compute as pc  # noqa: F401
    except ImportError:
        pass

has_accel = False
try:
    if not get_option('pure_python'):
        import _singlestoredb_accel
        has_accel = True
except ImportError:
    warnings.warn(
        'could not load accelerated data reader for external functions; '
        'using pure Python implementation.',
        RuntimeWarning,
    )

numeric_formats = {
    ft.TINY: '<b',
    -ft.TINY: '<B',
    ft.SHORT: '<h',
    -ft.SHORT: '<H',
    ft.INT24: '<i',
    -ft.INT24: '<I',
    ft.LONG: '<i',
    -ft.LONG: '<I',
    ft.LONGLONG: '<q',
    -ft.LONGLONG: '<Q',
    ft.FLOAT: '<f',
    ft.DOUBLE: '<d',
}
numeric_sizes = {
    ft.TINY: 1,
    -ft.TINY: 1,
    ft.SHORT: 2,
    -ft.SHORT: 2,
    ft.INT24: 4,
    -ft.INT24: 4,
    ft.LONG: 4,
    -ft.LONG: 4,
    ft.LONGLONG: 8,
    -ft.LONGLONG: 8,
    ft.FLOAT: 4,
    ft.DOUBLE: 8,
}
medium_int_types = set([ft.INT24, -ft.INT24])
int_types = set([
    ft.TINY, -ft.TINY, ft.SHORT, -ft.SHORT, ft.INT24, -ft.INT24,
    ft.LONG, -ft.LONG, ft.LONGLONG, -ft.LONGLONG,
])
string_types = set([15, 245, 247, 248, 249, 250, 251, 252, 253, 254])
binary_types = set([-x for x in string_types])


def _load(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in rowdat_1 format into rows of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[List[int], List[Any]]

    '''
    data_len = len(data)
    data_io = BytesIO(data)
    row_ids = []
    rows = []
    val = None
    while data_io.tell() < data_len:
        row_ids.append(struct.unpack('<q', data_io.read(8))[0])
        row = []
        for _, ctype in colspec:
            is_null = data_io.read(1) == b'\x01'
            if ctype in numeric_formats:
                val = struct.unpack(
                    numeric_formats[ctype],
                    data_io.read(numeric_sizes[ctype]),
                )[0]
            elif ctype in string_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen).decode('utf-8')
            elif ctype in binary_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen)
            else:
                raise TypeError(f'unrecognized column type: {ctype}')
            row.append(None if is_null else val)
        rows.append(row)
    return row_ids, rows


def _load_vectors(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Tuple[Sequence[Any], Optional[Sequence[Any]]]]]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[List[int], List[Tuple[Any, Any]]]

    '''
    data_len = len(data)
    data_io = BytesIO(data)
    row_ids = []
    cols: List[Any] = [[] for _ in colspec]
    masks: List[Any] = [[] for _ in colspec]
    val = None
    while data_io.tell() < data_len:
        row_ids.append(struct.unpack('<q', data_io.read(8))[0])
        for i, (_, ctype) in enumerate(colspec):
            default = DEFAULT_VALUES[ctype]
            is_null = data_io.read(1) == b'\x01'
            if ctype in numeric_formats:
                val = struct.unpack(
                    numeric_formats[ctype],
                    data_io.read(numeric_sizes[ctype]),
                )[0]
            elif ctype in string_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen).decode('utf-8')
            elif ctype in binary_types:
                slen = struct.unpack('<q', data_io.read(8))[0]
                val = data_io.read(slen)
            else:
                raise TypeError(f'unrecognized column type: {ctype}')
            cols[i].append(default if is_null else val)
            masks[i].append(True if is_null else False)
    return row_ids, [(x, y) for x, y in zip(cols, masks)]
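

# Illustrative sketch of the wire format (not part of the module API): each
# row is a little-endian int64 row ID followed, per column, by a one-byte
# NULL flag and then the value -- fixed-width for numeric types, an int64
# length prefix plus payload for string and binary types. Note that a value
# is present on the wire even when the NULL flag is set. The helper below
# hand-packs two rows of a single BIGINT column and parses them back with
# ``_load``.
def _example_wire_format() -> None:
    payload = struct.pack('<q', 1)      # row ID 1
    payload += b'\x00'                  # NULL flag: not NULL
    payload += struct.pack('<q', 42)    # BIGINT value
    payload += struct.pack('<q', 2)     # row ID 2
    payload += b'\x01'                  # NULL flag: NULL
    payload += struct.pack('<q', 0)     # placeholder value, still present
    row_ids, rows = _load([('x', ft.LONGLONG)], payload)
    assert row_ids == [1, 2]
    assert rows == [[42], [None]]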
def _load_pandas(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pd.Series[np.int64]',
    List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pd.Series[int], List[Tuple[pd.Series[Any], pd.Series[bool]]]]

    '''
    import numpy as np
    import pandas as pd

    row_ids, cols = _load_vectors(colspec, data)
    index = pd.Series(row_ids)
    return pd.Series(row_ids, dtype=np.int64), [
        (
            pd.Series(data, index=index, name=name, dtype=PANDAS_TYPE_MAP[dtype]),
            pd.Series(mask, index=index, dtype=np.bool_),
        )
        for (data, mask), (name, dtype) in zip(cols, colspec)
    ]


def _load_polars(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pl.Series[pl.Int64]',
    List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pl.Series[int], List[Tuple[pl.Series[Any], pl.Series[bool]]]]

    '''
    import polars as pl

    row_ids, cols = _load_vectors(colspec, data)
    return pl.Series(None, row_ids, dtype=pl.Int64), \
        [
            (
                pl.Series(name=name, values=data, dtype=POLARS_TYPE_MAP[dtype]),
                pl.Series(values=mask, dtype=pl.Boolean),
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]


def _load_numpy(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'np.typing.NDArray[np.int64]',
    List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[np.ndarray[int], List[Tuple[np.ndarray[Any], np.ndarray[bool]]]]

    '''
    import numpy as np

    row_ids, cols = _load_vectors(colspec, data)
    return np.asarray(row_ids, dtype=np.int64), \
        [
            (
                np.asarray(data, dtype=NUMPY_TYPE_MAP[dtype]),  # type: ignore
                np.asarray(mask, dtype=np.bool_),  # type: ignore
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]


def _load_arrow(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pa.Array[pa.int64]',
    List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
]:
    '''
    Convert bytes in rowdat_1 format into columns of data.

    Parameters
    ----------
    colspec : List[Tuple[str, int]]
        A list of column names and data types
    data : bytes
        The data in rowdat_1 format

    Returns
    -------
    Tuple[pyarrow.Array[int], List[Tuple[pyarrow.Array[Any], pyarrow.Array[bool]]]]

    '''
    import pyarrow as pa

    row_ids, cols = _load_vectors(colspec, data)
    return pa.array(row_ids, type=pa.int64()), \
        [
            (
                pa.array(
                    data, type=PYARROW_TYPE_MAP[dtype],
                    mask=pa.array(mask, type=pa.bool_()),
                ),
                pa.array(mask, type=pa.bool_()),
            )
            for (data, mask), (name, dtype) in zip(cols, colspec)
        ]
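

# A minimal usage sketch: every vector loader returns one (values, mask)
# pair per column. NULLs are substituted with DEFAULT_VALUES[dtype] in
# ``values`` and flagged True in ``mask``, so consumers can rebuild proper
# null semantics in their framework of choice. The single-column payload
# below is hand-packed for illustration only.
def _example_vector_masks() -> None:
    payload = struct.pack('<q', 1) + b'\x01' + struct.pack('<d', 0.0)
    row_ids, cols = _load_vectors([('x', ft.DOUBLE)], payload)
    values, mask = cols[0]
    assert mask == [True]                         # the only value was NULL
    assert values == [DEFAULT_VALUES[ft.DOUBLE]]  # default stand-in, not None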
def _dump(
    returns: List[int],
    row_ids: List[int],
    rows: List[List[Any]],
) -> bytes:
    '''
    Convert a list of rows of data into rowdat_1 format.

    Parameters
    ----------
    returns : List[int]
        The return data types, one per column
    row_ids : List[int]
        The row IDs
    rows : List[List[Any]]
        The rows of data to serialize

    Returns
    -------
    bytes

    '''
    out = BytesIO()

    if len(rows) == 0 or len(row_ids) == 0:
        return out.getbuffer()

    for row_id, *values in zip(row_ids, *list(zip(*rows))):
        out.write(struct.pack('<q', row_id))
        for rtype, value in zip(returns, values):
            out.write(b'\x01' if value is None else b'\x00')
            default = DEFAULT_VALUES[rtype]
            if rtype in numeric_formats:
                if value is None:
                    out.write(struct.pack(numeric_formats[rtype], default))
                else:
                    if rtype in int_types:
                        if rtype == ft.INT24:
                            if int(value) > 8388607 or int(value) < -8388608:
                                raise ValueError(
                                    'value is outside range of MEDIUMINT',
                                )
                        elif rtype == -ft.INT24:
                            if int(value) > 16777215 or int(value) < 0:
                                raise ValueError(
                                    'value is outside range of UNSIGNED MEDIUMINT',
                                )
                        out.write(struct.pack(numeric_formats[rtype], int(value)))
                    else:
                        out.write(struct.pack(numeric_formats[rtype], float(value)))
            elif rtype in string_types:
                if value is None:
                    out.write(struct.pack('<q', 0))
                else:
                    sval = value.encode('utf-8')
                    out.write(struct.pack('<q', len(sval)))
                    out.write(sval)
            elif rtype in binary_types:
                if value is None:
                    out.write(struct.pack('<q', 0))
                else:
                    out.write(struct.pack('<q', len(value)))
                    out.write(value)
            else:
                raise TypeError(f'unrecognized column type: {rtype}')

    return out.getbuffer()


def _dump_vectors(
    returns: List[int],
    row_ids: List[int],
    cols: List[Tuple[Sequence[Any], Optional[Sequence[Any]]]],
) -> bytes:
    '''
    Convert a list of columns of data into rowdat_1 format.

    Parameters
    ----------
    returns : List[int]
        The return data types, one per column
    row_ids : List[int]
        The row IDs
    cols : List[Tuple[Any, Any]]
        The columns of data and masks to serialize

    Returns
    -------
    bytes

    '''
    out = BytesIO()

    if len(cols) == 0 or len(row_ids) == 0:
        return out.getbuffer()

    for j, row_id in enumerate(row_ids):

        out.write(struct.pack('<q', row_id))

        for i, rtype in enumerate(returns):
            value = cols[i][0][j]
            if cols[i][1] is not None:
                is_null = cols[i][1][j]  # type: ignore
            else:
                is_null = False

            out.write(b'\x01' if is_null or value is None else b'\x00')
            default = DEFAULT_VALUES[rtype]
            try:
                if rtype in numeric_formats:
                    if value is None:
                        out.write(struct.pack(numeric_formats[rtype], default))
                    else:
                        if rtype in int_types:
                            if rtype == ft.INT24:
                                if int(value) > 8388607 or int(value) < -8388608:
                                    raise ValueError(
                                        'value is outside range of MEDIUMINT',
                                    )
                            elif rtype == -ft.INT24:
                                if int(value) > 16777215 or int(value) < 0:
                                    raise ValueError(
                                        'value is outside range of UNSIGNED MEDIUMINT',
                                    )
                            out.write(struct.pack(numeric_formats[rtype], int(value)))
                        else:
                            out.write(struct.pack(numeric_formats[rtype], float(value)))
                elif rtype in string_types:
                    if value is None:
                        out.write(struct.pack('<q', 0))
                    else:
                        sval = value.encode('utf-8')
                        out.write(struct.pack('<q', len(sval)))
                        out.write(sval)
                elif rtype in binary_types:
                    if value is None:
                        out.write(struct.pack('<q', 0))
                    else:
                        out.write(struct.pack('<q', len(value)))
                        out.write(value)
                else:
                    raise TypeError(f'unrecognized column type: {rtype}')

            except struct.error as exc:
                raise ValueError(str(exc))

    return out.getbuffer()
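

# Round-trip sketch: ``_dump`` and ``_load`` are inverses for a given column
# spec, and MEDIUMINT values are range-checked at dump time even though they
# travel as 4-byte integers on the wire. The column spec below is chosen for
# illustration only (254 is a member of ``string_types``).
def _example_dump_roundtrip() -> None:
    colspec = [('name', 254), ('age', ft.LONGLONG)]
    payload = _dump(
        [254, ft.LONGLONG],             # return types, one per column
        [1, 2],                         # row IDs
        [['alice', 40], [None, None]],  # rows; NULLs become flag + default
    )
    row_ids, rows = _load(colspec, bytes(payload))
    assert row_ids == [1, 2]
    assert rows == [['alice', 40], [None, None]]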
def _dump_arrow(
    returns: List[int],
    row_ids: 'pa.Array[int]',
    cols: List[Tuple['pa.Array[Any]', 'pa.Array[bool]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x.tolist(), y.tolist() if y is not None else None) for x, y in cols],
    )


def _dump_numpy(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x.tolist(), y.tolist() if y is not None else None) for x, y in cols],
    )


def _dump_pandas(
    returns: List[int],
    row_ids: 'pd.Series[np.int64]',
    cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.to_list(),
        [(x.to_list(), y.to_list() if y is not None else None) for x, y in cols],
    )


def _dump_polars(
    returns: List[int],
    row_ids: 'pl.Series[pl.Int64]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
) -> bytes:
    return _dump_vectors(
        returns,
        row_ids.to_list(),
        [(x.to_list(), y.to_list() if y is not None else None) for x, y in cols],
    )


def _load_numpy_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'np.typing.NDArray[np.int64]',
    List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    return _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)


def _dump_numpy_accel(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, row_ids, cols)


def _load_pandas_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pd.Series[np.int64]',
    List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import numpy as np
    import pandas as pd

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pd.Series(data, name=name, dtype=PANDAS_TYPE_MAP[dtype]),
            pd.Series(mask, dtype=np.bool_),
        )
        for (name, dtype), (data, mask) in zip(colspec, numpy_cols)
    ]
    return pd.Series(numpy_ids, dtype=np.int64), cols


def _dump_pandas_accel(
    returns: List[int],
    row_ids: 'pd.Series[np.int64]',
    cols: List[Tuple['pd.Series[Any]', 'pd.Series[np.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_ids = row_ids.to_numpy()
    numpy_cols = [
        (
            data.to_numpy(),
            mask.to_numpy() if mask is not None else None,
        )
        for data, mask in cols
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols)


def _load_polars_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pl.Series[pl.Int64]',
    List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import polars as pl

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pl.Series(
                name=name, values=data.tolist()
                if dtype in string_types or dtype in binary_types else data,
                dtype=POLARS_TYPE_MAP[dtype],
            ),
            pl.Series(values=mask, dtype=pl.Boolean),
        )
        for (name, dtype), (data, mask) in zip(colspec, numpy_cols)
    ]
    return pl.Series(values=numpy_ids, dtype=pl.Int64), cols
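

# Usage sketch for the framework wrappers: each ``_dump_*`` function is a
# thin adapter that converts its framework's containers to plain lists (or
# numpy arrays, on the accelerated paths) and defers to the generic
# serializer. The values below are illustrative only.
def _example_dump_pandas() -> None:
    import pandas as pd

    row_ids = pd.Series([1, 2])
    values = pd.Series([1.5, 0.0])
    mask = pd.Series([False, True])    # second row is NULL
    payload = _dump_pandas([ft.DOUBLE], row_ids, [(values, mask)])
    _, cols = _load_vectors([('x', ft.DOUBLE)], bytes(payload))
    assert cols[0][1] == [False, True]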
def _dump_polars_accel(
    returns: List[int],
    row_ids: 'pl.Series[pl.Int64]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[pl.Boolean]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_ids = row_ids.to_numpy()
    numpy_cols = [
        (
            data.to_numpy(),
            mask.to_numpy() if mask is not None else None,
        )
        for data, mask in cols
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(returns, numpy_ids, numpy_cols)


def _load_arrow_accel(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[
    'pa.Array[pa.int64]',
    List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
]:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    import pyarrow as pa

    numpy_ids, numpy_cols = _singlestoredb_accel.load_rowdat_1_numpy(colspec, data)
    cols = [
        (
            pa.array(data, type=PYARROW_TYPE_MAP[dtype], mask=mask),
            pa.array(mask, type=pa.bool_()),
        )
        for (data, mask), (name, dtype) in zip(numpy_cols, colspec)
    ]
    return pa.array(numpy_ids, type=pa.int64()), cols


def _create_arrow_mask(
    data: 'pa.Array[Any]',
    mask: 'pa.Array[pa.bool_]',
) -> 'pa.Array[pa.bool_]':
    import pyarrow.compute as pc  # noqa: F811

    if mask is None:
        return data.is_null().to_numpy(zero_copy_only=False)

    return pc.or_(data.is_null(), mask.is_null()).to_numpy(zero_copy_only=False)


def _dump_arrow_accel(
    returns: List[int],
    row_ids: 'pa.Array[pa.int64]',
    cols: List[Tuple['pa.Array[Any]', 'pa.Array[pa.bool_]']],
) -> bytes:
    if not has_accel:
        raise RuntimeError('could not load SingleStoreDB extension')

    numpy_cols = [
        (
            data.fill_null(DEFAULT_VALUES[dtype]).to_numpy(zero_copy_only=False),
            _create_arrow_mask(data, mask),
        )
        for (data, mask), dtype in zip(cols, returns)
    ]
    return _singlestoredb_accel.dump_rowdat_1_numpy(
        returns, row_ids.to_numpy(), numpy_cols,
    )


if not has_accel:
    load = _load_accel = _load
    dump = _dump_accel = _dump
    load_list = _load_vectors  # noqa: F811
    dump_list = _dump_vectors  # noqa: F811
    load_pandas = _load_pandas_accel = _load_pandas  # noqa: F811
    dump_pandas = _dump_pandas_accel = _dump_pandas  # noqa: F811
    load_numpy = _load_numpy_accel = _load_numpy  # noqa: F811
    dump_numpy = _dump_numpy_accel = _dump_numpy  # noqa: F811
    load_arrow = _load_arrow_accel = _load_arrow  # noqa: F811
    dump_arrow = _dump_arrow_accel = _dump_arrow  # noqa: F811
    load_polars = _load_polars_accel = _load_polars  # noqa: F811
    dump_polars = _dump_polars_accel = _dump_polars  # noqa: F811

else:
    _load_accel = _singlestoredb_accel.load_rowdat_1
    _dump_accel = _singlestoredb_accel.dump_rowdat_1
    load = _load_accel
    dump = _dump_accel
    load_list = _load_vectors
    dump_list = _dump_vectors
    load_pandas = _load_pandas_accel
    dump_pandas = _dump_pandas_accel
    load_numpy = _load_numpy_accel
    dump_numpy = _dump_numpy_accel
    load_arrow = _load_arrow_accel
    dump_arrow = _dump_arrow_accel
    load_polars = _load_polars_accel
    dump_polars = _dump_polars_accel
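

# The dispatching aliases above (``load``, ``dump``, ``load_pandas``, and so
# on) are the intended entry points: they resolve to the C extension when
# ``_singlestoredb_accel`` imported successfully and to the pure Python
# implementations otherwise, so callers never branch on ``has_accel``
# themselves. A minimal, illustrative caller (this assumes the accelerated
# functions accept and return the same shapes as their pure Python
# counterparts):
def _example_dispatch() -> None:
    payload = dump([ft.LONGLONG], [1], [[42]])
    row_ids, rows = load([('x', ft.LONGLONG)], bytes(payload))
    assert row_ids == [1]
    assert rows == [[42]]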