Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client.py
7730 views
1
"""HTTP Client for asyncio."""
2
3
import asyncio
4
import base64
5
import hashlib
6
import json
7
import os
8
import sys
9
import traceback
10
import warnings
11
from contextlib import suppress
12
from types import SimpleNamespace, TracebackType
13
from typing import (
14
Any,
15
Awaitable,
16
Callable,
17
Coroutine,
18
FrozenSet,
19
Generator,
20
Generic,
21
Iterable,
22
List,
23
Mapping,
24
Optional,
25
Set,
26
Tuple,
27
Type,
28
TypeVar,
29
Union,
30
)
31
32
import attr
33
from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
34
from yarl import URL
35
36
from . import hdrs, http, payload
37
from .abc import AbstractCookieJar
38
from .client_exceptions import (
39
ClientConnectionError as ClientConnectionError,
40
ClientConnectorCertificateError as ClientConnectorCertificateError,
41
ClientConnectorError as ClientConnectorError,
42
ClientConnectorSSLError as ClientConnectorSSLError,
43
ClientError as ClientError,
44
ClientHttpProxyError as ClientHttpProxyError,
45
ClientOSError as ClientOSError,
46
ClientPayloadError as ClientPayloadError,
47
ClientProxyConnectionError as ClientProxyConnectionError,
48
ClientResponseError as ClientResponseError,
49
ClientSSLError as ClientSSLError,
50
ContentTypeError as ContentTypeError,
51
InvalidURL as InvalidURL,
52
ServerConnectionError as ServerConnectionError,
53
ServerDisconnectedError as ServerDisconnectedError,
54
ServerFingerprintMismatch as ServerFingerprintMismatch,
55
ServerTimeoutError as ServerTimeoutError,
56
TooManyRedirects as TooManyRedirects,
57
WSServerHandshakeError as WSServerHandshakeError,
58
)
59
from .client_reqrep import (
60
ClientRequest as ClientRequest,
61
ClientResponse as ClientResponse,
62
Fingerprint as Fingerprint,
63
RequestInfo as RequestInfo,
64
_merge_ssl_params,
65
)
66
from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse
67
from .connector import (
68
BaseConnector as BaseConnector,
69
NamedPipeConnector as NamedPipeConnector,
70
TCPConnector as TCPConnector,
71
UnixConnector as UnixConnector,
72
)
73
from .cookiejar import CookieJar
74
from .helpers import (
75
DEBUG,
76
PY_36,
77
BasicAuth,
78
TimeoutHandle,
79
ceil_timeout,
80
get_env_proxy_for_url,
81
get_running_loop,
82
sentinel,
83
strip_auth_from_url,
84
)
85
from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
86
from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
87
from .streams import FlowControlDataQueue
88
from .tracing import Trace, TraceConfig
89
from .typedefs import Final, JSONEncoder, LooseCookies, LooseHeaders, StrOrURL
90
91
__all__ = (
92
# client_exceptions
93
"ClientConnectionError",
94
"ClientConnectorCertificateError",
95
"ClientConnectorError",
96
"ClientConnectorSSLError",
97
"ClientError",
98
"ClientHttpProxyError",
99
"ClientOSError",
100
"ClientPayloadError",
101
"ClientProxyConnectionError",
102
"ClientResponseError",
103
"ClientSSLError",
104
"ContentTypeError",
105
"InvalidURL",
106
"ServerConnectionError",
107
"ServerDisconnectedError",
108
"ServerFingerprintMismatch",
109
"ServerTimeoutError",
110
"TooManyRedirects",
111
"WSServerHandshakeError",
112
# client_reqrep
113
"ClientRequest",
114
"ClientResponse",
115
"Fingerprint",
116
"RequestInfo",
117
# connector
118
"BaseConnector",
119
"TCPConnector",
120
"UnixConnector",
121
"NamedPipeConnector",
122
# client_ws
123
"ClientWebSocketResponse",
124
# client
125
"ClientSession",
126
"ClientTimeout",
127
"request",
128
)
129
130
131
try:
132
from ssl import SSLContext
133
except ImportError: # pragma: no cover
134
SSLContext = object # type: ignore[misc,assignment]
135
136
137
@attr.s(auto_attribs=True, frozen=True, slots=True)
138
class ClientTimeout:
139
total: Optional[float] = None
140
connect: Optional[float] = None
141
sock_read: Optional[float] = None
142
sock_connect: Optional[float] = None
143
144
# pool_queue_timeout: Optional[float] = None
145
# dns_resolution_timeout: Optional[float] = None
146
# socket_connect_timeout: Optional[float] = None
147
# connection_acquiring_timeout: Optional[float] = None
148
# new_connection_timeout: Optional[float] = None
149
# http_header_timeout: Optional[float] = None
150
# response_body_timeout: Optional[float] = None
151
152
# to create a timeout specific for a single request, either
153
# - create a completely new one to overwrite the default
154
# - or use http://www.attrs.org/en/stable/api.html#attr.evolve
155
# to overwrite the defaults
156
157
158
# 5 Minute default read timeout
159
DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60)
160
161
_RetType = TypeVar("_RetType")
162
163
164
class ClientSession:
165
"""First-class interface for making HTTP requests."""
166
167
ATTRS = frozenset(
168
[
169
"_base_url",
170
"_source_traceback",
171
"_connector",
172
"requote_redirect_url",
173
"_loop",
174
"_cookie_jar",
175
"_connector_owner",
176
"_default_auth",
177
"_version",
178
"_json_serialize",
179
"_requote_redirect_url",
180
"_timeout",
181
"_raise_for_status",
182
"_auto_decompress",
183
"_trust_env",
184
"_default_headers",
185
"_skip_auto_headers",
186
"_request_class",
187
"_response_class",
188
"_ws_response_class",
189
"_trace_configs",
190
"_read_bufsize",
191
]
192
)
193
194
_source_traceback = None
195
196
def __init__(
197
self,
198
base_url: Optional[StrOrURL] = None,
199
*,
200
connector: Optional[BaseConnector] = None,
201
loop: Optional[asyncio.AbstractEventLoop] = None,
202
cookies: Optional[LooseCookies] = None,
203
headers: Optional[LooseHeaders] = None,
204
skip_auto_headers: Optional[Iterable[str]] = None,
205
auth: Optional[BasicAuth] = None,
206
json_serialize: JSONEncoder = json.dumps,
207
request_class: Type[ClientRequest] = ClientRequest,
208
response_class: Type[ClientResponse] = ClientResponse,
209
ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
210
version: HttpVersion = http.HttpVersion11,
211
cookie_jar: Optional[AbstractCookieJar] = None,
212
connector_owner: bool = True,
213
raise_for_status: bool = False,
214
read_timeout: Union[float, object] = sentinel,
215
conn_timeout: Optional[float] = None,
216
timeout: Union[object, ClientTimeout] = sentinel,
217
auto_decompress: bool = True,
218
trust_env: bool = False,
219
requote_redirect_url: bool = True,
220
trace_configs: Optional[List[TraceConfig]] = None,
221
read_bufsize: int = 2 ** 16,
222
) -> None:
223
if loop is None:
224
if connector is not None:
225
loop = connector._loop
226
227
loop = get_running_loop(loop)
228
229
if base_url is None or isinstance(base_url, URL):
230
self._base_url: Optional[URL] = base_url
231
else:
232
self._base_url = URL(base_url)
233
assert (
234
self._base_url.origin() == self._base_url
235
), "Only absolute URLs without path part are supported"
236
237
if connector is None:
238
connector = TCPConnector(loop=loop)
239
240
if connector._loop is not loop:
241
raise RuntimeError("Session and connector has to use same event loop")
242
243
self._loop = loop
244
245
if loop.get_debug():
246
self._source_traceback = traceback.extract_stack(sys._getframe(1))
247
248
if cookie_jar is None:
249
cookie_jar = CookieJar(loop=loop)
250
self._cookie_jar = cookie_jar
251
252
if cookies is not None:
253
self._cookie_jar.update_cookies(cookies)
254
255
self._connector = connector # type: Optional[BaseConnector]
256
self._connector_owner = connector_owner
257
self._default_auth = auth
258
self._version = version
259
self._json_serialize = json_serialize
260
if timeout is sentinel:
261
self._timeout = DEFAULT_TIMEOUT
262
if read_timeout is not sentinel:
263
warnings.warn(
264
"read_timeout is deprecated, " "use timeout argument instead",
265
DeprecationWarning,
266
stacklevel=2,
267
)
268
self._timeout = attr.evolve(self._timeout, total=read_timeout)
269
if conn_timeout is not None:
270
self._timeout = attr.evolve(self._timeout, connect=conn_timeout)
271
warnings.warn(
272
"conn_timeout is deprecated, " "use timeout argument instead",
273
DeprecationWarning,
274
stacklevel=2,
275
)
276
else:
277
self._timeout = timeout # type: ignore[assignment]
278
if read_timeout is not sentinel:
279
raise ValueError(
280
"read_timeout and timeout parameters "
281
"conflict, please setup "
282
"timeout.read"
283
)
284
if conn_timeout is not None:
285
raise ValueError(
286
"conn_timeout and timeout parameters "
287
"conflict, please setup "
288
"timeout.connect"
289
)
290
self._raise_for_status = raise_for_status
291
self._auto_decompress = auto_decompress
292
self._trust_env = trust_env
293
self._requote_redirect_url = requote_redirect_url
294
self._read_bufsize = read_bufsize
295
296
# Convert to list of tuples
297
if headers:
298
real_headers = CIMultiDict(headers) # type: CIMultiDict[str]
299
else:
300
real_headers = CIMultiDict()
301
self._default_headers = real_headers # type: CIMultiDict[str]
302
if skip_auto_headers is not None:
303
self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers)
304
else:
305
self._skip_auto_headers = frozenset()
306
307
self._request_class = request_class
308
self._response_class = response_class
309
self._ws_response_class = ws_response_class
310
311
self._trace_configs = trace_configs or []
312
for trace_config in self._trace_configs:
313
trace_config.freeze()
314
315
def __init_subclass__(cls: Type["ClientSession"]) -> None:
316
warnings.warn(
317
"Inheritance class {} from ClientSession "
318
"is discouraged".format(cls.__name__),
319
DeprecationWarning,
320
stacklevel=2,
321
)
322
323
if DEBUG:
324
325
def __setattr__(self, name: str, val: Any) -> None:
326
if name not in self.ATTRS:
327
warnings.warn(
328
"Setting custom ClientSession.{} attribute "
329
"is discouraged".format(name),
330
DeprecationWarning,
331
stacklevel=2,
332
)
333
super().__setattr__(name, val)
334
335
def __del__(self, _warnings: Any = warnings) -> None:
336
if not self.closed:
337
if PY_36:
338
kwargs = {"source": self}
339
else:
340
kwargs = {}
341
_warnings.warn(
342
f"Unclosed client session {self!r}", ResourceWarning, **kwargs
343
)
344
context = {"client_session": self, "message": "Unclosed client session"}
345
if self._source_traceback is not None:
346
context["source_traceback"] = self._source_traceback
347
self._loop.call_exception_handler(context)
348
349
def request(
350
self, method: str, url: StrOrURL, **kwargs: Any
351
) -> "_RequestContextManager":
352
"""Perform HTTP request."""
353
return _RequestContextManager(self._request(method, url, **kwargs))
354
355
def _build_url(self, str_or_url: StrOrURL) -> URL:
356
url = URL(str_or_url)
357
if self._base_url is None:
358
return url
359
else:
360
assert not url.is_absolute() and url.path.startswith("/")
361
return self._base_url.join(url)
362
363
async def _request(
364
self,
365
method: str,
366
str_or_url: StrOrURL,
367
*,
368
params: Optional[Mapping[str, str]] = None,
369
data: Any = None,
370
json: Any = None,
371
cookies: Optional[LooseCookies] = None,
372
headers: Optional[LooseHeaders] = None,
373
skip_auto_headers: Optional[Iterable[str]] = None,
374
auth: Optional[BasicAuth] = None,
375
allow_redirects: bool = True,
376
max_redirects: int = 10,
377
compress: Optional[str] = None,
378
chunked: Optional[bool] = None,
379
expect100: bool = False,
380
raise_for_status: Optional[bool] = None,
381
read_until_eof: bool = True,
382
proxy: Optional[StrOrURL] = None,
383
proxy_auth: Optional[BasicAuth] = None,
384
timeout: Union[ClientTimeout, object] = sentinel,
385
verify_ssl: Optional[bool] = None,
386
fingerprint: Optional[bytes] = None,
387
ssl_context: Optional[SSLContext] = None,
388
ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None,
389
proxy_headers: Optional[LooseHeaders] = None,
390
trace_request_ctx: Optional[SimpleNamespace] = None,
391
read_bufsize: Optional[int] = None,
392
) -> ClientResponse:
393
394
# NOTE: timeout clamps existing connect and read timeouts. We cannot
395
# set the default to None because we need to detect if the user wants
396
# to use the existing timeouts by setting timeout to None.
397
398
if self.closed:
399
raise RuntimeError("Session is closed")
400
401
ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
402
403
if data is not None and json is not None:
404
raise ValueError(
405
"data and json parameters can not be used at the same time"
406
)
407
elif json is not None:
408
data = payload.JsonPayload(json, dumps=self._json_serialize)
409
410
if not isinstance(chunked, bool) and chunked is not None:
411
warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
412
413
redirects = 0
414
history = []
415
version = self._version
416
417
# Merge with default headers and transform to CIMultiDict
418
headers = self._prepare_headers(headers)
419
proxy_headers = self._prepare_headers(proxy_headers)
420
421
try:
422
url = self._build_url(str_or_url)
423
except ValueError as e:
424
raise InvalidURL(str_or_url) from e
425
426
skip_headers = set(self._skip_auto_headers)
427
if skip_auto_headers is not None:
428
for i in skip_auto_headers:
429
skip_headers.add(istr(i))
430
431
if proxy is not None:
432
try:
433
proxy = URL(proxy)
434
except ValueError as e:
435
raise InvalidURL(proxy) from e
436
437
if timeout is sentinel:
438
real_timeout = self._timeout # type: ClientTimeout
439
else:
440
if not isinstance(timeout, ClientTimeout):
441
real_timeout = ClientTimeout(total=timeout) # type: ignore[arg-type]
442
else:
443
real_timeout = timeout
444
# timeout is cumulative for all request operations
445
# (request, redirects, responses, data consuming)
446
tm = TimeoutHandle(self._loop, real_timeout.total)
447
handle = tm.start()
448
449
if read_bufsize is None:
450
read_bufsize = self._read_bufsize
451
452
traces = [
453
Trace(
454
self,
455
trace_config,
456
trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
457
)
458
for trace_config in self._trace_configs
459
]
460
461
for trace in traces:
462
await trace.send_request_start(method, url.update_query(params), headers)
463
464
timer = tm.timer()
465
try:
466
with timer:
467
while True:
468
url, auth_from_url = strip_auth_from_url(url)
469
if auth and auth_from_url:
470
raise ValueError(
471
"Cannot combine AUTH argument with "
472
"credentials encoded in URL"
473
)
474
475
if auth is None:
476
auth = auth_from_url
477
if auth is None:
478
auth = self._default_auth
479
# It would be confusing if we support explicit
480
# Authorization header with auth argument
481
if (
482
headers is not None
483
and auth is not None
484
and hdrs.AUTHORIZATION in headers
485
):
486
raise ValueError(
487
"Cannot combine AUTHORIZATION header "
488
"with AUTH argument or credentials "
489
"encoded in URL"
490
)
491
492
all_cookies = self._cookie_jar.filter_cookies(url)
493
494
if cookies is not None:
495
tmp_cookie_jar = CookieJar()
496
tmp_cookie_jar.update_cookies(cookies)
497
req_cookies = tmp_cookie_jar.filter_cookies(url)
498
if req_cookies:
499
all_cookies.load(req_cookies)
500
501
if proxy is not None:
502
proxy = URL(proxy)
503
elif self._trust_env:
504
with suppress(LookupError):
505
proxy, proxy_auth = get_env_proxy_for_url(url)
506
507
req = self._request_class(
508
method,
509
url,
510
params=params,
511
headers=headers,
512
skip_auto_headers=skip_headers,
513
data=data,
514
cookies=all_cookies,
515
auth=auth,
516
version=version,
517
compress=compress,
518
chunked=chunked,
519
expect100=expect100,
520
loop=self._loop,
521
response_class=self._response_class,
522
proxy=proxy,
523
proxy_auth=proxy_auth,
524
timer=timer,
525
session=self,
526
ssl=ssl,
527
proxy_headers=proxy_headers,
528
traces=traces,
529
)
530
531
# connection timeout
532
try:
533
async with ceil_timeout(real_timeout.connect):
534
assert self._connector is not None
535
conn = await self._connector.connect(
536
req, traces=traces, timeout=real_timeout
537
)
538
except asyncio.TimeoutError as exc:
539
raise ServerTimeoutError(
540
"Connection timeout " "to host {}".format(url)
541
) from exc
542
543
assert conn.transport is not None
544
545
assert conn.protocol is not None
546
conn.protocol.set_response_params(
547
timer=timer,
548
skip_payload=method.upper() == "HEAD",
549
read_until_eof=read_until_eof,
550
auto_decompress=self._auto_decompress,
551
read_timeout=real_timeout.sock_read,
552
read_bufsize=read_bufsize,
553
)
554
555
try:
556
try:
557
resp = await req.send(conn)
558
try:
559
await resp.start(conn)
560
except BaseException:
561
resp.close()
562
raise
563
except BaseException:
564
conn.close()
565
raise
566
except ClientError:
567
raise
568
except OSError as exc:
569
raise ClientOSError(*exc.args) from exc
570
571
self._cookie_jar.update_cookies(resp.cookies, resp.url)
572
573
# redirects
574
if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
575
576
for trace in traces:
577
await trace.send_request_redirect(
578
method, url.update_query(params), headers, resp
579
)
580
581
redirects += 1
582
history.append(resp)
583
if max_redirects and redirects >= max_redirects:
584
resp.close()
585
raise TooManyRedirects(
586
history[0].request_info, tuple(history)
587
)
588
589
# For 301 and 302, mimic IE, now changed in RFC
590
# https://github.com/kennethreitz/requests/pull/269
591
if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
592
resp.status in (301, 302) and resp.method == hdrs.METH_POST
593
):
594
method = hdrs.METH_GET
595
data = None
596
if headers.get(hdrs.CONTENT_LENGTH):
597
headers.pop(hdrs.CONTENT_LENGTH)
598
599
r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
600
hdrs.URI
601
)
602
if r_url is None:
603
# see github.com/aio-libs/aiohttp/issues/2022
604
break
605
else:
606
# reading from correct redirection
607
# response is forbidden
608
resp.release()
609
610
try:
611
parsed_url = URL(
612
r_url, encoded=not self._requote_redirect_url
613
)
614
615
except ValueError as e:
616
raise InvalidURL(r_url) from e
617
618
scheme = parsed_url.scheme
619
if scheme not in ("http", "https", ""):
620
resp.close()
621
raise ValueError("Can redirect only to http or https")
622
elif not scheme:
623
parsed_url = url.join(parsed_url)
624
625
if url.origin() != parsed_url.origin():
626
auth = None
627
headers.pop(hdrs.AUTHORIZATION, None)
628
629
url = parsed_url
630
params = None
631
resp.release()
632
continue
633
634
break
635
636
# check response status
637
if raise_for_status is None:
638
raise_for_status = self._raise_for_status
639
if raise_for_status:
640
resp.raise_for_status()
641
642
# register connection
643
if handle is not None:
644
if resp.connection is not None:
645
resp.connection.add_callback(handle.cancel)
646
else:
647
handle.cancel()
648
649
resp._history = tuple(history)
650
651
for trace in traces:
652
await trace.send_request_end(
653
method, url.update_query(params), headers, resp
654
)
655
return resp
656
657
except BaseException as e:
658
# cleanup timer
659
tm.close()
660
if handle:
661
handle.cancel()
662
handle = None
663
664
for trace in traces:
665
await trace.send_request_exception(
666
method, url.update_query(params), headers, e
667
)
668
raise
669
670
def ws_connect(
671
self,
672
url: StrOrURL,
673
*,
674
method: str = hdrs.METH_GET,
675
protocols: Iterable[str] = (),
676
timeout: float = 10.0,
677
receive_timeout: Optional[float] = None,
678
autoclose: bool = True,
679
autoping: bool = True,
680
heartbeat: Optional[float] = None,
681
auth: Optional[BasicAuth] = None,
682
origin: Optional[str] = None,
683
params: Optional[Mapping[str, str]] = None,
684
headers: Optional[LooseHeaders] = None,
685
proxy: Optional[StrOrURL] = None,
686
proxy_auth: Optional[BasicAuth] = None,
687
ssl: Union[SSLContext, bool, None, Fingerprint] = None,
688
verify_ssl: Optional[bool] = None,
689
fingerprint: Optional[bytes] = None,
690
ssl_context: Optional[SSLContext] = None,
691
proxy_headers: Optional[LooseHeaders] = None,
692
compress: int = 0,
693
max_msg_size: int = 4 * 1024 * 1024,
694
) -> "_WSRequestContextManager":
695
"""Initiate websocket connection."""
696
return _WSRequestContextManager(
697
self._ws_connect(
698
url,
699
method=method,
700
protocols=protocols,
701
timeout=timeout,
702
receive_timeout=receive_timeout,
703
autoclose=autoclose,
704
autoping=autoping,
705
heartbeat=heartbeat,
706
auth=auth,
707
origin=origin,
708
params=params,
709
headers=headers,
710
proxy=proxy,
711
proxy_auth=proxy_auth,
712
ssl=ssl,
713
verify_ssl=verify_ssl,
714
fingerprint=fingerprint,
715
ssl_context=ssl_context,
716
proxy_headers=proxy_headers,
717
compress=compress,
718
max_msg_size=max_msg_size,
719
)
720
)
721
722
async def _ws_connect(
723
self,
724
url: StrOrURL,
725
*,
726
method: str = hdrs.METH_GET,
727
protocols: Iterable[str] = (),
728
timeout: float = 10.0,
729
receive_timeout: Optional[float] = None,
730
autoclose: bool = True,
731
autoping: bool = True,
732
heartbeat: Optional[float] = None,
733
auth: Optional[BasicAuth] = None,
734
origin: Optional[str] = None,
735
params: Optional[Mapping[str, str]] = None,
736
headers: Optional[LooseHeaders] = None,
737
proxy: Optional[StrOrURL] = None,
738
proxy_auth: Optional[BasicAuth] = None,
739
ssl: Union[SSLContext, bool, None, Fingerprint] = None,
740
verify_ssl: Optional[bool] = None,
741
fingerprint: Optional[bytes] = None,
742
ssl_context: Optional[SSLContext] = None,
743
proxy_headers: Optional[LooseHeaders] = None,
744
compress: int = 0,
745
max_msg_size: int = 4 * 1024 * 1024,
746
) -> ClientWebSocketResponse:
747
748
if headers is None:
749
real_headers = CIMultiDict() # type: CIMultiDict[str]
750
else:
751
real_headers = CIMultiDict(headers)
752
753
default_headers = {
754
hdrs.UPGRADE: "websocket",
755
hdrs.CONNECTION: "upgrade",
756
hdrs.SEC_WEBSOCKET_VERSION: "13",
757
}
758
759
for key, value in default_headers.items():
760
real_headers.setdefault(key, value)
761
762
sec_key = base64.b64encode(os.urandom(16))
763
real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()
764
765
if protocols:
766
real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
767
if origin is not None:
768
real_headers[hdrs.ORIGIN] = origin
769
if compress:
770
extstr = ws_ext_gen(compress=compress)
771
real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr
772
773
ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
774
775
# send request
776
resp = await self.request(
777
method,
778
url,
779
params=params,
780
headers=real_headers,
781
read_until_eof=False,
782
auth=auth,
783
proxy=proxy,
784
proxy_auth=proxy_auth,
785
ssl=ssl,
786
proxy_headers=proxy_headers,
787
)
788
789
try:
790
# check handshake
791
if resp.status != 101:
792
raise WSServerHandshakeError(
793
resp.request_info,
794
resp.history,
795
message="Invalid response status",
796
status=resp.status,
797
headers=resp.headers,
798
)
799
800
if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
801
raise WSServerHandshakeError(
802
resp.request_info,
803
resp.history,
804
message="Invalid upgrade header",
805
status=resp.status,
806
headers=resp.headers,
807
)
808
809
if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
810
raise WSServerHandshakeError(
811
resp.request_info,
812
resp.history,
813
message="Invalid connection header",
814
status=resp.status,
815
headers=resp.headers,
816
)
817
818
# key calculation
819
r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "")
820
match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode()
821
if r_key != match:
822
raise WSServerHandshakeError(
823
resp.request_info,
824
resp.history,
825
message="Invalid challenge response",
826
status=resp.status,
827
headers=resp.headers,
828
)
829
830
# websocket protocol
831
protocol = None
832
if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
833
resp_protocols = [
834
proto.strip()
835
for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
836
]
837
838
for proto in resp_protocols:
839
if proto in protocols:
840
protocol = proto
841
break
842
843
# websocket compress
844
notakeover = False
845
if compress:
846
compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
847
if compress_hdrs:
848
try:
849
compress, notakeover = ws_ext_parse(compress_hdrs)
850
except WSHandshakeError as exc:
851
raise WSServerHandshakeError(
852
resp.request_info,
853
resp.history,
854
message=exc.args[0],
855
status=resp.status,
856
headers=resp.headers,
857
) from exc
858
else:
859
compress = 0
860
notakeover = False
861
862
conn = resp.connection
863
assert conn is not None
864
conn_proto = conn.protocol
865
assert conn_proto is not None
866
transport = conn.transport
867
assert transport is not None
868
reader = FlowControlDataQueue(
869
conn_proto, 2 ** 16, loop=self._loop
870
) # type: FlowControlDataQueue[WSMessage]
871
conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
872
writer = WebSocketWriter(
873
conn_proto,
874
transport,
875
use_mask=True,
876
compress=compress,
877
notakeover=notakeover,
878
)
879
except BaseException:
880
resp.close()
881
raise
882
else:
883
return self._ws_response_class(
884
reader,
885
writer,
886
protocol,
887
resp,
888
timeout,
889
autoclose,
890
autoping,
891
self._loop,
892
receive_timeout=receive_timeout,
893
heartbeat=heartbeat,
894
compress=compress,
895
client_notakeover=notakeover,
896
)
897
898
def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
899
"""Add default headers and transform it to CIMultiDict"""
900
# Convert headers to MultiDict
901
result = CIMultiDict(self._default_headers)
902
if headers:
903
if not isinstance(headers, (MultiDictProxy, MultiDict)):
904
headers = CIMultiDict(headers)
905
added_names = set() # type: Set[str]
906
for key, value in headers.items():
907
if key in added_names:
908
result.add(key, value)
909
else:
910
result[key] = value
911
added_names.add(key)
912
return result
913
914
def get(
915
self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
916
) -> "_RequestContextManager":
917
"""Perform HTTP GET request."""
918
return _RequestContextManager(
919
self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
920
)
921
922
def options(
923
self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
924
) -> "_RequestContextManager":
925
"""Perform HTTP OPTIONS request."""
926
return _RequestContextManager(
927
self._request(
928
hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
929
)
930
)
931
932
def head(
933
self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
934
) -> "_RequestContextManager":
935
"""Perform HTTP HEAD request."""
936
return _RequestContextManager(
937
self._request(
938
hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
939
)
940
)
941
942
def post(
943
self, url: StrOrURL, *, data: Any = None, **kwargs: Any
944
) -> "_RequestContextManager":
945
"""Perform HTTP POST request."""
946
return _RequestContextManager(
947
self._request(hdrs.METH_POST, url, data=data, **kwargs)
948
)
949
950
def put(
951
self, url: StrOrURL, *, data: Any = None, **kwargs: Any
952
) -> "_RequestContextManager":
953
"""Perform HTTP PUT request."""
954
return _RequestContextManager(
955
self._request(hdrs.METH_PUT, url, data=data, **kwargs)
956
)
957
958
def patch(
959
self, url: StrOrURL, *, data: Any = None, **kwargs: Any
960
) -> "_RequestContextManager":
961
"""Perform HTTP PATCH request."""
962
return _RequestContextManager(
963
self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
964
)
965
966
def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
967
"""Perform HTTP DELETE request."""
968
return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs))
969
970
async def close(self) -> None:
971
"""Close underlying connector.
972
973
Release all acquired resources.
974
"""
975
if not self.closed:
976
if self._connector is not None and self._connector_owner:
977
await self._connector.close()
978
self._connector = None
979
980
@property
981
def closed(self) -> bool:
982
"""Is client session closed.
983
984
A readonly property.
985
"""
986
return self._connector is None or self._connector.closed
987
988
@property
989
def connector(self) -> Optional[BaseConnector]:
990
"""Connector instance used for the session."""
991
return self._connector
992
993
@property
994
def cookie_jar(self) -> AbstractCookieJar:
995
"""The session cookies."""
996
return self._cookie_jar
997
998
@property
999
def version(self) -> Tuple[int, int]:
1000
"""The session HTTP protocol version."""
1001
return self._version
1002
1003
@property
1004
def requote_redirect_url(self) -> bool:
1005
"""Do URL requoting on redirection handling."""
1006
return self._requote_redirect_url
1007
1008
@requote_redirect_url.setter
1009
def requote_redirect_url(self, val: bool) -> None:
1010
"""Do URL requoting on redirection handling."""
1011
warnings.warn(
1012
"session.requote_redirect_url modification " "is deprecated #2778",
1013
DeprecationWarning,
1014
stacklevel=2,
1015
)
1016
self._requote_redirect_url = val
1017
1018
@property
1019
def loop(self) -> asyncio.AbstractEventLoop:
1020
"""Session's loop."""
1021
warnings.warn(
1022
"client.loop property is deprecated", DeprecationWarning, stacklevel=2
1023
)
1024
return self._loop
1025
1026
@property
1027
def timeout(self) -> Union[object, ClientTimeout]:
1028
"""Timeout for the session."""
1029
return self._timeout
1030
1031
@property
1032
def headers(self) -> "CIMultiDict[str]":
1033
"""The default headers of the client session."""
1034
return self._default_headers
1035
1036
@property
1037
def skip_auto_headers(self) -> FrozenSet[istr]:
1038
"""Headers for which autogeneration should be skipped"""
1039
return self._skip_auto_headers
1040
1041
@property
1042
def auth(self) -> Optional[BasicAuth]:
1043
"""An object that represents HTTP Basic Authorization"""
1044
return self._default_auth
1045
1046
@property
1047
def json_serialize(self) -> JSONEncoder:
1048
"""Json serializer callable"""
1049
return self._json_serialize
1050
1051
@property
1052
def connector_owner(self) -> bool:
1053
"""Should connector be closed on session closing"""
1054
return self._connector_owner
1055
1056
@property
1057
def raise_for_status(
1058
self,
1059
) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
1060
"""Should `ClientResponse.raise_for_status()` be called for each response."""
1061
return self._raise_for_status
1062
1063
@property
1064
def auto_decompress(self) -> bool:
1065
"""Should the body response be automatically decompressed."""
1066
return self._auto_decompress
1067
1068
@property
1069
def trust_env(self) -> bool:
1070
"""
1071
Should proxies information from environment or netrc be trusted.
1072
1073
Information is from HTTP_PROXY / HTTPS_PROXY environment variables
1074
or ~/.netrc file if present.
1075
"""
1076
return self._trust_env
1077
1078
@property
1079
def trace_configs(self) -> List[TraceConfig]:
1080
"""A list of TraceConfig instances used for client tracing"""
1081
return self._trace_configs
1082
1083
def detach(self) -> None:
1084
"""Detach connector from session without closing the former.
1085
1086
Session is switched to closed state anyway.
1087
"""
1088
self._connector = None
1089
1090
def __enter__(self) -> None:
1091
raise TypeError("Use async with instead")
1092
1093
def __exit__(
1094
self,
1095
exc_type: Optional[Type[BaseException]],
1096
exc_val: Optional[BaseException],
1097
exc_tb: Optional[TracebackType],
1098
) -> None:
1099
# __exit__ should exist in pair with __enter__ but never executed
1100
pass # pragma: no cover
1101
1102
async def __aenter__(self) -> "ClientSession":
1103
return self
1104
1105
async def __aexit__(
1106
self,
1107
exc_type: Optional[Type[BaseException]],
1108
exc_val: Optional[BaseException],
1109
exc_tb: Optional[TracebackType],
1110
) -> None:
1111
await self.close()
1112
1113
1114
class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
1115
1116
__slots__ = ("_coro", "_resp")
1117
1118
def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
1119
self._coro = coro
1120
1121
def send(self, arg: None) -> "asyncio.Future[Any]":
1122
return self._coro.send(arg)
1123
1124
def throw(self, arg: BaseException) -> None: # type: ignore[arg-type,override]
1125
self._coro.throw(arg)
1126
1127
def close(self) -> None:
1128
return self._coro.close()
1129
1130
def __await__(self) -> Generator[Any, None, _RetType]:
1131
ret = self._coro.__await__()
1132
return ret
1133
1134
def __iter__(self) -> Generator[Any, None, _RetType]:
1135
return self.__await__()
1136
1137
async def __aenter__(self) -> _RetType:
1138
self._resp = await self._coro
1139
return self._resp
1140
1141
1142
class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
1143
__slots__ = ()
1144
1145
async def __aexit__(
1146
self,
1147
exc_type: Optional[Type[BaseException]],
1148
exc: Optional[BaseException],
1149
tb: Optional[TracebackType],
1150
) -> None:
1151
# We're basing behavior on the exception as it can be caused by
1152
# user code unrelated to the status of the connection. If you
1153
# would like to close a connection you must do that
1154
# explicitly. Otherwise connection error handling should kick in
1155
# and close/recycle the connection as required.
1156
self._resp.release()
1157
1158
1159
class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
1160
__slots__ = ()
1161
1162
async def __aexit__(
1163
self,
1164
exc_type: Optional[Type[BaseException]],
1165
exc: Optional[BaseException],
1166
tb: Optional[TracebackType],
1167
) -> None:
1168
await self._resp.close()
1169
1170
1171
class _SessionRequestContextManager:
1172
1173
__slots__ = ("_coro", "_resp", "_session")
1174
1175
def __init__(
1176
self,
1177
coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
1178
session: ClientSession,
1179
) -> None:
1180
self._coro = coro
1181
self._resp = None # type: Optional[ClientResponse]
1182
self._session = session
1183
1184
async def __aenter__(self) -> ClientResponse:
1185
try:
1186
self._resp = await self._coro
1187
except BaseException:
1188
await self._session.close()
1189
raise
1190
else:
1191
return self._resp
1192
1193
async def __aexit__(
1194
self,
1195
exc_type: Optional[Type[BaseException]],
1196
exc: Optional[BaseException],
1197
tb: Optional[TracebackType],
1198
) -> None:
1199
assert self._resp is not None
1200
self._resp.close()
1201
await self._session.close()
1202
1203
1204
def request(
1205
method: str,
1206
url: StrOrURL,
1207
*,
1208
params: Optional[Mapping[str, str]] = None,
1209
data: Any = None,
1210
json: Any = None,
1211
headers: Optional[LooseHeaders] = None,
1212
skip_auto_headers: Optional[Iterable[str]] = None,
1213
auth: Optional[BasicAuth] = None,
1214
allow_redirects: bool = True,
1215
max_redirects: int = 10,
1216
compress: Optional[str] = None,
1217
chunked: Optional[bool] = None,
1218
expect100: bool = False,
1219
raise_for_status: Optional[bool] = None,
1220
read_until_eof: bool = True,
1221
proxy: Optional[StrOrURL] = None,
1222
proxy_auth: Optional[BasicAuth] = None,
1223
timeout: Union[ClientTimeout, object] = sentinel,
1224
cookies: Optional[LooseCookies] = None,
1225
version: HttpVersion = http.HttpVersion11,
1226
connector: Optional[BaseConnector] = None,
1227
read_bufsize: Optional[int] = None,
1228
loop: Optional[asyncio.AbstractEventLoop] = None,
1229
) -> _SessionRequestContextManager:
1230
"""Constructs and sends a request.
1231
1232
Returns response object.
1233
method - HTTP method
1234
url - request url
1235
params - (optional) Dictionary or bytes to be sent in the query
1236
string of the new request
1237
data - (optional) Dictionary, bytes, or file-like object to
1238
send in the body of the request
1239
json - (optional) Any json compatible python object
1240
headers - (optional) Dictionary of HTTP Headers to send with
1241
the request
1242
cookies - (optional) Dict object to send with the request
1243
auth - (optional) BasicAuth named tuple represent HTTP Basic Auth
1244
auth - aiohttp.helpers.BasicAuth
1245
allow_redirects - (optional) If set to False, do not follow
1246
redirects
1247
version - Request HTTP version.
1248
compress - Set to True if request has to be compressed
1249
with deflate encoding.
1250
chunked - Set to chunk size for chunked transfer encoding.
1251
expect100 - Expect 100-continue response from server.
1252
connector - BaseConnector sub-class instance to support
1253
connection pooling.
1254
read_until_eof - Read response until eof if response
1255
does not have Content-Length header.
1256
loop - Optional event loop.
1257
timeout - Optional ClientTimeout settings structure, 5min
1258
total timeout by default.
1259
Usage::
1260
>>> import aiohttp
1261
>>> resp = await aiohttp.request('GET', 'http://python.org/')
1262
>>> resp
1263
<ClientResponse(python.org/) [200]>
1264
>>> data = await resp.read()
1265
"""
1266
connector_owner = False
1267
if connector is None:
1268
connector_owner = True
1269
connector = TCPConnector(loop=loop, force_close=True)
1270
1271
session = ClientSession(
1272
loop=loop,
1273
cookies=cookies,
1274
version=version,
1275
timeout=timeout,
1276
connector=connector,
1277
connector_owner=connector_owner,
1278
)
1279
1280
return _SessionRequestContextManager(
1281
session._request(
1282
method,
1283
url,
1284
params=params,
1285
data=data,
1286
json=json,
1287
headers=headers,
1288
skip_auto_headers=skip_auto_headers,
1289
auth=auth,
1290
allow_redirects=allow_redirects,
1291
max_redirects=max_redirects,
1292
compress=compress,
1293
chunked=chunked,
1294
expect100=expect100,
1295
raise_for_status=raise_for_status,
1296
read_until_eof=read_until_eof,
1297
proxy=proxy,
1298
proxy_auth=proxy_auth,
1299
read_bufsize=read_bufsize,
1300
),
1301
session,
1302
)
1303
1304