Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/_compression.py
12 views
1
"""Internal classes used by the gzip, lzma and bz2 modules"""
2
3
import io
4
import sys
5
6
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE # Compressed data read chunk size
7
8
9
class BaseStream(io.BufferedIOBase):
10
"""Mode-checking helper functions."""
11
12
def _check_not_closed(self):
13
if self.closed:
14
raise ValueError("I/O operation on closed file")
15
16
def _check_can_read(self):
17
if not self.readable():
18
raise io.UnsupportedOperation("File not open for reading")
19
20
def _check_can_write(self):
21
if not self.writable():
22
raise io.UnsupportedOperation("File not open for writing")
23
24
def _check_can_seek(self):
25
if not self.readable():
26
raise io.UnsupportedOperation("Seeking is only supported "
27
"on files open for reading")
28
if not self.seekable():
29
raise io.UnsupportedOperation("The underlying file object "
30
"does not support seeking")
31
32
33
class DecompressReader(io.RawIOBase):
34
"""Adapts the decompressor API to a RawIOBase reader API"""
35
36
def readable(self):
37
return True
38
39
def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
40
self._fp = fp
41
self._eof = False
42
self._pos = 0 # Current offset in decompressed stream
43
44
# Set to size of decompressed stream once it is known, for SEEK_END
45
self._size = -1
46
47
# Save the decompressor factory and arguments.
48
# If the file contains multiple compressed streams, each
49
# stream will need a separate decompressor object. A new decompressor
50
# object is also needed when implementing a backwards seek().
51
self._decomp_factory = decomp_factory
52
self._decomp_args = decomp_args
53
self._decompressor = self._decomp_factory(**self._decomp_args)
54
55
# Exception class to catch from decompressor signifying invalid
56
# trailing data to ignore
57
self._trailing_error = trailing_error
58
59
def close(self):
60
self._decompressor = None
61
return super().close()
62
63
def seekable(self):
64
return self._fp.seekable()
65
66
def readinto(self, b):
67
with memoryview(b) as view, view.cast("B") as byte_view:
68
data = self.read(len(byte_view))
69
byte_view[:len(data)] = data
70
return len(data)
71
72
def read(self, size=-1):
73
if size < 0:
74
return self.readall()
75
76
if not size or self._eof:
77
return b""
78
data = None # Default if EOF is encountered
79
# Depending on the input data, our call to the decompressor may not
80
# return any data. In this case, try again after reading another block.
81
while True:
82
if self._decompressor.eof:
83
rawblock = (self._decompressor.unused_data or
84
self._fp.read(BUFFER_SIZE))
85
if not rawblock:
86
break
87
# Continue to next stream.
88
self._decompressor = self._decomp_factory(
89
**self._decomp_args)
90
try:
91
data = self._decompressor.decompress(rawblock, size)
92
except self._trailing_error:
93
# Trailing data isn't a valid compressed stream; ignore it.
94
break
95
else:
96
if self._decompressor.needs_input:
97
rawblock = self._fp.read(BUFFER_SIZE)
98
if not rawblock:
99
raise EOFError("Compressed file ended before the "
100
"end-of-stream marker was reached")
101
else:
102
rawblock = b""
103
data = self._decompressor.decompress(rawblock, size)
104
if data:
105
break
106
if not data:
107
self._eof = True
108
self._size = self._pos
109
return b""
110
self._pos += len(data)
111
return data
112
113
def readall(self):
114
chunks = []
115
# sys.maxsize means the max length of output buffer is unlimited,
116
# so that the whole input buffer can be decompressed within one
117
# .decompress() call.
118
while data := self.read(sys.maxsize):
119
chunks.append(data)
120
121
return b"".join(chunks)
122
123
# Rewind the file to the beginning of the data stream.
124
def _rewind(self):
125
self._fp.seek(0)
126
self._eof = False
127
self._pos = 0
128
self._decompressor = self._decomp_factory(**self._decomp_args)
129
130
def seek(self, offset, whence=io.SEEK_SET):
131
# Recalculate offset as an absolute file position.
132
if whence == io.SEEK_SET:
133
pass
134
elif whence == io.SEEK_CUR:
135
offset = self._pos + offset
136
elif whence == io.SEEK_END:
137
# Seeking relative to EOF - we need to know the file's size.
138
if self._size < 0:
139
while self.read(io.DEFAULT_BUFFER_SIZE):
140
pass
141
offset = self._size + offset
142
else:
143
raise ValueError("Invalid value for whence: {}".format(whence))
144
145
# Make it so that offset is the number of bytes to skip forward.
146
if offset < self._pos:
147
self._rewind()
148
else:
149
offset -= self._pos
150
151
# Read and discard data until we reach the desired position.
152
while offset > 0:
153
data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
154
if not data:
155
break
156
offset -= len(data)
157
158
return self._pos
159
160
def tell(self):
161
"""Return the current file position."""
162
return self._pos
163
164