Path: blob/main/singlestoredb/utils/results.py
469 views
#!/usr/bin/env python
"""SingleStoreDB package utilities."""
import collections
import warnings
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from typing import Union

from .dtypes import NUMPY_TYPE_MAP
from .dtypes import POLARS_TYPE_MAP
from .dtypes import PYARROW_TYPE_MAP

# Bit masks tested against ``Description.flags``.
UNSIGNED_FLAG = 32
BINARY_FLAG = 128

# Optional dependencies: each ``has_*`` flag records whether the package
# could be imported, and the converters below fall back to returning the
# raw results (with a warning) when it could not.
try:
    has_numpy = True
    import numpy as np
except ImportError:
    has_numpy = False

try:
    has_pandas = True
    import pandas as pd
except ImportError:
    has_pandas = False

try:
    has_polars = True
    import polars as pl
except ImportError:
    has_polars = False

try:
    has_pyarrow = True
    import pyarrow as pa
except ImportError:
    has_pyarrow = False

DBAPIResult = Union[List[Tuple[Any, ...]], Tuple[Any, ...]]
OneResult = Union[
    Tuple[Any, ...], Dict[str, Any],
    'np.ndarray', 'pd.DataFrame', 'pl.DataFrame', 'pa.Table',
]
ManyResult = Union[
    List[Tuple[Any, ...]], List[Dict[str, Any]],
    'np.ndarray', 'pd.DataFrame', 'pl.DataFrame', 'pa.Table',
]
Result = Union[OneResult, ManyResult]


class Description(NamedTuple):
    """Column definition."""
    name: str
    type_code: int
    display_size: Optional[int]
    internal_size: Optional[int]
    precision: Optional[int]
    scale: Optional[int]
    null_ok: Optional[bool]
    flags: Optional[int]
    charset: Optional[int]


if has_numpy:
    # If an int column is nullable, we need to use floats rather than
    # ints for numpy and pandas.
    NUMPY_TYPE_MAP_CAST_FLOAT = NUMPY_TYPE_MAP.copy()
    NUMPY_TYPE_MAP_CAST_FLOAT.update({
        1: np.float32,  # Tiny
        -1: np.float32,  # Unsigned Tiny
        2: np.float32,  # Short
        -2: np.float32,  # Unsigned Short
        3: np.float64,  # Long
        -3: np.float64,  # Unsigned Long
        8: np.float64,  # LongLong
        -8: np.float64,  # Unsigned LongLong
        9: np.float64,  # Int24
        -9: np.float64,  # Unsigned Int24
        13: np.float64,  # Year
    })

if has_polars:
    # Remap date/times to strings; let polars do the parsing
    POLARS_TYPE_MAP = POLARS_TYPE_MAP.copy()
    POLARS_TYPE_MAP.update({
        7: pl.Utf8,
        10: pl.Utf8,
        12: pl.Utf8,
    })


# Groups of ``Description.type_code`` values used by the schema builders.
INT_TYPES = {1, 2, 3, 8, 9}
CHAR_TYPES = {15, 249, 250, 251, 252, 253, 254}
DECIMAL_TYPES = {0, 246}


def signed(desc: Description) -> int:
    """
    Return the key used to look up a column in the type maps.

    The type code is negated when the column is an integer type with the
    ``UNSIGNED_FLAG`` bit set in its flags, or a character type whose
    charset is 63 (presumably the binary charset -- confirm). Otherwise
    the type code is returned unchanged.

    Parameters
    ----------
    desc : Description
        The column metadata

    Returns
    -------
    int

    """
    if ((desc.flags or 0) & UNSIGNED_FLAG and desc.type_code in INT_TYPES) or \
            (desc.charset == 63 and desc.type_code in CHAR_TYPES):
        return -desc.type_code
    return desc.type_code


def _description_to_numpy_schema(desc: List[Description]) -> Dict[str, Any]:
    """Convert description to numpy array schema info."""
    if has_numpy:
        return dict(
            dtype=[
                (
                    x.name,
                    # Nullable columns use the float-casting map.
                    NUMPY_TYPE_MAP_CAST_FLOAT[signed(x)]
                    if x.null_ok else NUMPY_TYPE_MAP[signed(x)],
                )
                for x in desc
            ],
        )
    return {}


def _description_to_pandas_schema(desc: List[Description]) -> Dict[str, Any]:
    """Convert description to pandas DataFrame schema info."""
    if has_pandas:
        return dict(columns=[x.name for x in desc])
    return {}


def _decimalize_polars(desc: Description) -> 'pl.Decimal':
    """Build a polars Decimal type from the column's precision and scale."""
    # A missing (or zero) precision falls back to 10; a missing scale to 0.
    return pl.Decimal(desc.precision or 10, desc.scale or 0)


def _description_to_polars_schema(desc: List[Description]) -> Dict[str, Any]:
    """
    Convert description to polars DataFrame schema info.

    The returned dictionary has two entries: ``schema`` holds the keyword
    arguments for the ``pl.DataFrame`` constructor, and ``with_columns``
    holds the expressions that parse date/time columns (which are loaded
    as strings) after the DataFrame has been built.

    """
    if has_polars:
        with_columns = {}
        for x in desc:
            if x.type_code in [7, 12]:
                # A scale of 6 selects the format with fractional seconds.
                if x.scale == 6:
                    with_columns[x.name] = pl.col(x.name).str.to_datetime(
                        '%Y-%m-%d %H:%M:%S.%6f', time_unit='us',
                    )
                else:
                    with_columns[x.name] = pl.col(x.name).str.to_datetime(
                        '%Y-%m-%d %H:%M:%S', time_unit='us',
                    )
            elif x.type_code == 10:
                with_columns[x.name] = pl.col(x.name).str.to_date('%Y-%m-%d')

        return dict(
            schema=dict(
                schema=[
                    (
                        x.name, _decimalize_polars(x)
                        if x.type_code in DECIMAL_TYPES else POLARS_TYPE_MAP[signed(x)],
                    )
                    for x in desc
                ],
            ),
            with_columns=with_columns,
        )
    return {}


def _decimalize_arrow(desc: Description) -> 'pa.Decimal128':
    """Build an Arrow decimal128 type from the column's precision and scale."""
    # A missing (or zero) precision falls back to 10; a missing scale to 0.
    return pa.decimal128(desc.precision or 10, desc.scale or 0)


def _description_to_arrow_schema(desc: List[Description]) -> Dict[str, Any]:
    """Convert description to Arrow Table schema info."""
    if has_pyarrow:
        return dict(
            schema=pa.schema([
                (
                    x.name, _decimalize_arrow(x)
                    if x.type_code in DECIMAL_TYPES else PYARROW_TYPE_MAP[signed(x)],
                )
                for x in desc
            ]),
        )
    return {}


def results_to_numpy(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to numpy.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    numpy.array
        If `numpy` is available
    tuple or list of tuples
        If `numpy` is not available

    """
    if not res:
        return res
    if has_numpy:
        schema = _description_to_numpy_schema(desc) if schema is None else schema
        if single:
            return np.array([res], **schema)
        return np.array(list(res), **schema)
    warnings.warn(
        'numpy is not available; unable to convert to array',
        RuntimeWarning,
    )
    return res


def results_to_pandas(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to pandas.

    The DataFrame is built from the array returned by `results_to_numpy`,
    so the schema used here is a numpy schema (this is also what
    `get_schema` returns for the ``'pandas'`` format).

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    DataFrame
        If `pandas` is available
    tuple or list of tuples
        If `pandas` is not available

    """
    if not res:
        return res
    if has_pandas:
        # The schema is expanded into ``np.array(...)`` keyword arguments by
        # `results_to_numpy`, so the fallback must be the numpy schema. A
        # pandas-style ``{'columns': [...]}`` schema would make ``np.array``
        # raise ``TypeError`` on the unexpected ``columns`` keyword.
        schema = _description_to_numpy_schema(desc) if schema is None else schema
        return pd.DataFrame(results_to_numpy(desc, res, single=single, schema=schema))
    warnings.warn(
        'pandas is not available; unable to convert to DataFrame',
        RuntimeWarning,
    )
    return res


def results_to_polars(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to polars.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    DataFrame
        If `polars` is available
    tuple or list of tuples
        If `polars` is not available

    """
    if not res:
        return res
    if has_polars:
        schema = _description_to_polars_schema(desc) if schema is None else schema
        if single:
            out = pl.DataFrame([res], **schema.get('schema', {}))
        else:
            out = pl.DataFrame(res, **schema.get('schema', {}))
        # Date/time columns were loaded as strings; parse them now.
        with_columns = schema.get('with_columns')
        if with_columns:
            return out.with_columns(**with_columns)
        return out
    warnings.warn(
        'polars is not available; unable to convert to DataFrame',
        RuntimeWarning,
    )
    return res


def results_to_arrow(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to Arrow.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    Table
        If `pyarrow` is available
    tuple or list of tuples
        If `pyarrow` is not available

    """
    if not res:
        return res
    if has_pyarrow:
        names = [x[0] for x in desc]
        schema = _description_to_arrow_schema(desc) if schema is None else schema
        # Rows that are already dicts are passed through as-is; other rows
        # are paired with the column names first.
        if single:
            if isinstance(res, dict):
                return pa.Table.from_pylist([res], **schema)
            else:
                return pa.Table.from_pylist([dict(zip(names, res))], **schema)
        if isinstance(res[0], dict):
            return pa.Table.from_pylist(res, **schema)
        else:
            return pa.Table.from_pylist([dict(zip(names, x)) for x in res], **schema)
    warnings.warn(
        'pyarrow is not available; unable to convert to Table',
        RuntimeWarning,
    )
    return res


def results_to_namedtuple(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to namedtuples.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    namedtuple
        If single is True
    list of namedtuples
        If single is False

    """
    if not res:
        return res
    # ``rename=True`` lets namedtuple replace invalid or duplicate column
    # names with positional names instead of raising.
    tup = collections.namedtuple(  # type: ignore
        'Row', [x[0] for x in desc], rename=True,
    )
    if single:
        return tup(*res)
    return [tup(*x) for x in res]


def results_to_dict(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to dicts.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    dict
        If single is True
    list of dicts
        If single is False

    """
    if not res:
        return res
    names = [x[0] for x in desc]
    if single:
        return dict(zip(names, res))
    return [dict(zip(names, x)) for x in res]


def results_to_tuple(
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: Optional[bool] = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to tuples.

    Parameters
    ----------
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    tuple
        If single is True
    list of tuples
        If single is False

    """
    if not res:
        return res
    # The exact-type checks are deliberate: anything that is not a plain
    # ``tuple`` (including tuple subclasses) is converted to one.
    if single:
        if type(res) is tuple:
            return res
        return tuple(res)
    if type(res[0]) is tuple:
        return list(res)
    return [tuple(x) for x in res]


def _no_schema(desc: List[Description]) -> Optional[Dict[str, Any]]:
    """Return an empty schema for formats that do not need one."""
    return {}


# Result converters keyed by output format name.
_converters: Dict[
    str, Callable[
        [
            List[Description], Optional[DBAPIResult],
            Optional[bool], Optional[Dict[str, Any]],
        ],
        Optional[Result],
    ],
] = {
    'tuple': results_to_tuple,
    'tuples': results_to_tuple,
    'namedtuple': results_to_namedtuple,
    'namedtuples': results_to_namedtuple,
    'dict': results_to_dict,
    'dicts': results_to_dict,
    'numpy': results_to_numpy,
    'pandas': results_to_pandas,
    'polars': results_to_polars,
    'arrow': results_to_arrow,
    'pyarrow': results_to_arrow,
}

# Schema builders keyed by output format name. Note that 'pandas' uses the
# numpy schema because `results_to_pandas` goes through `results_to_numpy`.
_schema_converters: Dict[
    str, Callable[
        [List[Description]], Optional[Dict[str, Any]],
    ],
] = {
    'tuple': _no_schema,
    'tuples': _no_schema,
    'namedtuple': _no_schema,
    'namedtuples': _no_schema,
    'dict': _no_schema,
    'dicts': _no_schema,
    'structsequence': _no_schema,
    'structsequences': _no_schema,
    'numpy': _description_to_numpy_schema,
    'pandas': _description_to_numpy_schema,
    'polars': _description_to_polars_schema,
    'arrow': _description_to_arrow_schema,
    'pyarrow': _description_to_arrow_schema,
}


def format_results(
    format: str,
    desc: List[Description],
    res: Optional[DBAPIResult],
    single: bool = False,
    schema: Optional[Dict[str, Any]] = None,
) -> Optional[Result]:
    """
    Convert results to format specified in the package options.

    Parameters
    ----------
    format : str
        Name of the format type
    desc : list of Descriptions
        The column metadata
    res : tuple or list of tuples
        The query results
    single : bool, optional
        Is this a single result (i.e., from `fetchone`)?
    schema : Dict[str, Any], optional
        Cached schema for current output format

    Returns
    -------
    list of (named)tuples, list of dicts or DataFrame
        If single is False
    (named)tuple, dict, or DataFrame
        If single is True

    Raises
    ------
    KeyError
        If `format` is not a key of the converter table

    """
    return _converters[format](desc, res, single, schema)


def get_schema(
    format: str,
    desc: List[Description],
) -> Dict[str, Any]:
    """
    Convert a DB-API description to a format schema.

    Parameters
    ----------
    format : str
        Name of the format type
    desc : list of Descriptions
        The column metadata

    Returns
    -------
    Dict[str, Any]
        A dictionary of function parameters containing schema information
        for the given format type

    """
    if format in _schema_converters:
        return _schema_converters[format](desc) or {}
    return {}