Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py
7757 views
1
import asyncio
2
import codecs
3
import functools
4
import io
5
import re
6
import sys
7
import traceback
8
import warnings
9
from hashlib import md5, sha1, sha256
10
from http.cookies import CookieError, Morsel, SimpleCookie
11
from types import MappingProxyType, TracebackType
12
from typing import (
13
TYPE_CHECKING,
14
Any,
15
Dict,
16
Iterable,
17
List,
18
Mapping,
19
Optional,
20
Tuple,
21
Type,
22
Union,
23
cast,
24
)
25
26
import attr
27
from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
28
from yarl import URL
29
30
from . import hdrs, helpers, http, multipart, payload
31
from .abc import AbstractStreamWriter
32
from .client_exceptions import (
33
ClientConnectionError,
34
ClientOSError,
35
ClientResponseError,
36
ContentTypeError,
37
InvalidURL,
38
ServerFingerprintMismatch,
39
)
40
from .formdata import FormData
41
from .helpers import (
42
PY_36,
43
BaseTimerContext,
44
BasicAuth,
45
HeadersMixin,
46
TimerNoop,
47
noop,
48
reify,
49
set_result,
50
)
51
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
52
from .log import client_logger
53
from .streams import StreamReader
54
from .typedefs import (
55
DEFAULT_JSON_DECODER,
56
JSONDecoder,
57
LooseCookies,
58
LooseHeaders,
59
RawHeaders,
60
)
61
62
try:
63
import ssl
64
from ssl import SSLContext
65
except ImportError: # pragma: no cover
66
ssl = None # type: ignore[assignment]
67
SSLContext = object # type: ignore[misc,assignment]
68
69
try:
70
import cchardet as chardet
71
except ImportError: # pragma: no cover
72
import charset_normalizer as chardet # type: ignore[no-redef]
73
74
75
__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
76
77
78
if TYPE_CHECKING: # pragma: no cover
79
from .client import ClientSession
80
from .connector import Connection
81
from .tracing import Trace
82
83
84
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
85
86
87
@attr.s(auto_attribs=True, frozen=True, slots=True)
88
class ContentDisposition:
89
type: Optional[str]
90
parameters: "MappingProxyType[str, str]"
91
filename: Optional[str]
92
93
94
@attr.s(auto_attribs=True, frozen=True, slots=True)
95
class RequestInfo:
96
url: URL
97
method: str
98
headers: "CIMultiDictProxy[str]"
99
real_url: URL = attr.ib()
100
101
@real_url.default
102
def real_url_default(self) -> URL:
103
return self.url
104
105
106
class Fingerprint:
107
HASHFUNC_BY_DIGESTLEN = {
108
16: md5,
109
20: sha1,
110
32: sha256,
111
}
112
113
def __init__(self, fingerprint: bytes) -> None:
114
digestlen = len(fingerprint)
115
hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
116
if not hashfunc:
117
raise ValueError("fingerprint has invalid length")
118
elif hashfunc is md5 or hashfunc is sha1:
119
raise ValueError(
120
"md5 and sha1 are insecure and " "not supported. Use sha256."
121
)
122
self._hashfunc = hashfunc
123
self._fingerprint = fingerprint
124
125
@property
126
def fingerprint(self) -> bytes:
127
return self._fingerprint
128
129
def check(self, transport: asyncio.Transport) -> None:
130
if not transport.get_extra_info("sslcontext"):
131
return
132
sslobj = transport.get_extra_info("ssl_object")
133
cert = sslobj.getpeercert(binary_form=True)
134
got = self._hashfunc(cert).digest()
135
if got != self._fingerprint:
136
host, port, *_ = transport.get_extra_info("peername")
137
raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
138
139
140
if ssl is not None:
141
SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
142
else: # pragma: no cover
143
SSL_ALLOWED_TYPES = type(None)
144
145
146
def _merge_ssl_params(
147
ssl: Union["SSLContext", bool, Fingerprint, None],
148
verify_ssl: Optional[bool],
149
ssl_context: Optional["SSLContext"],
150
fingerprint: Optional[bytes],
151
) -> Union["SSLContext", bool, Fingerprint, None]:
152
if verify_ssl is not None and not verify_ssl:
153
warnings.warn(
154
"verify_ssl is deprecated, use ssl=False instead",
155
DeprecationWarning,
156
stacklevel=3,
157
)
158
if ssl is not None:
159
raise ValueError(
160
"verify_ssl, ssl_context, fingerprint and ssl "
161
"parameters are mutually exclusive"
162
)
163
else:
164
ssl = False
165
if ssl_context is not None:
166
warnings.warn(
167
"ssl_context is deprecated, use ssl=context instead",
168
DeprecationWarning,
169
stacklevel=3,
170
)
171
if ssl is not None:
172
raise ValueError(
173
"verify_ssl, ssl_context, fingerprint and ssl "
174
"parameters are mutually exclusive"
175
)
176
else:
177
ssl = ssl_context
178
if fingerprint is not None:
179
warnings.warn(
180
"fingerprint is deprecated, " "use ssl=Fingerprint(fingerprint) instead",
181
DeprecationWarning,
182
stacklevel=3,
183
)
184
if ssl is not None:
185
raise ValueError(
186
"verify_ssl, ssl_context, fingerprint and ssl "
187
"parameters are mutually exclusive"
188
)
189
else:
190
ssl = Fingerprint(fingerprint)
191
if not isinstance(ssl, SSL_ALLOWED_TYPES):
192
raise TypeError(
193
"ssl should be SSLContext, bool, Fingerprint or None, "
194
"got {!r} instead.".format(ssl)
195
)
196
return ssl
197
198
199
@attr.s(auto_attribs=True, slots=True, frozen=True)
200
class ConnectionKey:
201
# the key should contain an information about used proxy / TLS
202
# to prevent reusing wrong connections from a pool
203
host: str
204
port: Optional[int]
205
is_ssl: bool
206
ssl: Union[SSLContext, None, bool, Fingerprint]
207
proxy: Optional[URL]
208
proxy_auth: Optional[BasicAuth]
209
proxy_headers_hash: Optional[int] # hash(CIMultiDict)
210
211
212
def _is_expected_content_type(
213
response_content_type: str, expected_content_type: str
214
) -> bool:
215
if expected_content_type == "application/json":
216
return json_re.match(response_content_type) is not None
217
return expected_content_type in response_content_type
218
219
220
class ClientRequest:
221
GET_METHODS = {
222
hdrs.METH_GET,
223
hdrs.METH_HEAD,
224
hdrs.METH_OPTIONS,
225
hdrs.METH_TRACE,
226
}
227
POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
228
ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
229
230
DEFAULT_HEADERS = {
231
hdrs.ACCEPT: "*/*",
232
hdrs.ACCEPT_ENCODING: "gzip, deflate",
233
}
234
235
body = b""
236
auth = None
237
response = None
238
239
_writer = None # async task for streaming data
240
_continue = None # waiter future for '100 Continue' response
241
242
# N.B.
243
# Adding __del__ method with self._writer closing doesn't make sense
244
# because _writer is instance method, thus it keeps a reference to self.
245
# Until writer has finished finalizer will not be called.
246
247
def __init__(
248
self,
249
method: str,
250
url: URL,
251
*,
252
params: Optional[Mapping[str, str]] = None,
253
headers: Optional[LooseHeaders] = None,
254
skip_auto_headers: Iterable[str] = frozenset(),
255
data: Any = None,
256
cookies: Optional[LooseCookies] = None,
257
auth: Optional[BasicAuth] = None,
258
version: http.HttpVersion = http.HttpVersion11,
259
compress: Optional[str] = None,
260
chunked: Optional[bool] = None,
261
expect100: bool = False,
262
loop: Optional[asyncio.AbstractEventLoop] = None,
263
response_class: Optional[Type["ClientResponse"]] = None,
264
proxy: Optional[URL] = None,
265
proxy_auth: Optional[BasicAuth] = None,
266
timer: Optional[BaseTimerContext] = None,
267
session: Optional["ClientSession"] = None,
268
ssl: Union[SSLContext, bool, Fingerprint, None] = None,
269
proxy_headers: Optional[LooseHeaders] = None,
270
traces: Optional[List["Trace"]] = None,
271
):
272
273
if loop is None:
274
loop = asyncio.get_event_loop()
275
276
assert isinstance(url, URL), url
277
assert isinstance(proxy, (URL, type(None))), proxy
278
# FIXME: session is None in tests only, need to fix tests
279
# assert session is not None
280
self._session = cast("ClientSession", session)
281
if params:
282
q = MultiDict(url.query)
283
url2 = url.with_query(params)
284
q.extend(url2.query)
285
url = url.with_query(q)
286
self.original_url = url
287
self.url = url.with_fragment(None)
288
self.method = method.upper()
289
self.chunked = chunked
290
self.compress = compress
291
self.loop = loop
292
self.length = None
293
if response_class is None:
294
real_response_class = ClientResponse
295
else:
296
real_response_class = response_class
297
self.response_class = real_response_class # type: Type[ClientResponse]
298
self._timer = timer if timer is not None else TimerNoop()
299
self._ssl = ssl
300
301
if loop.get_debug():
302
self._source_traceback = traceback.extract_stack(sys._getframe(1))
303
304
self.update_version(version)
305
self.update_host(url)
306
self.update_headers(headers)
307
self.update_auto_headers(skip_auto_headers)
308
self.update_cookies(cookies)
309
self.update_content_encoding(data)
310
self.update_auth(auth)
311
self.update_proxy(proxy, proxy_auth, proxy_headers)
312
313
self.update_body_from_data(data)
314
if data is not None or self.method not in self.GET_METHODS:
315
self.update_transfer_encoding()
316
self.update_expect_continue(expect100)
317
if traces is None:
318
traces = []
319
self._traces = traces
320
321
def is_ssl(self) -> bool:
322
return self.url.scheme in ("https", "wss")
323
324
@property
325
def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
326
return self._ssl
327
328
@property
329
def connection_key(self) -> ConnectionKey:
330
proxy_headers = self.proxy_headers
331
if proxy_headers:
332
h = hash(
333
tuple((k, v) for k, v in proxy_headers.items())
334
) # type: Optional[int]
335
else:
336
h = None
337
return ConnectionKey(
338
self.host,
339
self.port,
340
self.is_ssl(),
341
self.ssl,
342
self.proxy,
343
self.proxy_auth,
344
h,
345
)
346
347
@property
348
def host(self) -> str:
349
ret = self.url.raw_host
350
assert ret is not None
351
return ret
352
353
@property
354
def port(self) -> Optional[int]:
355
return self.url.port
356
357
@property
358
def request_info(self) -> RequestInfo:
359
headers = CIMultiDictProxy(self.headers) # type: CIMultiDictProxy[str]
360
return RequestInfo(self.url, self.method, headers, self.original_url)
361
362
def update_host(self, url: URL) -> None:
363
"""Update destination host, port and connection type (ssl)."""
364
# get host/port
365
if not url.raw_host:
366
raise InvalidURL(url)
367
368
# basic auth info
369
username, password = url.user, url.password
370
if username:
371
self.auth = helpers.BasicAuth(username, password or "")
372
373
def update_version(self, version: Union[http.HttpVersion, str]) -> None:
374
"""Convert request version to two elements tuple.
375
376
parser HTTP version '1.1' => (1, 1)
377
"""
378
if isinstance(version, str):
379
v = [part.strip() for part in version.split(".", 1)]
380
try:
381
version = http.HttpVersion(int(v[0]), int(v[1]))
382
except ValueError:
383
raise ValueError(
384
f"Can not parse http version number: {version}"
385
) from None
386
self.version = version
387
388
def update_headers(self, headers: Optional[LooseHeaders]) -> None:
389
"""Update request headers."""
390
self.headers = CIMultiDict() # type: CIMultiDict[str]
391
392
# add host
393
netloc = cast(str, self.url.raw_host)
394
if helpers.is_ipv6_address(netloc):
395
netloc = f"[{netloc}]"
396
if self.url.port is not None and not self.url.is_default_port():
397
netloc += ":" + str(self.url.port)
398
self.headers[hdrs.HOST] = netloc
399
400
if headers:
401
if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
402
headers = headers.items() # type: ignore[assignment]
403
404
for key, value in headers: # type: ignore[misc]
405
# A special case for Host header
406
if key.lower() == "host":
407
self.headers[key] = value
408
else:
409
self.headers.add(key, value)
410
411
def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
412
self.skip_auto_headers = CIMultiDict(
413
(hdr, None) for hdr in sorted(skip_auto_headers)
414
)
415
used_headers = self.headers.copy()
416
used_headers.extend(self.skip_auto_headers) # type: ignore[arg-type]
417
418
for hdr, val in self.DEFAULT_HEADERS.items():
419
if hdr not in used_headers:
420
self.headers.add(hdr, val)
421
422
if hdrs.USER_AGENT not in used_headers:
423
self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
424
425
def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
426
"""Update request cookies header."""
427
if not cookies:
428
return
429
430
c = SimpleCookie() # type: SimpleCookie[str]
431
if hdrs.COOKIE in self.headers:
432
c.load(self.headers.get(hdrs.COOKIE, ""))
433
del self.headers[hdrs.COOKIE]
434
435
if isinstance(cookies, Mapping):
436
iter_cookies = cookies.items()
437
else:
438
iter_cookies = cookies # type: ignore[assignment]
439
for name, value in iter_cookies:
440
if isinstance(value, Morsel):
441
# Preserve coded_value
442
mrsl_val = value.get(value.key, Morsel())
443
mrsl_val.set(value.key, value.value, value.coded_value)
444
c[name] = mrsl_val
445
else:
446
c[name] = value # type: ignore[assignment]
447
448
self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
449
450
def update_content_encoding(self, data: Any) -> None:
451
"""Set request content encoding."""
452
if data is None:
453
return
454
455
enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
456
if enc:
457
if self.compress:
458
raise ValueError(
459
"compress can not be set " "if Content-Encoding header is set"
460
)
461
elif self.compress:
462
if not isinstance(self.compress, str):
463
self.compress = "deflate"
464
self.headers[hdrs.CONTENT_ENCODING] = self.compress
465
self.chunked = True # enable chunked, no need to deal with length
466
467
def update_transfer_encoding(self) -> None:
468
"""Analyze transfer-encoding header."""
469
te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
470
471
if "chunked" in te:
472
if self.chunked:
473
raise ValueError(
474
"chunked can not be set "
475
'if "Transfer-Encoding: chunked" header is set'
476
)
477
478
elif self.chunked:
479
if hdrs.CONTENT_LENGTH in self.headers:
480
raise ValueError(
481
"chunked can not be set " "if Content-Length header is set"
482
)
483
484
self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
485
else:
486
if hdrs.CONTENT_LENGTH not in self.headers:
487
self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
488
489
def update_auth(self, auth: Optional[BasicAuth]) -> None:
490
"""Set basic auth."""
491
if auth is None:
492
auth = self.auth
493
if auth is None:
494
return
495
496
if not isinstance(auth, helpers.BasicAuth):
497
raise TypeError("BasicAuth() tuple is required instead")
498
499
self.headers[hdrs.AUTHORIZATION] = auth.encode()
500
501
def update_body_from_data(self, body: Any) -> None:
502
if body is None:
503
return
504
505
# FormData
506
if isinstance(body, FormData):
507
body = body()
508
509
try:
510
body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
511
except payload.LookupError:
512
body = FormData(body)()
513
514
self.body = body
515
516
# enable chunked encoding if needed
517
if not self.chunked:
518
if hdrs.CONTENT_LENGTH not in self.headers:
519
size = body.size
520
if size is None:
521
self.chunked = True
522
else:
523
if hdrs.CONTENT_LENGTH not in self.headers:
524
self.headers[hdrs.CONTENT_LENGTH] = str(size)
525
526
# copy payload headers
527
assert body.headers
528
for (key, value) in body.headers.items():
529
if key in self.headers:
530
continue
531
if key in self.skip_auto_headers:
532
continue
533
self.headers[key] = value
534
535
def update_expect_continue(self, expect: bool = False) -> None:
536
if expect:
537
self.headers[hdrs.EXPECT] = "100-continue"
538
elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
539
expect = True
540
541
if expect:
542
self._continue = self.loop.create_future()
543
544
def update_proxy(
545
self,
546
proxy: Optional[URL],
547
proxy_auth: Optional[BasicAuth],
548
proxy_headers: Optional[LooseHeaders],
549
) -> None:
550
if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
551
raise ValueError("proxy_auth must be None or BasicAuth() tuple")
552
self.proxy = proxy
553
self.proxy_auth = proxy_auth
554
self.proxy_headers = proxy_headers
555
556
def keep_alive(self) -> bool:
557
if self.version < HttpVersion10:
558
# keep alive not supported at all
559
return False
560
if self.version == HttpVersion10:
561
if self.headers.get(hdrs.CONNECTION) == "keep-alive":
562
return True
563
else: # no headers means we close for Http 1.0
564
return False
565
elif self.headers.get(hdrs.CONNECTION) == "close":
566
return False
567
568
return True
569
570
async def write_bytes(
571
self, writer: AbstractStreamWriter, conn: "Connection"
572
) -> None:
573
"""Support coroutines that yields bytes objects."""
574
# 100 response
575
if self._continue is not None:
576
await writer.drain()
577
await self._continue
578
579
protocol = conn.protocol
580
assert protocol is not None
581
try:
582
if isinstance(self.body, payload.Payload):
583
await self.body.write(writer)
584
else:
585
if isinstance(self.body, (bytes, bytearray)):
586
self.body = (self.body,) # type: ignore[assignment]
587
588
for chunk in self.body:
589
await writer.write(chunk) # type: ignore[arg-type]
590
591
await writer.write_eof()
592
except OSError as exc:
593
new_exc = ClientOSError(
594
exc.errno, "Can not write request body for %s" % self.url
595
)
596
new_exc.__context__ = exc
597
new_exc.__cause__ = exc
598
protocol.set_exception(new_exc)
599
except asyncio.CancelledError as exc:
600
if not conn.closed:
601
protocol.set_exception(exc)
602
except Exception as exc:
603
protocol.set_exception(exc)
604
finally:
605
self._writer = None
606
607
async def send(self, conn: "Connection") -> "ClientResponse":
608
# Specify request target:
609
# - CONNECT request must send authority form URI
610
# - not CONNECT proxy must send absolute form URI
611
# - most common is origin form URI
612
if self.method == hdrs.METH_CONNECT:
613
connect_host = self.url.raw_host
614
assert connect_host is not None
615
if helpers.is_ipv6_address(connect_host):
616
connect_host = f"[{connect_host}]"
617
path = f"{connect_host}:{self.url.port}"
618
elif self.proxy and not self.is_ssl():
619
path = str(self.url)
620
else:
621
path = self.url.raw_path
622
if self.url.raw_query_string:
623
path += "?" + self.url.raw_query_string
624
625
protocol = conn.protocol
626
assert protocol is not None
627
writer = StreamWriter(
628
protocol,
629
self.loop,
630
on_chunk_sent=functools.partial(
631
self._on_chunk_request_sent, self.method, self.url
632
),
633
on_headers_sent=functools.partial(
634
self._on_headers_request_sent, self.method, self.url
635
),
636
)
637
638
if self.compress:
639
writer.enable_compression(self.compress)
640
641
if self.chunked is not None:
642
writer.enable_chunking()
643
644
# set default content-type
645
if (
646
self.method in self.POST_METHODS
647
and hdrs.CONTENT_TYPE not in self.skip_auto_headers
648
and hdrs.CONTENT_TYPE not in self.headers
649
):
650
self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
651
652
# set the connection header
653
connection = self.headers.get(hdrs.CONNECTION)
654
if not connection:
655
if self.keep_alive():
656
if self.version == HttpVersion10:
657
connection = "keep-alive"
658
else:
659
if self.version == HttpVersion11:
660
connection = "close"
661
662
if connection is not None:
663
self.headers[hdrs.CONNECTION] = connection
664
665
# status + headers
666
status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
667
self.method, path, self.version
668
)
669
await writer.write_headers(status_line, self.headers)
670
671
self._writer = self.loop.create_task(self.write_bytes(writer, conn))
672
673
response_class = self.response_class
674
assert response_class is not None
675
self.response = response_class(
676
self.method,
677
self.original_url,
678
writer=self._writer,
679
continue100=self._continue,
680
timer=self._timer,
681
request_info=self.request_info,
682
traces=self._traces,
683
loop=self.loop,
684
session=self._session,
685
)
686
return self.response
687
688
async def close(self) -> None:
689
if self._writer is not None:
690
try:
691
await self._writer
692
finally:
693
self._writer = None
694
695
def terminate(self) -> None:
696
if self._writer is not None:
697
if not self.loop.is_closed():
698
self._writer.cancel()
699
self._writer = None
700
701
async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
702
for trace in self._traces:
703
await trace.send_request_chunk_sent(method, url, chunk)
704
705
async def _on_headers_request_sent(
706
self, method: str, url: URL, headers: "CIMultiDict[str]"
707
) -> None:
708
for trace in self._traces:
709
await trace.send_request_headers(method, url, headers)
710
711
712
class ClientResponse(HeadersMixin):
713
714
# from the Status-Line of the response
715
version = None # HTTP-Version
716
status = None # type: int # Status-Code
717
reason = None # Reason-Phrase
718
719
content = None # type: StreamReader # Payload stream
720
_headers = None # type: CIMultiDictProxy[str] # Response headers
721
_raw_headers = None # type: RawHeaders # Response raw headers
722
723
_connection = None # current connection
724
_source_traceback = None
725
# setted up by ClientRequest after ClientResponse object creation
726
# post-init stage allows to not change ctor signature
727
_closed = True # to allow __del__ for non-initialized properly response
728
_released = False
729
730
def __init__(
731
self,
732
method: str,
733
url: URL,
734
*,
735
writer: "asyncio.Task[None]",
736
continue100: Optional["asyncio.Future[bool]"],
737
timer: BaseTimerContext,
738
request_info: RequestInfo,
739
traces: List["Trace"],
740
loop: asyncio.AbstractEventLoop,
741
session: "ClientSession",
742
) -> None:
743
assert isinstance(url, URL)
744
745
self.method = method
746
self.cookies = SimpleCookie() # type: SimpleCookie[str]
747
748
self._real_url = url
749
self._url = url.with_fragment(None)
750
self._body = None # type: Any
751
self._writer = writer # type: Optional[asyncio.Task[None]]
752
self._continue = continue100 # None by default
753
self._closed = True
754
self._history = () # type: Tuple[ClientResponse, ...]
755
self._request_info = request_info
756
self._timer = timer if timer is not None else TimerNoop()
757
self._cache = {} # type: Dict[str, Any]
758
self._traces = traces
759
self._loop = loop
760
# store a reference to session #1985
761
self._session = session # type: Optional[ClientSession]
762
if loop.get_debug():
763
self._source_traceback = traceback.extract_stack(sys._getframe(1))
764
765
@reify
766
def url(self) -> URL:
767
return self._url
768
769
@reify
770
def url_obj(self) -> URL:
771
warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
772
return self._url
773
774
@reify
775
def real_url(self) -> URL:
776
return self._real_url
777
778
@reify
779
def host(self) -> str:
780
assert self._url.host is not None
781
return self._url.host
782
783
@reify
784
def headers(self) -> "CIMultiDictProxy[str]":
785
return self._headers
786
787
@reify
788
def raw_headers(self) -> RawHeaders:
789
return self._raw_headers
790
791
@reify
792
def request_info(self) -> RequestInfo:
793
return self._request_info
794
795
@reify
796
def content_disposition(self) -> Optional[ContentDisposition]:
797
raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
798
if raw is None:
799
return None
800
disposition_type, params_dct = multipart.parse_content_disposition(raw)
801
params = MappingProxyType(params_dct)
802
filename = multipart.content_disposition_filename(params)
803
return ContentDisposition(disposition_type, params, filename)
804
805
def __del__(self, _warnings: Any = warnings) -> None:
806
if self._closed:
807
return
808
809
if self._connection is not None:
810
self._connection.release()
811
self._cleanup_writer()
812
813
if self._loop.get_debug():
814
if PY_36:
815
kwargs = {"source": self}
816
else:
817
kwargs = {}
818
_warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
819
context = {"client_response": self, "message": "Unclosed response"}
820
if self._source_traceback:
821
context["source_traceback"] = self._source_traceback
822
self._loop.call_exception_handler(context)
823
824
def __repr__(self) -> str:
825
out = io.StringIO()
826
ascii_encodable_url = str(self.url)
827
if self.reason:
828
ascii_encodable_reason = self.reason.encode(
829
"ascii", "backslashreplace"
830
).decode("ascii")
831
else:
832
ascii_encodable_reason = self.reason
833
print(
834
"<ClientResponse({}) [{} {}]>".format(
835
ascii_encodable_url, self.status, ascii_encodable_reason
836
),
837
file=out,
838
)
839
print(self.headers, file=out)
840
return out.getvalue()
841
842
@property
843
def connection(self) -> Optional["Connection"]:
844
return self._connection
845
846
@reify
847
def history(self) -> Tuple["ClientResponse", ...]:
848
"""A sequence of of responses, if redirects occurred."""
849
return self._history
850
851
@reify
852
def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
853
links_str = ", ".join(self.headers.getall("link", []))
854
855
if not links_str:
856
return MultiDictProxy(MultiDict())
857
858
links = MultiDict() # type: MultiDict[MultiDictProxy[Union[str, URL]]]
859
860
for val in re.split(r",(?=\s*<)", links_str):
861
match = re.match(r"\s*<(.*)>(.*)", val)
862
if match is None: # pragma: no cover
863
# the check exists to suppress mypy error
864
continue
865
url, params_str = match.groups()
866
params = params_str.split(";")[1:]
867
868
link = MultiDict() # type: MultiDict[Union[str, URL]]
869
870
for param in params:
871
match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
872
if match is None: # pragma: no cover
873
# the check exists to suppress mypy error
874
continue
875
key, _, value, _ = match.groups()
876
877
link.add(key, value)
878
879
key = link.get("rel", url) # type: ignore[assignment]
880
881
link.add("url", self.url.join(URL(url)))
882
883
links.add(key, MultiDictProxy(link))
884
885
return MultiDictProxy(links)
886
887
async def start(self, connection: "Connection") -> "ClientResponse":
888
"""Start response processing."""
889
self._closed = False
890
self._protocol = connection.protocol
891
self._connection = connection
892
893
with self._timer:
894
while True:
895
# read response
896
try:
897
protocol = self._protocol
898
message, payload = await protocol.read() # type: ignore[union-attr]
899
except http.HttpProcessingError as exc:
900
raise ClientResponseError(
901
self.request_info,
902
self.history,
903
status=exc.code,
904
message=exc.message,
905
headers=exc.headers,
906
) from exc
907
908
if message.code < 100 or message.code > 199 or message.code == 101:
909
break
910
911
if self._continue is not None:
912
set_result(self._continue, True)
913
self._continue = None
914
915
# payload eof handler
916
payload.on_eof(self._response_eof)
917
918
# response status
919
self.version = message.version
920
self.status = message.code
921
self.reason = message.reason
922
923
# headers
924
self._headers = message.headers # type is CIMultiDictProxy
925
self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
926
927
# payload
928
self.content = payload
929
930
# cookies
931
for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
932
try:
933
self.cookies.load(hdr)
934
except CookieError as exc:
935
client_logger.warning("Can not load response cookies: %s", exc)
936
return self
937
938
def _response_eof(self) -> None:
939
if self._closed:
940
return
941
942
if self._connection is not None:
943
# websocket, protocol could be None because
944
# connection could be detached
945
if (
946
self._connection.protocol is not None
947
and self._connection.protocol.upgraded
948
):
949
return
950
951
self._connection.release()
952
self._connection = None
953
954
self._closed = True
955
self._cleanup_writer()
956
957
@property
958
def closed(self) -> bool:
959
return self._closed
960
961
def close(self) -> None:
962
if not self._released:
963
self._notify_content()
964
if self._closed:
965
return
966
967
self._closed = True
968
if self._loop is None or self._loop.is_closed():
969
return
970
971
if self._connection is not None:
972
self._connection.close()
973
self._connection = None
974
self._cleanup_writer()
975
976
def release(self) -> Any:
977
if not self._released:
978
self._notify_content()
979
if self._closed:
980
return noop()
981
982
self._closed = True
983
if self._connection is not None:
984
self._connection.release()
985
self._connection = None
986
987
self._cleanup_writer()
988
return noop()
989
990
@property
991
def ok(self) -> bool:
992
"""Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
993
994
This is **not** a check for ``200 OK`` but a check that the response
995
status is under 400.
996
"""
997
return 400 > self.status
998
999
def raise_for_status(self) -> None:
1000
if not self.ok:
1001
# reason should always be not None for a started response
1002
assert self.reason is not None
1003
self.release()
1004
raise ClientResponseError(
1005
self.request_info,
1006
self.history,
1007
status=self.status,
1008
message=self.reason,
1009
headers=self.headers,
1010
)
1011
1012
def _cleanup_writer(self) -> None:
1013
if self._writer is not None:
1014
self._writer.cancel()
1015
self._writer = None
1016
self._session = None
1017
1018
def _notify_content(self) -> None:
1019
content = self.content
1020
if content and content.exception() is None:
1021
content.set_exception(ClientConnectionError("Connection closed"))
1022
self._released = True
1023
1024
async def wait_for_close(self) -> None:
1025
if self._writer is not None:
1026
try:
1027
await self._writer
1028
finally:
1029
self._writer = None
1030
self.release()
1031
1032
async def read(self) -> bytes:
1033
"""Read response payload."""
1034
if self._body is None:
1035
try:
1036
self._body = await self.content.read()
1037
for trace in self._traces:
1038
await trace.send_response_chunk_received(
1039
self.method, self.url, self._body
1040
)
1041
except BaseException:
1042
self.close()
1043
raise
1044
elif self._released:
1045
raise ClientConnectionError("Connection closed")
1046
1047
return self._body # type: ignore[no-any-return]
1048
1049
def get_encoding(self) -> str:
1050
ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1051
mimetype = helpers.parse_mimetype(ctype)
1052
1053
encoding = mimetype.parameters.get("charset")
1054
if encoding:
1055
try:
1056
codecs.lookup(encoding)
1057
except LookupError:
1058
encoding = None
1059
if not encoding:
1060
if mimetype.type == "application" and (
1061
mimetype.subtype == "json" or mimetype.subtype == "rdap"
1062
):
1063
# RFC 7159 states that the default encoding is UTF-8.
1064
# RFC 7483 defines application/rdap+json
1065
encoding = "utf-8"
1066
elif self._body is None:
1067
raise RuntimeError(
1068
"Cannot guess the encoding of " "a not yet read body"
1069
)
1070
else:
1071
encoding = chardet.detect(self._body)["encoding"]
1072
if not encoding:
1073
encoding = "utf-8"
1074
1075
return encoding
1076
1077
async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
1078
"""Read response payload and decode."""
1079
if self._body is None:
1080
await self.read()
1081
1082
if encoding is None:
1083
encoding = self.get_encoding()
1084
1085
return self._body.decode( # type: ignore[no-any-return,union-attr]
1086
encoding, errors=errors
1087
)
1088
1089
async def json(
1090
self,
1091
*,
1092
encoding: Optional[str] = None,
1093
loads: JSONDecoder = DEFAULT_JSON_DECODER,
1094
content_type: Optional[str] = "application/json",
1095
) -> Any:
1096
"""Read and decodes JSON response."""
1097
if self._body is None:
1098
await self.read()
1099
1100
if content_type:
1101
ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1102
if not _is_expected_content_type(ctype, content_type):
1103
raise ContentTypeError(
1104
self.request_info,
1105
self.history,
1106
message=(
1107
"Attempt to decode JSON with " "unexpected mimetype: %s" % ctype
1108
),
1109
headers=self.headers,
1110
)
1111
1112
stripped = self._body.strip() # type: ignore[union-attr]
1113
if not stripped:
1114
return None
1115
1116
if encoding is None:
1117
encoding = self.get_encoding()
1118
1119
return loads(stripped.decode(encoding))
1120
1121
async def __aenter__(self) -> "ClientResponse":
1122
return self
1123
1124
async def __aexit__(
1125
self,
1126
exc_type: Optional[Type[BaseException]],
1127
exc_val: Optional[BaseException],
1128
exc_tb: Optional[TracebackType],
1129
) -> None:
1130
# similar to _RequestContextManager, we do not need to check
1131
# for exceptions, response object can close connection
1132
# if state is broken
1133
self.release()
1134
1135