Path: blob/main/singlestoredb/functions/ext/json.py
469 views
#!/usr/bin/env python3
import base64
import json
from typing import Any
from typing import List
from typing import Tuple
from typing import TYPE_CHECKING

from ..dtypes import DEFAULT_VALUES
from ..dtypes import NUMPY_TYPE_MAP
from ..dtypes import PANDAS_TYPE_MAP
from ..dtypes import POLARS_TYPE_MAP
from ..dtypes import PYARROW_TYPE_MAP
from ..dtypes import PYTHON_CONVERTERS

if TYPE_CHECKING:
    try:
        import numpy as np
    except ImportError:
        pass
    try:
        import pandas as pd
    except ImportError:
        pass
    try:
        import polars as pl
    except ImportError:
        pass
    try:
        import pyarrow as pa
    except ImportError:
        pass


class JSONEncoder(json.JSONEncoder):
    '''JSON encoder that serializes ``bytes`` values as base64 text.'''

    def default(self, obj: Any) -> Any:
        '''
        Return a JSON-serializable stand-in for `obj`.

        Parameters
        ----------
        obj : Any
            The object the standard encoder could not serialize

        Returns
        -------
        Any
            A base64-encoded ``str`` for ``bytes`` input; every other type
            is delegated to the base class, which raises ``TypeError``.

        '''
        if isinstance(obj, bytes):
            return base64.b64encode(obj).decode('utf-8')
        return super().default(obj)


def decode_row(coltypes: List[int], row: List[Any]) -> List[Any]:
    '''
    Convert each item of a row using the converter for its column type.

    Parameters
    ----------
    coltypes : List[int]
        Column data type codes (keys of ``PYTHON_CONVERTERS``)
    row : List[Any]
        The raw values, in the same order as `coltypes`

    Returns
    -------
    List[Any]
        The converted values; pairing stops at the shorter input

    '''
    return [
        PYTHON_CONVERTERS[dtype](item)  # type: ignore
        for dtype, item in zip(coltypes, row)
    ]


def decode_value(coltype: int, data: Any) -> Any:
    '''
    Convert a single value using the converter for its column type.

    Parameters
    ----------
    coltype : int
        Column data type code (key of ``PYTHON_CONVERTERS``)
    data : Any
        The raw value

    Returns
    -------
    Any

    '''
    return PYTHON_CONVERTERS[coltype](data)  # type: ignore


def load(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into rows of data.

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[List[int], List[Any]]
        The row IDs and the decoded rows

    '''
    # The type codes are the same for every row, so extract them once
    # rather than rebuilding the list per row.
    coltypes = [spec[1] for spec in colspec]
    row_ids = []
    rows = []
    # Each record is ``[row_id, value, value, ...]``.
    for row_id, *row in json.loads(data.decode('utf-8'))['data']:
        row_ids.append(row_id)
        rows.append(decode_row(coltypes, row))
    return row_ids, rows


def _load_vectors(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into columns of data.

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[List[int], List[Tuple[List[Any], List[bool]]]]
        The row IDs and, for each column, a ``(values, mask)`` pair.
        A mask entry is True where the input value was null; the value
        list holds the type's entry from ``DEFAULT_VALUES`` at that spot.

    '''
    row_ids = []
    cols: List[Tuple[Any, Any]] = []
    defaults: List[Any] = []
    for row_id, *row in json.loads(data.decode('utf-8'))['data']:
        row_ids.append(row_id)
        # Column containers and defaults are sized from the first row seen.
        if not defaults:
            defaults = [DEFAULT_VALUES[colspec[i][1]] for i, _ in enumerate(row)]
        if not cols:
            cols = [([], []) for _ in row]
        for i, (spec, x) in enumerate(zip(colspec, row)):
            is_null = x is None
            values, mask = cols[i]
            values.append(defaults[i] if is_null else decode_value(spec[1], x))
            mask.append(is_null)
    return row_ids, cols


def load_pandas(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into pd.Series

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[pd.Series[int], List[Tuple[pd.Series[Any], pd.Series[int]]]]
        The row IDs and one ``(values, mask)`` pair of Series per column,
        both indexed by the row IDs

    '''
    import numpy as np
    import pandas as pd
    row_ids, cols = _load_vectors(colspec, data)
    index = pd.Series(row_ids, dtype=np.longlong)
    # NOTE(review): the mask Series is built as np.longlong here, whereas the
    # polars / numpy / arrow loaders use a boolean dtype -- confirm whether
    # callers rely on the integer dtype before changing it.
    return index, \
        [
            (
                pd.Series(
                    values, index=index, name=spec[0],
                    dtype=PANDAS_TYPE_MAP[spec[1]],
                ),
                pd.Series(mask, index=index, dtype=np.longlong),
            )
            for (values, mask), spec in zip(cols, colspec)
        ]


def load_polars(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into polars.Series

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[polars.Series[int], List[Tuple[polars.Series[Any], polars.Series[bool]]]]
        The row IDs and one ``(values, mask)`` pair of Series per column

    '''
    import polars as pl
    row_ids, cols = _load_vectors(colspec, data)
    return pl.Series(None, row_ids, dtype=pl.Int64), \
        [
            (
                pl.Series(spec[0], values, dtype=POLARS_TYPE_MAP[spec[1]]),
                pl.Series(None, mask, dtype=pl.Boolean),
            )
            for (values, mask), spec in zip(cols, colspec)
        ]


def load_numpy(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[Any, List[Any]]:
    '''
    Convert bytes in JSON format into np.ndarrays

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[np.ndarray[int], List[Tuple[np.ndarray[Any], np.ndarray[bool]]]]
        The row IDs and one ``(values, mask)`` pair of arrays per column

    '''
    import numpy as np
    row_ids, cols = _load_vectors(colspec, data)
    return np.asarray(row_ids, dtype=np.longlong), \
        [
            (
                np.asarray(values, dtype=NUMPY_TYPE_MAP[spec[1]]),  # type: ignore
                np.asarray(mask, dtype=np.bool_),  # type: ignore
            )
            for (values, mask), spec in zip(cols, colspec)
        ]


def load_arrow(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[Any, List[Any]]:
    '''
    Convert bytes in JSON format into pyarrow.Arrays

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column data types
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[pyarrow.Array[int], List[Tuple[pyarrow.Array[Any], pyarrow.Array[bool]]]]
        The row IDs and one ``(values, mask)`` pair of arrays per column;
        the mask is also passed to ``pa.array`` when building the values

    '''
    import pyarrow as pa
    row_ids, cols = _load_vectors(colspec, data)
    return pa.array(row_ids, type=pa.int64()), \
        [
            (
                pa.array(
                    values, type=PYARROW_TYPE_MAP[dtype],
                    mask=pa.array(mask, type=pa.bool_()),
                ),
                pa.array(mask, type=pa.bool_()),
            )
            for (values, mask), (_name, dtype) in zip(cols, colspec)
        ]


def dump(
    returns: List[int],
    row_ids: List[int],
    rows: List[List[Any]],
) -> bytes:
    '''
    Convert a list of lists of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : List[int]
        Row IDs
    rows : List[List[Any]]
        The rows of data to serialize

    Returns
    -------
    bytes
        UTF-8 encoded JSON of the form ``{"data": [[row_id, ...], ...]}``

    '''
    # Transpose the rows into columns, then zip them back together with the
    # row IDs in front so that every record starts with its ID.
    records = list(zip(row_ids, *zip(*rows)))
    return json.dumps(dict(data=records), cls=JSONEncoder).encode('utf-8')


def _dump_vectors(
    returns: List[int],
    row_ids: List[int],
    cols: List[Tuple[Any, Any]],
) -> bytes:
    '''
    Convert a list of column vectors into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : List[int]
        Row IDs
    cols : List[Tuple[Any, Any]]
        One ``(values, mask)`` pair per column; `mask` may be None, in
        which case the values are emitted unchanged

    Returns
    -------
    bytes
        UTF-8 encoded JSON of the form ``{"data": [[row_id, ...], ...]}``

    '''
    masked_cols = []
    for values, mask in cols:
        if mask is None:
            masked_cols.append(values)
            continue
        # A truthy mask entry marks a null value (the convention the loaders
        # in this module produce), so emit None there.  A mask entry that is
        # itself None is also treated as null, as it was previously.
        masked_cols.append(
            [None if (m is None or m) else v for v, m in zip(values, mask)],
        )
    records = list(zip(row_ids, *masked_cols))
    return json.dumps(dict(data=records), cls=JSONEncoder).encode('utf-8')


load_list = _load_vectors
dump_list = _dump_vectors


def dump_pandas(
    returns: List[int],
    row_ids: 'pd.Series[int]',
    cols: List[Tuple['pd.Series[int]', 'pd.Series[bool]']],
) -> bytes:
    '''
    Convert a list of pd.Series of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : pd.Series[int]
        Row IDs
    cols : List[Tuple[pd.Series[Any], pd.Series[bool]]]
        The columns of data to serialize

    Returns
    -------
    bytes
        UTF-8 encoded JSON of the form ``{"data": [[row_id, ...], ...]}``

    '''
    import pandas as pd
    # Index the IDs by their own values so they line up with the data columns
    # in the concat below.  Work on a copy so the caller's Series keeps its
    # index.
    ids = row_ids.copy()
    ids.index = row_ids
    # NOTE(review): only the value Series (x[0]) is serialized; the mask
    # Series (x[1]) is not consulted here -- confirm whether that is intended.
    df = pd.concat([ids] + [x[0] for x in cols], axis=1)
    return ('{"data": %s}' % df.to_json(orient='values')).encode('utf-8')


def dump_polars(
    returns: List[int],
    row_ids: 'pl.Series[int]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[int]']],
) -> bytes:
    '''
    Convert a list of polars.Series of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : polars.Series[int]
        Row IDs
    cols : List[Tuple[polars.Series[Any], polars.Series[bool]]]
        The columns of data to serialize; a mask may be None

    Returns
    -------
    bytes

    '''
    return _dump_vectors(
        returns,
        row_ids.to_list(),
        [(x[0].to_list(), x[1].to_list() if x[1] is not None else None) for x in cols],
    )


def dump_numpy(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    '''
    Convert a list of np.ndarrays of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : np.ndarray[int]
        Row IDs
    cols : List[Tuple[np.ndarray[Any], np.ndarray[bool]]]
        The columns of data to serialize; a mask may be None

    Returns
    -------
    bytes

    '''
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x[0].tolist(), x[1].tolist() if x[1] is not None else None) for x in cols],
    )


def dump_arrow(
    returns: List[int],
    row_ids: 'pa.Array[int]',
    cols: List[Tuple['pa.Array[int]', 'pa.Array[bool]']],
) -> bytes:
    '''
    Convert a list of pyarrow.Arrays of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : pyarrow.Array[int]
        Row IDs
    cols : List[Tuple[pyarrow.Array[Any], pyarrow.Array[Any]]]
        The columns of data to serialize; a mask may be None

    Returns
    -------
    bytes

    '''
    return _dump_vectors(
        returns,
        row_ids.tolist(),
        [(x[0].tolist(), x[1].tolist() if x[1] is not None else None) for x in cols],
    )