Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/http_parser.py
7730 views
1
import abc
2
import asyncio
3
import collections
4
import re
5
import string
6
import zlib
7
from contextlib import suppress
8
from enum import IntEnum
9
from typing import (
10
Any,
11
Generic,
12
List,
13
NamedTuple,
14
Optional,
15
Pattern,
16
Set,
17
Tuple,
18
Type,
19
TypeVar,
20
Union,
21
cast,
22
)
23
24
from multidict import CIMultiDict, CIMultiDictProxy, istr
25
from yarl import URL
26
27
from . import hdrs
28
from .base_protocol import BaseProtocol
29
from .helpers import NO_EXTENSIONS, BaseTimerContext
30
from .http_exceptions import (
31
BadHttpMessage,
32
BadStatusLine,
33
ContentEncodingError,
34
ContentLengthError,
35
InvalidHeader,
36
LineTooLong,
37
TransferEncodingError,
38
)
39
from .http_writer import HttpVersion, HttpVersion10
40
from .log import internal_logger
41
from .streams import EMPTY_PAYLOAD, StreamReader
42
from .typedefs import Final, RawHeaders
43
44
try:
45
import brotli
46
47
HAS_BROTLI = True
48
except ImportError: # pragma: no cover
49
HAS_BROTLI = False
50
51
52
# Public API of this module; the *Py / *C implementation aliases defined at
# the bottom of the file are intentionally not exported.
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)
60
61
# Characters allowed to appear in a start-line.
ASCIISET: Final[Set[str]] = set(string.printable)

# See https://tools.ietf.org/html/rfc7230#section-3.1.1
# and https://tools.ietf.org/html/rfc7230#appendix-B
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#         "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
#
# The pattern is end-anchored with \Z: re.match() only anchors at the start,
# so without it a method such as "GET /" would validate via its "GET" prefix.
# (Equivalent to the upstream switch to METHRE.fullmatch().)
METHRE: Final[Pattern[str]] = re.compile(r"[!#$%&'*+\-.^_`|~0-9A-Za-z]+\Z")
# The separator between major and minor version must be a literal dot; the
# previous unescaped "." accepted malformed versions such as "HTTP/1x1".
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d+)\.(\d+)")
# Characters forbidden inside a header field name (controls, separators).
HDRRE: Final[Pattern[bytes]] = re.compile(rb"[\x00-\x1F\x7F()<>@,;:\[\]={} \t\\\\\"]")
73
74
75
class RawRequestMessage(NamedTuple):
    """Immutable result of parsing an HTTP request line and headers."""

    # HTTP method token exactly as received, e.g. "GET".
    method: str
    # Request target from the request line (path + query + fragment), unparsed.
    path: str
    version: HttpVersion
    # Case-insensitive, decoded view of the headers.
    headers: "CIMultiDictProxy[str]"
    # Original (name, value) byte pairs, before decoding.
    raw_headers: RawHeaders
    # True if the connection must be closed after this message.
    should_close: bool
    # Content-Encoding value ("gzip"/"deflate"/"br") or None.
    compression: Optional[str]
    # True if this is a connection-upgrade request.
    upgrade: bool
    # True if the body uses chunked transfer-encoding.
    chunked: bool
    # URL built from `path` (mimics the Cython parser's URL.build()).
    url: URL
86
87
88
class RawResponseMessage(NamedTuple):
    """Immutable result of parsing an HTTP response status line and headers.

    Converted from ``collections.namedtuple`` to ``typing.NamedTuple`` for
    consistency with :class:`RawRequestMessage`.  It is still a plain tuple
    subclass with the same field order, so positional access, unpacking and
    ``_replace``/``_asdict`` keep working for existing callers.  Annotations
    are quoted so they are not evaluated at class-creation time.
    """

    # Parsed HTTP version, e.g. HttpVersion(1, 1).
    version: "HttpVersion"
    # Numeric status code, e.g. 200.
    code: int
    # Reason phrase from the status line (may be empty).
    reason: str
    # Case-insensitive, decoded view of the headers.
    headers: "CIMultiDictProxy[str]"
    # Original (name, value) byte pairs, before decoding.
    raw_headers: "RawHeaders"
    # True if the connection must be closed after this message.
    should_close: bool
    # Content-Encoding value ("gzip"/"deflate"/"br") or None.
    compression: "Optional[str]"
    # True if this is a connection-upgrade response.
    upgrade: bool
    # True if the body uses chunked transfer-encoding.
    chunked: bool
102
103
104
# Message type produced by a concrete parser: request or response message.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
105
106
107
class ParseState(IntEnum):
    """How the length of a message body (payload) is determined."""

    # No payload expected / nothing to parse.
    PARSE_NONE = 0
    # Body length is fixed by a Content-Length header.
    PARSE_LENGTH = 1
    # Body uses chunked Transfer-Encoding.
    PARSE_CHUNKED = 2
    # Body extends until the connection is closed.
    PARSE_UNTIL_EOF = 3
113
114
115
class ChunkState(IntEnum):
    """Sub-states of the chunked Transfer-Encoding body parser."""

    # Expecting a "<hex-size>[;extensions]\r\n" chunk-size line.
    PARSE_CHUNKED_SIZE = 0
    # Reading the chunk data itself.
    PARSE_CHUNKED_CHUNK = 1
    # Expecting the CRLF that terminates a chunk's data.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # After the 0-size chunk: either the final CRLF or trailer lines follow.
    PARSE_MAYBE_TRAILERS = 3
    # Skipping a trailer line up to its CRLF.
    PARSE_TRAILERS = 4
121
122
123
class HeadersParser:
    """Parses a block of HTTP header lines into a CIMultiDict.

    Supports obsolete line folding (continuation lines starting with SP or
    HTAB) and enforces a limit on the size of a header name and of a
    (possibly folded) header value.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
    ) -> None:
        # Maximum length of a single header line.
        self.max_line_size = max_line_size
        # NOTE(review): stored but not enforced in parse_headers() below —
        # confirm whether the header-count limit is applied elsewhere.
        self.max_headers = max_headers
        # Maximum length of a header name or of a folded header value.
        self.max_field_size = max_field_size

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse header lines into ``(headers, raw_headers)``.

        ``lines[0]`` is the start-line and is skipped; headers begin at
        ``lines[1]`` and the list is terminated by an empty line.

        :raises InvalidHeader: missing ":" separator or illegal name chars.
        :raises LineTooLong: a name or value exceeds ``max_field_size``.
        """
        headers = CIMultiDict()  # type: CIMultiDict[str]
        raw_headers = []

        lines_idx = 1
        line = lines[1]
        line_count = len(lines)

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            bname = bname.strip(b" \t")
            bvalue = bvalue.lstrip()
            # Reject names containing control/separator characters.
            if HDRRE.search(bname):
                raise InvalidHeader(bname)
            if len(bname) > self.max_field_size:
                raise LineTooLong(
                    "request header name {}".format(
                        bname.decode("utf8", "xmlcharrefreplace")
                    ),
                    str(self.max_field_size),
                    str(len(bname)),
                )

            header_length = len(bvalue)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines
            continuation = line and line[0] in (32, 9)  # (' ', '\t')

            if continuation:
                bvalue_lst = [bvalue]
                while continuation:
                    # Folded value size is limited cumulatively across all
                    # continuation lines.
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        raise LineTooLong(
                            "request header field {}".format(
                                bname.decode("utf8", "xmlcharrefreplace")
                            ),
                            str(self.max_field_size),
                            str(header_length),
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # Ran out of lines mid-fold; treat as end of headers.
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)
            else:
                if header_length > self.max_field_size:
                    raise LineTooLong(
                        "request header field {}".format(
                            bname.decode("utf8", "xmlcharrefreplace")
                        ),
                        str(self.max_field_size),
                        str(header_length),
                    )

            bvalue = bvalue.strip()
            # surrogateescape keeps non-UTF-8 bytes round-trippable.
            name = bname.decode("utf-8", "surrogateescape")
            value = bvalue.decode("utf-8", "surrogateescape")

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
215
216
217
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Base HTTP parser: splits a byte stream into messages and payloads.

    Concrete subclasses implement :meth:`parse_message` to build either a
    RawRequestMessage or a RawResponseMessage from the start-line and
    header lines that :meth:`feed_data` accumulates.
    """

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2 ** 16,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        self.readall = readall
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Accumulated start-line + header lines of the message being parsed.
        self._lines = []  # type: List[bytes]
        # Bytes received after the last complete CRLF; carried into the next
        # feed_data() call.
        self._tail = b""
        # True once a protocol upgrade (or CONNECT tunnel) was negotiated.
        self._upgraded = False
        self._payload = None
        # Active body parser, or None while reading headers.
        self._payload_parser = None  # type: Optional[HttpPayloadParser]
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_line_size, max_headers, max_field_size)

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT:
        """Build a message object from the collected start-line and headers."""
        pass

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of stream; may return a partially-received message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # NOTE(review): this compares bytes to the str "\r\n", which
                # can never be equal, so the empty terminator line is always
                # appended here — confirm whether b"\r\n" was intended.
                if self._lines[-1] != "\r\n":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: bytes = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes into the parser.

        Returns ``(messages, upgraded, tail)``: *messages* is a list of
        ``(message, payload_stream)`` pairs completed by this call,
        *upgraded* reports whether the connection switched protocols, and
        *tail* is any unconsumed data (non-empty only after an upgrade).
        """

        messages = []

        # Prepend the leftover bytes from the previous call.
        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        while start_pos < data_len:

            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + 2
                    continue

                if pos >= start_pos:
                    # line found
                    self._lines.append(data[start_pos:pos])
                    start_pos = pos + 2

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            # Always reset line buffer, even if parsing failed.
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            try:
                                length = int(length_hdr)
                            except ValueError:
                                raise InvalidHeader(CONTENT_LENGTH)

                            if length < 0:
                                raise InvalidHeader(CONTENT_LENGTH)

                            return length

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade

                        # Requests carry a method; responses fall back to the
                        # method this parser was configured with.
                        method = getattr(msg, "method", self.method)

                        assert self.protocol is not None
                        # calculate payload
                        # NOTE(review): `and` binds tighter than `or`, so this
                        # reads as `(length > 0) or (chunked and not upgrade)`:
                        # a positive Content-Length creates a payload even on
                        # upgrade — confirm that is intended.
                        if (
                            (length is not None and length > 0)
                            or msg.chunked
                            and not msg.upgrade
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                readall=self.readall,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT opens a tunnel: everything after the
                            # headers is opaque payload until EOF.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                readall=True,
                                auto_decompress=self._auto_decompress,
                            )
                        else:
                            if (
                                getattr(msg, "code", 100) >= 199
                                and length is None
                                and self.read_until_eof
                            ):
                                # Response without Content-Length: the body is
                                # terminated by connection close.
                                payload = StreamReader(
                                    self.protocol,
                                    timer=self.timer,
                                    loop=loop,
                                    limit=self._limit,
                                )
                                payload_parser = HttpPayloadParser(
                                    payload,
                                    length=length,
                                    chunked=msg.chunked,
                                    method=method,
                                    compression=msg.compression,
                                    code=self.code,
                                    readall=True,
                                    response_with_body=self.response_with_body,
                                    auto_decompress=self._auto_decompress,
                                )
                                if not payload_parser.done:
                                    self._payload_parser = payload_parser
                            else:
                                # No body at all.
                                payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                else:
                    # No complete line yet; stash the partial data.
                    self._tail = data[start_pos:]
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:])
                except BaseException as exc:
                    # Route payload errors to the stream so readers see them.
                    if self.payload_exception is not None:
                        self._payload_parser.payload.set_exception(
                            self.payload_exception(str(exc))
                        )
                    else:
                        self._payload_parser.payload.set_exception(exc)

                    eof = True
                    data = b""

                if eof:
                    # Body complete; remaining bytes start the next message.
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parses RFC 5322 headers from a stream.

        Line continuations are supported. Returns list of header name
        and value pairs. Header name is in upper case.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive
        conn = headers.get(hdrs.CONNECTION)
        if conn:
            v = conn.lower()
            if v == "close":
                close_conn = True
            elif v == "keep-alive":
                close_conn = False
            elif v == "upgrade":
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING)
        if enc:
            enc = enc.lower()
            if enc in ("gzip", "deflate", "br"):
                encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if "chunked" == te.lower():
                chunked = True
            else:
                raise BadHttpMessage("Request has invalid `Transfer-Encoding`")

            # Content-Length together with Transfer-Encoding enables request
            # smuggling, so the combination is rejected outright.
            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Content-Length can't be present with Transfer-Encoding",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
512
513
514
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        # The request line is the first of the received lines.
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, target, proto = request_line.split(None, 2)
        except ValueError:
            raise BadStatusLine(request_line) from None

        if len(target) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(target))
            )

        # Split the request target into path, query string and fragment.
        path_only, _, fragment = target.partition("#")
        path_only, _, query = path_only.partition("?")

        # Validate the method token.
        if METHRE.match(method) is None:
            raise BadStatusLine(method)

        # Parse "HTTP/<major>.<minor>"; any malformed version string is
        # reported as a BadStatusLine.
        try:
            if not proto.startswith("HTTP/"):
                raise BadStatusLine(proto)
            major, _, minor = proto[5:].partition(".")
            parsed_version = HttpVersion(int(major), int(minor))
        except Exception:
            raise BadStatusLine(proto)

        # Read headers.
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        if close is None:  # then the headers weren't set in the request
            # HTTP/1.0 closes by default; HTTP/1.1 keeps the connection open.
            close = parsed_version <= HttpVersion10

        return RawRequestMessage(
            method,
            target,
            parsed_version,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # parser does, otherwise the same HTTP Request-Line input would
            # produce different `yarl.URL()` objects.
            URL.build(
                path=path_only,
                query_string=query,
                fragment=fragment,
                encoded=True,
            ),
        )
589
590
591
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        status_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            proto, rest = status_line.split(None, 1)
        except ValueError:
            raise BadStatusLine(status_line) from None

        # The reason phrase is optional; without it, `rest` is just the code.
        try:
            code_text, reason = rest.split(None, 1)
        except ValueError:
            code_text, reason = rest, ""

        if len(reason) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(reason))
            )

        # Parse and validate "HTTP/<major>.<minor>".
        version_match = VERSRE.match(proto)
        if version_match is None:
            raise BadStatusLine(status_line)
        parsed_version = HttpVersion(
            int(version_match.group(1)), int(version_match.group(2))
        )

        # The status code is a three-digit number.
        try:
            status_code = int(code_text)
        except ValueError:
            raise BadStatusLine(status_line) from None

        if status_code > 999:
            raise BadStatusLine(status_line)

        # Read headers.
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        if close is None:
            # Without a Connection header, HTTP/1.0 closes by default.
            close = parsed_version <= HttpVersion10

        return RawResponseMessage(
            parsed_version,
            status_code,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )
654
655
656
class HttpPayloadParser:
    """Parses an HTTP message body and feeds it into a StreamReader.

    The framing mode is chosen from the message metadata: exact
    Content-Length, chunked transfer-encoding, or read-until-EOF.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        response_with_body: bool = True,
        auto_decompress: bool = True,
    ) -> None:
        # Remaining byte count when parsing a Content-Length body.
        self._length = 0
        self._type = ParseState.PARSE_NONE
        # Sub-state of the chunked body parser.
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        # Remaining bytes of the current chunk.
        self._chunk_size = 0
        # Partial chunk-size/trailer line kept between feed_data() calls.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        # True once the whole body has been consumed (or none is expected).
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload = DeflateBuffer(
                payload, compression
            )  # type: Union[StreamReader, DeflateBuffer]
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True

        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                # Zero-length body: complete immediately.
                real_payload.feed_eof()
                self.done = True
        else:
            # 204 responses never carry a body (RFC 7230 §3.3.3).
            if readall and code != 204:
                self._type = ParseState.PARSE_UNTIL_EOF
            elif method in ("PUT", "POST"):
                internal_logger.warning(  # pragma: no cover
                    "Content-Length or Transfer-Encoding header is required"
                )
                self._type = ParseState.PARSE_NONE
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle end of stream.

        :raises ContentLengthError: body shorter than Content-Length.
        :raises TransferEncodingError: chunked body ended mid-stream.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data for satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data for satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: bytes = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Consume body bytes.

        Returns ``(eof, tail)``: *eof* is True when the body is complete,
        *tail* holds any bytes belonging to the next message.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                # Whole chunk belongs to this body.
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                # Body ends inside this chunk; return the surplus.
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # Prepend the partial line left over from the previous call.
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                        else:
                            size_b = chunk[:pos]

                        try:
                            # Chunk sizes are hexadecimal.
                            size = int(bytes(size_b), 16)
                        except ValueError:
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            self.payload.set_exception(exc)
                            raise exc from None

                        chunk = chunk[pos + 2 :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Incomplete size line; wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if chunk[:2] == SEP:
                        chunk = chunk[2:]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                # if stream does not contain trailer, after 0\r\n
                # we should get another \r\n otherwise
                # trailers needs to be skiped until \r\n\r\n
                if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
                    head = chunk[:2]
                    if head == SEP:
                        # end of stream
                        self.payload.feed_eof()
                        return True, chunk[2:]
                    # Both CR and LF, or only LF may not be received yet. It is
                    # expected that CRLF or LF will be shown at the very first
                    # byte next time, otherwise trailers should come. The last
                    # CRLF which marks the end of response might not be
                    # contained in the same TCP segment which delivered the
                    # size indicator.
                    if not head:
                        return False, b""
                    if head == SEP[:1]:
                        self._chunk_tail = head
                        return False, b""
                    self._chunk = ChunkState.PARSE_TRAILERS

                # read and discard trailer up to the CRLF terminator
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        chunk = chunk[pos + 2 :]
                        self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                    else:
                        self._chunk_tail = chunk
                        return False, b""

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
845
846
847
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    decompressor: Any

    def __init__(self, out: StreamReader, encoding: Optional[str]) -> None:
        # Downstream reader that receives the decompressed bytes.
        self.out = out
        # Total number of *compressed* bytes fed so far.
        self.size = 0
        self.encoding = encoding
        self._started_decoding = False

        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )

            class BrotliDecoder:
                # Supports both 'brotlipy' and 'Brotli' packages
                # since they share an import name. The top branches
                # are for 'brotlipy' and bottom branches for 'Brotli'
                def __init__(self) -> None:
                    self._obj = brotli.Decompressor()

                def decompress(self, data: bytes) -> bytes:
                    if hasattr(self._obj, "decompress"):
                        return cast(bytes, self._obj.decompress(data))
                    return cast(bytes, self._obj.process(data))

                def flush(self) -> bytes:
                    if hasattr(self._obj, "flush"):
                        return cast(bytes, self._obj.flush())
                    return b""

            self.decompressor = BrotliDecoder()
        else:
            # gzip uses the zlib container with the gzip-header flag set.
            if encoding == "gzip":
                wbits = 16 + zlib.MAX_WBITS
            else:
                wbits = zlib.MAX_WBITS
            self.decompressor = zlib.decompressobj(wbits=wbits)

    def set_exception(self, exc: BaseException) -> None:
        """Propagate *exc* to the wrapped output stream."""
        self.out.set_exception(exc)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress *chunk* and forward the result downstream."""
        if not size:
            return

        self.size += size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        # Some peers send raw deflate data without the zlib wrapper; sniff
        # the first byte of the first chunk and switch decompressors.
        if not self._started_decoding and self.encoding == "deflate":
            if chunk[0] & 0xF != 8:
                # Change the decoder to decompress incorrectly compressed data
                # Actually we should issue a warning about non-RFC-compliant data.
                self.decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)

        try:
            decoded = self.decompressor.decompress(chunk)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        if decoded:
            self.out.feed_data(decoded, len(decoded))

    def feed_eof(self) -> None:
        """Flush the decompressor and signal end-of-stream downstream."""
        tail = self.decompressor.flush()

        if tail or self.size > 0:
            self.out.feed_data(tail, len(tail))
            # An incomplete deflate stream at EOF is an error.
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
935
936
937
# The pure-Python implementations are always available under the *Py aliases.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

try:
    if not NO_EXTENSIONS:
        # Prefer the faster Cython-based parser when C extensions are
        # enabled: it rebinds the public names and is also exposed under
        # the *C aliases.
        from ._http_parser import (  # type: ignore[import,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Extension module missing; keep the pure-Python implementations bound.
    pass
957
958