"""Common IO api utilities"""
from __future__ import annotations

import bz2
import codecs
from collections import abc
import dataclasses
import functools
import gzip
from io import (
    BufferedIOBase,
    BytesIO,
    RawIOBase,
    StringIO,
    TextIOBase,
    TextIOWrapper,
)
import mmap
import os
from pathlib import Path
import re
from typing import (
    IO,
    Any,
    AnyStr,
    Generic,
    Literal,
    Mapping,
    TypeVar,
    cast,
    overload,
)
from urllib.parse import (
    urljoin,
    urlparse as parse_url,
    uses_netloc,
    uses_params,
    uses_relative,
)
import warnings
import zipfile

from pandas._typing import (
    BaseBuffer,
    CompressionDict,
    CompressionOptions,
    FilePath,
    ReadBuffer,
    StorageOptions,
    WriteBuffer,
)
from pandas.compat import get_lzma_file
from pandas.compat._optional import import_optional_dependency
from pandas.util._decorators import doc
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import is_file_like

from pandas.core.shared_docs import _shared_docs

_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://")

BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)


@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    filepath_or_buffer: str | BaseBuffer
    encoding: str
    mode: str
    compression: CompressionDict
    should_close: bool = False


@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(
        default_factory=list
    )
    is_wrapped: bool = False
    is_mmap: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        try:
            for handle in self.created_handles:
                handle.close()
        except (OSError, ValueError):
            pass
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()


def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str or unicode

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol, return True, otherwise False.
    """
    if not isinstance(url, str):
        return False
    return parse_url(url).scheme in _VALID_URLS
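

# NOTE (editorial example, not part of the original module): a minimal,
# hedged sketch of what is_url() accepts. Only schemes registered with
# urllib (http, ftp, file, ...) count; fsspec-style schemes do not.
def _demo_is_url() -> None:
    assert is_url("https://pandas.pydata.org/data.csv")
    assert not is_url("data.csv")  # no scheme
    assert not is_url("s3://bucket/key")  # "s3" is not a registered urllib scheme
    assert not is_url(b"https://example.com")  # non-str input is rejected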


@overload
def _expand_user(filepath_or_buffer: str) -> str:
    ...


@overload
def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
    ...


def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
    """
    Return the argument with an initial component of ~ or ~user
    replaced by that user's home directory.

    Parameters
    ----------
    filepath_or_buffer : object to be converted if possible

    Returns
    -------
    expanded_filepath_or_buffer : an expanded filepath or the
        input if not expandable
    """
    if isinstance(filepath_or_buffer, str):
        return os.path.expanduser(filepath_or_buffer)
    return filepath_or_buffer


def validate_header_arg(header: object) -> None:
    if isinstance(header, bool):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )


@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol (python 3.6+) are coerced
    according to their __fspath__ method.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if not convert_file_like and is_file_like(filepath_or_buffer):
        # GH 38125: some fsspec objects implement os.PathLike but have already opened a
        # file. This prevents opening the file a second time. infer_compression calls
        # this function with convert_file_like=True to infer the compression.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        filepath_or_buffer = filepath_or_buffer.__fspath__()
    return _expand_user(filepath_or_buffer)
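

# NOTE (editorial example, not part of the original module): a small sketch
# of stringify_path behavior under the assumptions shown in the comments.
def _demo_stringify_path() -> None:
    # os.PathLike objects are converted via __fspath__ ...
    assert stringify_path(Path("data.csv")) == "data.csv"
    # ... while already-open file-like objects pass through unchanged (GH 38125)
    buf = BytesIO(b"")
    assert stringify_path(buf) is buf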


def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    import urllib.request

    return urllib.request.urlopen(*args, **kwargs)


def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Returns True if the given URL looks like
    something fsspec can handle.
    """
    return (
        isinstance(url, str)
        and bool(_RFC_3986_PATTERN.match(url))
        and not url.startswith(("http://", "https://"))
    )
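

# NOTE (editorial example, not part of the original module): illustrative
# inputs. Any RFC-3986-style scheme other than http(s) is routed to fsspec.
def _demo_is_fsspec_url() -> None:
    assert is_fsspec_url("s3://bucket/key.csv")
    assert is_fsspec_url("gcs://bucket/key.csv")
    assert not is_fsspec_url("https://example.com/data.csv")  # handled by urllib
    assert not is_fsspec_url("/local/path/data.csv")  # plain path, no scheme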


@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions = None,
    mode: str = "r",
    storage_options: StorageOptions = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    storage_options : dict, optional
        Extra options that make sense for a particular storage connection, e.g.
        host, port, username, password, etc., if using a URL that will
        be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error
        will be raised if providing this argument with a local path or
        a file-like buffer. See the fsspec and backend storage implementation
        docs for the set of allowed keys and values

        .. versionadded:: 1.2.0

    .. versionchanged:: 1.2.0

        Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fallback to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )
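

# NOTE (editorial example, not part of the original module): for a plain
# local path nothing is opened; the function only normalizes the path and
# resolves compression="infer" from the extension.
def _demo_get_filepath_or_buffer() -> None:
    ioargs = _get_filepath_or_buffer("data.csv.gz", compression="infer")
    assert ioargs.filepath_or_buffer == "data.csv.gz"
    assert ioargs.compression == {"method": "gzip"}
    assert ioargs.should_close is False  # caller did not hand us an open handle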


def file_path_to_url(path: str) -> str:
    """
    Converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    return urljoin("file:", pathname2url(path))
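

# NOTE (editorial example, not part of the original module): a POSIX-only
# sketch; Windows paths go through nturl2path and look different.
def _demo_file_path_to_url() -> None:
    url = file_path_to_url("/tmp/some file.csv")
    assert url.startswith("file:")
    assert url.endswith("/tmp/some%20file.csv")  # pathname2url percent-encodes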


_compression_to_extension = {
    "gzip": ".gz",
    "bz2": ".bz2",
    "zip": ".zip",
    "xz": ".xz",
    "zstd": ".zst",
}


def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    compression_method : Optional[str]
        The compression method.
    compression_args : Dict[str, Any]
        Any additional compression arguments.

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    compression_method: str | None
    if isinstance(compression, Mapping):
        compression_args = dict(compression)
        try:
            compression_method = compression_args.pop("method")
        except KeyError as err:
            raise ValueError("If mapping, compression must have key 'method'") from err
    else:
        compression_args = {}
        compression_method = compression
    return compression_method, compression_args
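

# NOTE (editorial example, not part of the original module): both accepted
# argument shapes, plain string and mapping with a 'method' key.
def _demo_get_compression_method() -> None:
    assert get_compression_method("gzip") == ("gzip", {})
    assert get_compression_method(None) == (None, {})
    method, args = get_compression_method({"method": "zip", "archive_name": "out.csv"})
    assert method == "zip" and args == {"archive_name": "out.csv"}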


@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    # Infer compression
    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension
        for compression, extension in _compression_to_extension.items():
            if filepath_or_buffer.lower().endswith(extension):
                return compression
        return None

    # Compression has been specified. Check that it's valid
    if compression in _compression_to_extension:
        return compression

    # https://github.com/python/mypy/issues/5492
    # Unsupported operand types for + ("List[Optional[str]]" and "List[str]")
    valid = ["infer", None] + sorted(
        _compression_to_extension
    )  # type: ignore[operator]
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
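

# NOTE (editorial example, not part of the original module): how "infer"
# resolves for a few representative inputs.
def _demo_infer_compression() -> None:
    assert infer_compression("data.csv.gz", "infer") == "gzip"
    assert infer_compression(Path("data.csv.zst"), "infer") == "zstd"
    assert infer_compression("data.csv", "infer") is None  # unknown extension
    assert infer_compression(BytesIO(b""), "infer") is None  # buffers: no inference
    assert infer_compression("name.gz", "bz2") == "bz2"  # explicit beats extension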


def check_parent_directory(path: Path | str) -> None:
    """
    Check if parent directory of a file exists, raise OSError if it does not

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of

    """
    parent = Path(path).parent
    if not parent.is_dir():
        raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        .. versionchanged:: 1.0.0
           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

        .. versionchanged:: 1.1.0
           Passing compression options as keys in dict is now
           supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options : StorageOptions, default None
        Passed to _get_filepath_or_buffer.

    .. versionchanged:: 1.2.0

       Returns the dataclass IOHandles.
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    handle, memory_map, handles = _maybe_memory_map(
        handle,
        memory_map,
        ioargs.encoding,
        ioargs.mode,
        errors,
        ioargs.compression["method"] not in _compression_to_extension,
    )

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if is_path:
                assert isinstance(handle, str)
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # No overload variant of "BZ2File" matches argument types
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = bz2.BZ2File(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            if handle.mode == "r":
                handles.append(handle)
                zip_names = handle.namelist()
                if len(zip_names) == 1:
                    handle = handle.open(zip_names.pop())
                elif len(zip_names) == 0:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # XZ Compression
        elif compression == "xz":
            handle = get_lzma_file()(handle, ioargs.mode)

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (compression or _is_binary_mode(handle, ioargs.mode)):
        handle = TextIOWrapper(
            # error: Argument 1 to "TextIOWrapper" has incompatible type
            # "Union[IO[bytes], IO[Any], RawIOBase, BufferedIOBase, TextIOBase, mmap]";
            # expected "IO[bytes]"
            _IOWrapper(handle),  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        is_mmap=memory_map,
        compression=ioargs.compression,
    )
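

# NOTE (editorial example, not part of the original module): a sketch of the
# context-manager contract. When the caller supplies an open binary buffer,
# get_handle inserts a TextIOWrapper, and close() detaches that wrapper
# instead of closing the caller's buffer.
def _demo_get_handle() -> None:
    user_buffer = BytesIO(b"a,b\n1,2\n")
    with get_handle(user_buffer, "r", encoding="utf-8") as handles:
        assert handles.handle.read() == "a,b\n1,2\n"
        assert handles.is_wrapped  # a TextIOWrapper was inserted
    assert not user_buffer.closed  # the caller's buffer survives the with-block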


# error: Definition of "__exit__" in base class "ZipFile" is incompatible with
# definition in base class "BytesIO" [misc]
# error: Definition of "__enter__" in base class "ZipFile" is incompatible with
# definition in base class "BytesIO" [misc]
# error: Definition of "__enter__" in base class "ZipFile" is incompatible with
# definition in base class "BinaryIO" [misc]
# error: Definition of "__enter__" in base class "ZipFile" is incompatible with
# definition in base class "IO" [misc]
# error: Definition of "read" in base class "ZipFile" is incompatible with
# definition in base class "BytesIO" [misc]
# error: Definition of "read" in base class "ZipFile" is incompatible with
# definition in base class "IO" [misc]
class _BytesZipFile(zipfile.ZipFile, BytesIO):  # type: ignore[misc]
    """
    Wrapper for the standard library class ZipFile that allows the returned
    file-like handle to accept byte strings via its `write` method.

    BytesIO provides the file-like attributes, and ZipFile.writestr writes
    byte strings into a member of the archive.
    """

    # GH 17778
    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ):
        mode = mode.replace("b", "")
        self.archive_name = archive_name
        self.multiple_write_buffer: StringIO | BytesIO | None = None

        kwargs_zip: dict[str, Any] = {"compression": zipfile.ZIP_DEFLATED}
        kwargs_zip.update(kwargs)

        # error: Argument 1 to "__init__" of "ZipFile" has incompatible type
        # "Union[_PathLike[str], Union[str, Union[IO[Any], RawIOBase, BufferedIOBase,
        # TextIOBase, TextIOWrapper, mmap]]]"; expected "Union[Union[str,
        # _PathLike[str]], IO[bytes]]"
        super().__init__(file, mode, **kwargs_zip)  # type: ignore[arg-type]

    def infer_filename(self):
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if isinstance(self.filename, (os.PathLike, str)):
            filename = Path(self.filename)
            if filename.suffix == ".zip":
                return filename.with_suffix("").name
            return filename.name
        return None

    def write(self, data):
        # buffer multiple write calls, write on flush
        if self.multiple_write_buffer is None:
            self.multiple_write_buffer = (
                BytesIO() if isinstance(data, bytes) else StringIO()
            )
        self.multiple_write_buffer.write(data)

    def flush(self) -> None:
        # write to actual handle and close write buffer
        if self.multiple_write_buffer is None or self.multiple_write_buffer.closed:
            return

        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        with self.multiple_write_buffer:
            super().writestr(archive_name, self.multiple_write_buffer.getvalue())

    def close(self):
        self.flush()
        super().close()

    @property
    def closed(self):
        return self.fp is None


class _MMapWrapper(abc.Iterator):
    """
    Wrapper for Python's mmap class so that it can be properly read in
    by Python's csv.reader class.

    Parameters
    ----------
    f : file object
        File object to be mapped onto memory. Must support the 'fileno'
        method or have an equivalent attribute

    """

    def __init__(
        self,
        f: IO,
        encoding: str = "utf-8",
        errors: str = "strict",
        decode: bool = True,
    ):
        self.encoding = encoding
        self.errors = errors
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
        self.decode = decode

        self.attributes = {}
        for attribute in ("seekable", "readable"):
            if not hasattr(f, attribute):
                continue
            self.attributes[attribute] = getattr(f, attribute)()
        self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    def __getattr__(self, name: str):
        if name in self.attributes:
            return lambda: self.attributes[name]
        return getattr(self.mmap, name)

    def __iter__(self) -> _MMapWrapper:
        return self

    def read(self, size: int = -1) -> str | bytes:
        # CSV c-engine uses read instead of iterating
        content: bytes = self.mmap.read(size)
        if self.decode and self.encoding != "utf-8":
            # memory mapping is applied before compression. Encoding should
            # be applied to the de-compressed data.
            final = size == -1 or len(content) < size
            return self.decoder.decode(content, final=final)
        return content

    def __next__(self) -> str:
        newbytes = self.mmap.readline()

        # readline returns bytes, not str, but Python's CSV reader
        # expects str, so convert the output to str before continuing
        newline = self.decoder.decode(newbytes)

        # mmap doesn't raise if reading past the allocated
        # data but instead returns an empty string, so raise
        # if that is returned
        if newline == "":
            raise StopIteration

        # IncrementalDecoder seems to push newline to the next line
        return newline.lstrip("\n")


class _IOWrapper:
    # TextIOWrapper is overly strict: it requires that the buffer have the
    # seekable, readable, and writable methods. If we have a read-only buffer, we
    # shouldn't need writable (and vice versa). Some buffers are
    # seek/read/writ-able but do not have the "-able" methods, e.g.,
    # tempfile.SpooledTemporaryFile. If a buffer does not have the above "-able"
    # methods, we simply assume it is seek/read/writ-able.
    def __init__(self, buffer: BaseBuffer):
        self.buffer = buffer

    def __getattr__(self, name: str):
        return getattr(self.buffer, name)

    def readable(self) -> bool:
        if hasattr(self.buffer, "readable"):
            # error: "BaseBuffer" has no attribute "readable"
            return self.buffer.readable()  # type: ignore[attr-defined]
        return True

    def seekable(self) -> bool:
        if hasattr(self.buffer, "seekable"):
            return self.buffer.seekable()
        return True

    def writable(self) -> bool:
        if hasattr(self.buffer, "writable"):
            # error: "BaseBuffer" has no attribute "writable"
            return self.buffer.writable()  # type: ignore[attr-defined]
        return True


class _BytesIOWrapper:
    # Wrapper that wraps a StringIO buffer and reads bytes from it
    # Created for compat with pyarrow read_csv
    def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8"):
        self.buffer = buffer
        self.encoding = encoding
        # Because a character can be represented by more than 1 byte,
        # it is possible that reading will produce more bytes than n
        # We store the extra bytes in this overflow variable, and append the
        # overflow to the front of the bytestring the next time reading is performed
        self.overflow = b""

    def __getattr__(self, attr: str):
        return getattr(self.buffer, attr)

    def read(self, n: int | None = -1) -> bytes:
        assert self.buffer is not None
        bytestring = self.buffer.read(n).encode(self.encoding)
        # When n=-1/n greater than remaining bytes: Read entire file/rest of file
        combined_bytestring = self.overflow + bytestring
        if n is None or n < 0 or n >= len(combined_bytestring):
            self.overflow = b""
            return combined_bytestring
        else:
            to_return = combined_bytestring[:n]
            self.overflow = combined_bytestring[n:]
            return to_return
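

# NOTE (editorial example, not part of the original module): multi-byte
# characters can straddle a read boundary; the overflow buffer keeps the
# surplus bytes for the next call.
def _demo_bytesio_wrapper() -> None:
    wrapper = _BytesIOWrapper(StringIO("héllo"))  # "é" is 2 bytes in UTF-8
    assert wrapper.read(2) == b"h\xc3"  # first byte of "é" ...
    assert wrapper.read(2) == b"\xa9l"  # ... and the second comes out here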


def _maybe_memory_map(
    handle: str | BaseBuffer,
    memory_map: bool,
    encoding: str,
    mode: str,
    errors: str | None,
    decode: bool,
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
    """Try to memory map file/buffer."""
    handles: list[BaseBuffer] = []
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # need to open the file first
    if isinstance(handle, str):
        if encoding and "b" not in mode:
            # Encoding
            handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
        else:
            # Binary mode
            handle = open(handle, mode)
        handles.append(handle)

    # error: Argument 1 to "_MMapWrapper" has incompatible type "Union[IO[Any],
    # RawIOBase, BufferedIOBase, TextIOBase, mmap]"; expected "IO[Any]"
    try:
        wrapped = cast(
            BaseBuffer,
            _MMapWrapper(handle, encoding, errors, decode),  # type: ignore[arg-type]
        )
    finally:
        for handle in reversed(handles):
            # error: "BaseBuffer" has no attribute "close"
            handle.close()  # type: ignore[attr-defined]
    handles.append(wrapped)

    return wrapped, memory_map, handles


def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists."""
    exists = False
    filepath_or_buffer = stringify_path(filepath_or_buffer)
    if not isinstance(filepath_or_buffer, str):
        return exists
    try:
        exists = os.path.exists(filepath_or_buffer)
        # gh-5874: if the filepath is too long, os.path.exists raises here
    except (TypeError, ValueError):
        pass
    return exists


def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
    """Whether the handle is opened in binary mode"""
    # specified by user
    if "t" in mode or "b" in mode:
        return "b" in mode

    # exceptions
    text_classes = (
        # classes that expect string but have 'b' in mode
        codecs.StreamWriter,
        codecs.StreamReader,
        codecs.StreamReaderWriter,
    )
    if issubclass(type(handle), text_classes):
        return False

    return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
        handle, "mode", mode
    )
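

# NOTE (editorial example, not part of the original module): when the mode
# string is ambiguous, the decision falls back to the handle's type.
def _demo_is_binary_mode() -> None:
    assert _is_binary_mode("data.csv", "rb")  # explicit "b" wins
    assert _is_binary_mode(BytesIO(b""), "r")  # inferred: bytes buffer
    assert not _is_binary_mode(StringIO(""), "r")  # inferred: text buffer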


@functools.lru_cache
def _get_binary_io_classes() -> tuple[type, ...]:
    """IO classes that expect bytes"""
    binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)

    # python-zstandard doesn't use any of the builtin base classes; instead we
    # have to use the `zstd.ZstdDecompressionReader` class for isinstance checks.
    # Unfortunately `zstd.ZstdDecompressionReader` isn't exposed by python-zstandard
    # so we have to get it from a `zstd.ZstdDecompressor` instance.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            binary_classes += (type(reader),)

    return binary_classes