Path: blob/master/invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/pandas/io/parquet.py
""" parquet compat """1from __future__ import annotations23import io4import os5from typing import Any6from warnings import catch_warnings78from pandas._typing import (9FilePath,10ReadBuffer,11StorageOptions,12WriteBuffer,13)14from pandas.compat._optional import import_optional_dependency15from pandas.errors import AbstractMethodError16from pandas.util._decorators import doc1718from pandas import (19DataFrame,20MultiIndex,21get_option,22)23from pandas.core.shared_docs import _shared_docs24from pandas.util.version import Version2526from pandas.io.common import (27IOHandles,28get_handle,29is_fsspec_url,30is_url,31stringify_path,32)333435def get_engine(engine: str) -> BaseImpl:36"""return our implementation"""37if engine == "auto":38engine = get_option("io.parquet.engine")3940if engine == "auto":41# try engines in this order42engine_classes = [PyArrowImpl, FastParquetImpl]4344error_msgs = ""45for engine_class in engine_classes:46try:47return engine_class()48except ImportError as err:49error_msgs += "\n - " + str(err)5051raise ImportError(52"Unable to find a usable engine; "53"tried using: 'pyarrow', 'fastparquet'.\n"54"A suitable version of "55"pyarrow or fastparquet is required for parquet "56"support.\n"57"Trying to import the above resulted in these errors:"58f"{error_msgs}"59)6061if engine == "pyarrow":62return PyArrowImpl()63elif engine == "fastparquet":64return FastParquetImpl()6566raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")676869def _get_path_or_handle(70path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],71fs: Any,72storage_options: StorageOptions = None,73mode: str = "rb",74is_dir: bool = False,75) -> tuple[76FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any77]:78"""File handling for PyArrow."""79path_or_handle = stringify_path(path)80if is_fsspec_url(path_or_handle) and fs is None:81fsspec = import_optional_dependency("fsspec")8283fs, path_or_handle = fsspec.core.url_to_fs(84path_or_handle, **(storage_options or {})85)86elif storage_options and (not is_url(path_or_handle) or mode != "rb"):87# can't write to a remote url88# without making use of fsspec at the moment89raise ValueError("storage_options passed with buffer, or non-supported URL")9091handles = None92if (93not fs94and not is_dir95and isinstance(path_or_handle, str)96and not os.path.isdir(path_or_handle)97):98# use get_handle only when we are very certain that it is not a directory99# fsspec resources can also point to directories100# this branch is used for example when reading from non-fsspec URLs101handles = get_handle(102path_or_handle, mode, is_text=False, storage_options=storage_options103)104fs = None105path_or_handle = handles.handle106return path_or_handle, handles, fs107108109class BaseImpl:110@staticmethod111def validate_dataframe(df: DataFrame) -> None:112113if not isinstance(df, DataFrame):114raise ValueError("to_parquet only supports IO with DataFrames")115116# must have value column names for all index levels (strings only)117if isinstance(df.columns, MultiIndex):118if not all(119x.inferred_type in {"string", "empty"} for x in df.columns.levels120):121raise ValueError(122"""123parquet must have string column names for all values in124each level of the MultiIndex125"""126)127else:128if df.columns.inferred_type not in {"string", "empty"}:129raise ValueError("parquet must have string column names")130131# index level names must be strings132valid_names = all(133isinstance(name, str) for name in df.index.names if name is not None134)135if not 
class PyArrowImpl(BaseImpl):
    def __init__(self):
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays._arrow_utils  # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            import pandas as pd

            mapping = {
                self.api.int8(): pd.Int8Dtype(),
                self.api.int16(): pd.Int16Dtype(),
                self.api.int32(): pd.Int32Dtype(),
                self.api.int64(): pd.Int64Dtype(),
                self.api.uint8(): pd.UInt8Dtype(),
                self.api.uint16(): pd.UInt16Dtype(),
                self.api.uint32(): pd.UInt32Dtype(),
                self.api.uint64(): pd.UInt64Dtype(),
                self.api.bool_(): pd.BooleanDtype(),
                self.api.string(): pd.StringDtype(),
            }
            to_pandas_kwargs["types_mapper"] = mapping.get
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            result = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()
class FastParquetImpl(BaseImpl):
    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        parquet_file = self.api.ParquetFile(path, **parquet_kwargs)

        result = parquet_file.to_pandas(columns=columns, **kwargs)

        if handles is not None:
            handles.close()
        return result
@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
        default 'snappy'. Name of the compression to use. Use ``None``
        for no compression. The supported compression methods actually
        depend on which engine is used. For 'pyarrow', 'snappy', 'gzip',
        'brotli', 'lz4', 'zstd' are all supported. For 'fastparquet',
        only 'gzip' and 'snappy' are supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None
@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path,
    engine: str = "auto",
    columns=None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool = False,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame. (only applicable for the ``pyarrow``
        engine)
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        support dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)

    return impl.read(
        path,
        columns=columns,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        **kwargs,
    )
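

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original pandas module): it shows
# how the public helpers above round-trip a DataFrame through an in-memory
# parquet buffer. It assumes at least one engine (pyarrow or fastparquet) is
# installed so that get_engine("auto") can resolve an implementation.
if __name__ == "__main__":
    frame = DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # With path=None, to_parquet serializes into an io.BytesIO buffer and
    # returns the raw parquet bytes instead of writing to disk.
    buf = to_parquet(frame, path=None, compression="snappy")
    assert buf is not None

    # Wrapping the bytes in a file-like object lets read_parquet load them
    # back without touching the filesystem.
    roundtripped = read_parquet(io.BytesIO(buf))
    print(roundtripped)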