""" parquet compat """
from __future__ import annotations

import io
import os
from typing import Any
from warnings import catch_warnings

from pandas._typing import (
    FilePath,
    ReadBuffer,
    StorageOptions,
    WriteBuffer,
)
from pandas.compat._optional import import_optional_dependency
from pandas.errors import AbstractMethodError
from pandas.util._decorators import doc

from pandas import (
    DataFrame,
    MultiIndex,
    get_option,
)
from pandas.core.shared_docs import _shared_docs
from pandas.util.version import Version

from pandas.io.common import (
    IOHandles,
    get_handle,
    is_fsspec_url,
    is_url,
    stringify_path,
)


def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

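
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# Minimal demonstration of the engine-resolution logic above, assuming at least
# one of pyarrow/fastparquet is installed. The helper name is hypothetical and
# the body only runs when called explicitly, after the Impl classes defined
# further down exist.
def _demo_get_engine() -> None:
    impl = get_engine("auto")  # honours the "io.parquet.engine" option
    print(type(impl).__name__)  # e.g. "PyArrowImpl" when pyarrow is installed
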

def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs

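
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# Shows the three-way contract of _get_path_or_handle for a plain local file:
# the caller gets back either the original path/buffer or an opened handle,
# plus an IOHandles object it is responsible for closing. The file name is
# hypothetical and must already exist for get_handle to open it.
def _demo_path_or_handle(local_file: str = "example.parquet") -> None:
    path_or_handle, handles, fs = _get_path_or_handle(local_file, fs=None, mode="rb")
    try:
        # no fsspec filesystem is involved for a plain local path
        assert fs is None
        # the string path was replaced by an open binary file handle
        assert path_or_handle is handles.handle
    finally:
        if handles is not None:
            handles.close()
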

class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names for all index levels (strings only)
        if isinstance(df.columns, MultiIndex):
            if not all(
                x.inferred_type in {"string", "empty"} for x in df.columns.levels
            ):
                raise ValueError(
                    """
                    parquet must have string column names for all values in
                    each level of the MultiIndex
                    """
                )
        else:
            if df.columns.inferred_type not in {"string", "empty"}:
                raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)

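
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# validate_dataframe above rejects frames whose column labels are not strings;
# this hypothetical helper simply exercises that failure mode.
def _demo_validate_columns() -> None:
    import pandas as pd

    try:
        BaseImpl.validate_dataframe(pd.DataFrame({0: [1, 2]}))  # int column label
    except ValueError as err:
        print(err)  # "parquet must have string column names"
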

class PyArrowImpl(BaseImpl):
    def __init__(self):
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays._arrow_utils  # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            import pandas as pd

            mapping = {
                self.api.int8(): pd.Int8Dtype(),
                self.api.int16(): pd.Int16Dtype(),
                self.api.int32(): pd.Int32Dtype(),
                self.api.int64(): pd.Int64Dtype(),
                self.api.uint8(): pd.UInt8Dtype(),
                self.api.uint16(): pd.UInt16Dtype(),
                self.api.uint32(): pd.UInt32Dtype(),
                self.api.uint64(): pd.UInt64Dtype(),
                self.api.bool_(): pd.BooleanDtype(),
                self.api.string(): pd.StringDtype(),
            }
            to_pandas_kwargs["types_mapper"] = mapping.get
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            result = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()

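
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# Round-trips a frame through PyArrowImpl using an in-memory buffer and shows
# how the types_mapper built in read() surfaces nullable dtypes: a plain int64
# column comes back as the extension dtype "Int64". Assumes pyarrow is
# installed; the helper name is hypothetical.
def _demo_pyarrow_nullable_roundtrip() -> DataFrame:
    import pandas as pd

    impl = PyArrowImpl()
    buf = io.BytesIO()
    impl.write(pd.DataFrame({"a": [1, 2, 3]}), buf)
    buf.seek(0)
    result = impl.read(buf, use_nullable_dtypes=True)
    # result["a"].dtype is pd.Int64Dtype() rather than numpy int64
    return result
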

class FastParquetImpl(BaseImpl):
    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        parquet_file = self.api.ParquetFile(path, **parquet_kwargs)

        result = parquet_file.to_pandas(columns=columns, **kwargs)

        if handles is not None:
            handles.close()
        return result

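
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# Writing with partition_cols through FastParquetImpl switches fastparquet to
# its "hive" file scheme, producing one directory per partition value under the
# given root. Assumes fastparquet is installed; "out_dir" is a hypothetical
# local directory path.
def _demo_fastparquet_partitioned_write(out_dir: str = "out_dir") -> None:
    import pandas as pd

    df = pd.DataFrame({"year": [2021, 2021, 2022], "value": [1.0, 2.0, 3.0]})
    FastParquetImpl().write(df, out_dir, compression=None, partition_cols=["year"])
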

@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
        default 'snappy'. Name of the compression to use. Use ``None``
        for no compression. The supported compression methods actually
        depend on which engine is used. For 'pyarrow', 'snappy', 'gzip',
        'brotli', 'lz4', 'zstd' are all supported. For 'fastparquet',
        only 'gzip' and 'snappy' are supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None

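
# --- Illustrative sketch (not part of the upstream pandas module) -------------
# When path is None, to_parquet above writes into an in-memory BytesIO and
# returns its contents, so the parquet payload can be handled as bytes without
# touching disk. Assumes at least one engine is installed; the helper name is
# hypothetical.
def _demo_to_parquet_bytes() -> bytes:
    import pandas as pd

    payload = to_parquet(pd.DataFrame({"a": [1, 2, 3]}), path=None)
    assert isinstance(payload, bytes)
    return payload
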

@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path,
    engine: str = "auto",
    columns=None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool = False,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame. (only applicable for the ``pyarrow``
        engine)
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        support dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)

    return impl.read(
        path,
        columns=columns,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        **kwargs,
    )
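

# --- Illustrative sketch (not part of the upstream pandas module) -------------
# End-to-end round trip through the two public wrappers defined above:
# serialize a frame to bytes with to_parquet, then read it back with
# read_parquet from a BytesIO wrapper. Assumes pyarrow is installed (the
# fastparquet engine does not accept file-like objects for writing); the helper
# name is hypothetical and only runs when this file is executed directly.
def _demo_roundtrip() -> None:
    import pandas as pd

    original = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    payload = to_parquet(original, path=None)
    restored = read_parquet(io.BytesIO(payload))
    pd.testing.assert_frame_equal(original, restored)


if __name__ == "__main__":
    _demo_roundtrip()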