Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/ext/json.py
469 views
1
#!/usr/bin/env python3
2
import base64
3
import json
4
from typing import Any
5
from typing import List
6
from typing import Tuple
7
from typing import TYPE_CHECKING
8
9
from ..dtypes import DEFAULT_VALUES
10
from ..dtypes import NUMPY_TYPE_MAP
11
from ..dtypes import PANDAS_TYPE_MAP
12
from ..dtypes import POLARS_TYPE_MAP
13
from ..dtypes import PYARROW_TYPE_MAP
14
from ..dtypes import PYTHON_CONVERTERS
15
16
if TYPE_CHECKING:
17
try:
18
import numpy as np
19
except ImportError:
20
pass
21
try:
22
import pandas as pd
23
except ImportError:
24
pass
25
try:
26
import polars as pl
27
except ImportError:
28
pass
29
try:
30
import pyarrow as pa
31
except ImportError:
32
pass
33
34
35
class JSONEncoder(json.JSONEncoder):
    '''JSON encoder that additionally knows how to serialize ``bytes``.'''

    def default(self, obj: Any) -> Any:
        '''
        Return a JSON-serializable replacement for `obj`.

        Parameters
        ----------
        obj : Any
            Object that the standard encoder could not serialize

        Returns
        -------
        Any
            For ``bytes``, the base64 encoding of the value as a ``str``.
            Anything else is delegated to ``json.JSONEncoder.default``.

        '''
        if not isinstance(obj, bytes):
            return json.JSONEncoder.default(self, obj)
        encoded = base64.b64encode(obj)
        return encoded.decode('utf-8')
41
42
43
def decode_row(coltypes: List[int], row: List[Any]) -> List[Any]:
    '''
    Convert each item of a row with the converter for its column type.

    Parameters
    ----------
    coltypes : List[int]
        Column type codes, used as keys into ``PYTHON_CONVERTERS``
    row : List[Any]
        Raw values, paired positionally with `coltypes`

    Returns
    -------
    List[Any]
        The converted values; pairing stops at the shorter of the two inputs

    '''
    return [
        PYTHON_CONVERTERS[dtype](item)  # type: ignore
        for dtype, item in zip(coltypes, row)
    ]
48
49
50
def decode_value(coltype: int, data: Any) -> Any:
    '''
    Convert a single value with the converter for its column type.

    Parameters
    ----------
    coltype : int
        Column type code, used as a key into ``PYTHON_CONVERTERS``
    data : Any
        The raw value

    Returns
    -------
    Any
        Whatever the selected converter returns for `data`

    '''
    converter = PYTHON_CONVERTERS[coltype]  # type: ignore
    return converter(data)
52
53
54
def load(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into rows of data.

    The payload is a UTF-8 encoded JSON object whose ``data`` key holds a
    list of records; the first element of each record is the row ID and
    the remaining elements are the column values.

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs; the second element of each spec is
        the column data type
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[List[int], List[Any]]
        The row IDs and the corresponding decoded rows

    '''
    # The column types are the same for every row, so extract them once
    # instead of rebuilding the list per row. Doing it up front also means
    # `colspec` is traversed a single time.
    coltypes = [spec[1] for spec in colspec]
    row_ids = []
    rows = []
    for row_id, *row in json.loads(data.decode('utf-8'))['data']:
        row_ids.append(row_id)
        rows.append(decode_row(coltypes, row))
    return row_ids, rows
79
80
81
def _load_vectors(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into column vectors with null masks.

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs; the second element of each spec is
        the column data type
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[List[int], List[Tuple[List[Any], List[bool]]]]
        The row IDs and, per column, a ``(values, mask)`` pair where
        ``mask`` is True at positions whose incoming value was null

    '''
    payload = json.loads(data.decode('utf-8'))
    ids = []
    columns: List[Tuple[Any, Any]] = []
    fill_values: List[Any] = []
    for rid, *values in payload['data']:
        ids.append(rid)
        # The per-column defaults and the output vectors are sized from
        # the first row that has any columns.
        if not fill_values:
            fill_values = [
                DEFAULT_VALUES[colspec[pos][1]] for pos in range(len(values))
            ]
        if not columns:
            columns = [([], []) for _ in values]
        for pos, (spec, value) in enumerate(zip(colspec, values)):
            out_values, out_mask = columns[pos]
            is_null = value is None
            # Nulls are stored as the column type's default value; the
            # mask records that the slot is really null.
            if is_null:
                out_values.append(fill_values[pos])
            else:
                out_values.append(decode_value(spec[1], value))
            out_mask.append(is_null)
    return ids, columns
113
114
115
def load_pandas(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[Any, List[Any]]:
    '''
    Convert bytes in JSON format into pd.Series

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs as ``(name, data type)``
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[pd.Series[int], List[Tuple[pd.Series[Any], pd.Series[bool]]]]
        The row IDs and, per column, a ``(values, mask)`` pair of Series.
        Both Series of a pair are indexed by the row IDs; the values
        Series is named after the column.

    '''
    import numpy as np
    import pandas as pd
    row_ids, cols = _load_vectors(colspec, data)
    index = pd.Series(row_ids, dtype=np.longlong)
    return index, [
        (
            pd.Series(
                values, index=index, name=spec[0],
                dtype=PANDAS_TYPE_MAP[spec[1]],
            ),
            # The mask is a null indicator, so it is stored as booleans,
            # matching the masks built by the polars/numpy/arrow loaders.
            pd.Series(mask, index=index, dtype=np.bool_),
        )
        for (values, mask), spec in zip(cols, colspec)
    ]
149
150
151
def load_polars(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[List[int], List[Any]]:
    '''
    Convert bytes in JSON format into polars.Series

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs as ``(name, data type)``
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[polars.Series[int], List[Tuple[polars.Series[Any], polars.Series[bool]]]]
        The row IDs as an Int64 Series and, per column, a
        ``(values, mask)`` pair; the values Series carries the column name

    '''
    import polars as pl
    row_ids, cols = _load_vectors(colspec, data)
    id_series = pl.Series(None, row_ids, dtype=pl.Int64)
    series_pairs = []
    for (values, mask), spec in zip(cols, colspec):
        value_series = pl.Series(spec[0], values, dtype=POLARS_TYPE_MAP[spec[1]])
        mask_series = pl.Series(None, mask, dtype=pl.Boolean)
        series_pairs.append((value_series, mask_series))
    return id_series, series_pairs
180
181
182
def load_numpy(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[Any, List[Any]]:
    '''
    Convert bytes in JSON format into np.ndarrays

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs; the second element of each spec is
        the column data type
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[np.ndarray[int], List[Tuple[np.ndarray[Any], np.ndarray[bool]]]]
        The row IDs and, per column, a ``(values, mask)`` pair of arrays

    '''
    import numpy as np
    row_ids, cols = _load_vectors(colspec, data)
    id_array = np.asarray(row_ids, dtype=np.longlong)
    array_pairs = []
    for (values, mask), spec in zip(cols, colspec):
        value_array = np.asarray(values, dtype=NUMPY_TYPE_MAP[spec[1]])  # type: ignore
        mask_array = np.asarray(mask, dtype=np.bool_)  # type: ignore
        array_pairs.append((value_array, mask_array))
    return id_array, array_pairs
211
212
213
def load_arrow(
    colspec: List[Tuple[str, int]],
    data: bytes,
) -> Tuple[Any, List[Any]]:
    '''
    Convert bytes in JSON format into pyarrow.Arrays

    Parameters
    ----------
    colspec : Iterable[Tuple[str, int]]
        An Iterable of column specs as ``(name, data type)``
    data : bytes
        The data in JSON format

    Returns
    -------
    Tuple[pyarrow.Array[int], List[Tuple[pyarrow.Array[Any], pyarrow.Array[bool]]]]
        The row IDs as an int64 array and, per column, a
        ``(values, mask)`` pair. The mask is also applied to the values
        array when it is built.

    '''
    import pyarrow as pa
    row_ids, cols = _load_vectors(colspec, data)
    id_array = pa.array(row_ids, type=pa.int64())
    array_pairs = []
    for (values, mask), (_name, dtype) in zip(cols, colspec):
        value_array = pa.array(
            values,
            type=PYARROW_TYPE_MAP[dtype],
            mask=pa.array(mask, type=pa.bool_()),
        )
        mask_array = pa.array(mask, type=pa.bool_())
        array_pairs.append((value_array, mask_array))
    return id_array, array_pairs
245
246
247
def dump(
    returns: List[int],
    row_ids: List[int],
    rows: List[List[Any]],
) -> bytes:
    '''
    Convert row IDs and rows of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type (not referenced by this function)
    row_ids : List[int]
        Row IDs
    rows : List[List[Any]]
        The rows of data to serialize

    Returns
    -------
    bytes
        UTF-8 encoded JSON object whose ``data`` key holds one record per
        row: the row ID followed by that row's values

    '''
    # Transpose the rows into columns, then zip the columns back together
    # behind the row IDs so every record starts with its ID.
    columns = list(zip(*rows))
    records = list(zip(row_ids, *columns))
    payload = dict(data=records)
    return json.dumps(payload, cls=JSONEncoder).encode('utf-8')
271
272
273
def _dump_vectors(
    returns: List[int],
    row_ids: List[int],
    cols: List[Tuple[Any, Any]],
) -> bytes:
    '''
    Convert column vectors and their null masks into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type (not referenced by this function)
    row_ids : List[int]
        Row IDs
    cols : List[Tuple[Any, Any]]
        One ``(data, mask)`` pair per column. When ``mask`` is None the
        data is emitted unchanged; otherwise a true (or None) mask entry
        marks the value at that position as null.

    Returns
    -------
    bytes
        UTF-8 encoded JSON object whose ``data`` key holds one record per
        row: the row ID followed by the value from each column

    '''
    masked_cols = []
    for values, mask in cols:
        if mask is None:
            masked_cols.append(values)
            continue
        # Mask entries are booleans where True means "null" (the same
        # polarity the loaders produce), so test their truth value. A
        # plain `is not None` test is true for both True and False and
        # would never null anything out. None entries are still treated
        # as null, as they were before.
        masked_cols.append([
            None if (m is None or m) else value
            for value, m in zip(values, mask)
        ])
    records = list(zip(row_ids, *masked_cols))
    return json.dumps(dict(data=records), cls=JSONEncoder).encode('utf-8')
303
304
305
# List-based loader: alias of the plain-Python-list vector loader.
load_list = _load_vectors
306
# List-based dumper: alias of the plain-Python-list vector dumper.
dump_list = _dump_vectors
307
308
309
def dump_pandas(
    returns: List[int],
    row_ids: 'pd.Series[int]',
    cols: List[Tuple['pd.Series[int]', 'pd.Series[bool]']],
) -> bytes:
    '''
    Convert a list of pd.Series of data into JSON format.

    Only the data Series of each ``(data, mask)`` pair is serialized; the
    mask Series is not consulted here.

    Parameters
    ----------
    returns : List[int]
        The returned data type (not referenced by this function)
    row_ids : pd.Series[int]
        Row IDs; this Series is left unmodified
    cols : List[Tuple[pd.Series[Any], pd.Series[bool]]]
        The columns of data to serialize

    Returns
    -------
    bytes
        UTF-8 encoded JSON of the form ``{"data": [[row_id, ...], ...]}``

    '''
    import pandas as pd
    # The ID column has to be indexed by its own values so that concat
    # lines it up with the data Series (presumably indexed by row ID, as
    # load_pandas builds them -- confirm for other callers). Re-index a
    # copy so the caller's Series is not mutated.
    indexed_ids = row_ids.copy()
    indexed_ids.index = row_ids
    df = pd.concat([indexed_ids] + [x[0] for x in cols], axis=1)
    return ('{"data": %s}' % df.to_json(orient='values')).encode('utf-8')
335
336
337
def dump_polars(
    returns: List[int],
    row_ids: 'pl.Series[int]',
    cols: List[Tuple['pl.Series[Any]', 'pl.Series[int]']],
) -> bytes:
    '''
    Convert a list of polars.Series of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : polars.Series[int]
        Row IDs
    cols : List[Tuple[polars.Series[Any], polars.Series[bool]]]
        The columns of data to serialize as ``(data, mask)`` pairs; the
        mask of a pair may be None

    Returns
    -------
    bytes
        The JSON produced by ``_dump_vectors`` for the converted lists

    '''
    id_list = row_ids.to_list()
    list_pairs = []
    for pair in cols:
        values = pair[0].to_list()
        # A missing mask is passed through as None.
        mask = None if pair[1] is None else pair[1].to_list()
        list_pairs.append((values, mask))
    return _dump_vectors(returns, id_list, list_pairs)
363
364
365
def dump_numpy(
    returns: List[int],
    row_ids: 'np.typing.NDArray[np.int64]',
    cols: List[Tuple['np.typing.NDArray[Any]', 'np.typing.NDArray[np.bool_]']],
) -> bytes:
    '''
    Convert a list of np.ndarrays of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : np.ndarray[int]
        Row IDs
    cols : List[Tuple[np.ndarray[Any], np.ndarray[bool]]]
        The columns of data to serialize as ``(data, mask)`` pairs; the
        mask of a pair may be None

    Returns
    -------
    bytes
        The JSON produced by ``_dump_vectors`` for the converted lists

    '''
    id_list = row_ids.tolist()
    list_pairs = []
    for pair in cols:
        values = pair[0].tolist()
        # A missing mask is passed through as None.
        mask = None if pair[1] is None else pair[1].tolist()
        list_pairs.append((values, mask))
    return _dump_vectors(returns, id_list, list_pairs)
392
393
394
def dump_arrow(
    returns: List[int],
    row_ids: 'pa.Array[int]',
    cols: List[Tuple['pa.Array[int]', 'pa.Array[bool]']],
) -> bytes:
    '''
    Convert a list of pyarrow.Arrays of data into JSON format.

    Parameters
    ----------
    returns : List[int]
        The returned data type
    row_ids : pyarrow.Array[int]
        Row IDs
    cols : List[Tuple[pyarrow.Array[Any], pyarrow.Array[Any]]]
        The columns of data to serialize as ``(data, mask)`` pairs; the
        mask of a pair may be None

    Returns
    -------
    bytes
        The JSON produced by ``_dump_vectors`` for the converted lists

    '''
    id_list = row_ids.tolist()
    list_pairs = []
    for pair in cols:
        values = pair[0].tolist()
        # A missing mask is passed through as None.
        mask = None if pair[1] is None else pair[1].tolist()
        list_pairs.append((values, mask))
    return _dump_vectors(returns, id_list, list_pairs)
421
422