Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/connector.py
7763 views
1
import asyncio
2
import functools
3
import random
4
import sys
5
import traceback
6
import warnings
7
from collections import defaultdict, deque
8
from contextlib import suppress
9
from http.cookies import SimpleCookie
10
from itertools import cycle, islice
11
from time import monotonic
12
from types import TracebackType
13
from typing import (
14
TYPE_CHECKING,
15
Any,
16
Awaitable,
17
Callable,
18
DefaultDict,
19
Dict,
20
Iterator,
21
List,
22
Optional,
23
Set,
24
Tuple,
25
Type,
26
Union,
27
cast,
28
)
29
30
import attr
31
32
from . import hdrs, helpers
33
from .abc import AbstractResolver
34
from .client_exceptions import (
35
ClientConnectionError,
36
ClientConnectorCertificateError,
37
ClientConnectorError,
38
ClientConnectorSSLError,
39
ClientHttpProxyError,
40
ClientProxyConnectionError,
41
ServerFingerprintMismatch,
42
UnixClientConnectorError,
43
cert_errors,
44
ssl_errors,
45
)
46
from .client_proto import ResponseHandler
47
from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
48
from .helpers import (
49
PY_36,
50
ceil_timeout,
51
get_running_loop,
52
is_ip_address,
53
noop,
54
sentinel,
55
)
56
from .http import RESPONSES
57
from .locks import EventResultOrError
58
from .resolver import DefaultResolver
59
60
try:
61
import ssl
62
63
SSLContext = ssl.SSLContext
64
except ImportError: # pragma: no cover
65
ssl = None # type: ignore[assignment]
66
SSLContext = object # type: ignore[misc,assignment]
67
68
69
__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
70
71
72
if TYPE_CHECKING: # pragma: no cover
73
from .client import ClientTimeout
74
from .client_reqrep import ConnectionKey
75
from .tracing import Trace
76
77
78
class _DeprecationWaiter:
79
__slots__ = ("_awaitable", "_awaited")
80
81
def __init__(self, awaitable: Awaitable[Any]) -> None:
82
self._awaitable = awaitable
83
self._awaited = False
84
85
def __await__(self) -> Any:
86
self._awaited = True
87
return self._awaitable.__await__()
88
89
def __del__(self) -> None:
90
if not self._awaited:
91
warnings.warn(
92
"Connector.close() is a coroutine, "
93
"please use await connector.close()",
94
DeprecationWarning,
95
)
96
97
98
class Connection:
99
100
_source_traceback = None
101
_transport = None
102
103
def __init__(
104
self,
105
connector: "BaseConnector",
106
key: "ConnectionKey",
107
protocol: ResponseHandler,
108
loop: asyncio.AbstractEventLoop,
109
) -> None:
110
self._key = key
111
self._connector = connector
112
self._loop = loop
113
self._protocol = protocol # type: Optional[ResponseHandler]
114
self._callbacks = [] # type: List[Callable[[], None]]
115
116
if loop.get_debug():
117
self._source_traceback = traceback.extract_stack(sys._getframe(1))
118
119
def __repr__(self) -> str:
120
return f"Connection<{self._key}>"
121
122
def __del__(self, _warnings: Any = warnings) -> None:
123
if self._protocol is not None:
124
if PY_36:
125
kwargs = {"source": self}
126
else:
127
kwargs = {}
128
_warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
129
if self._loop.is_closed():
130
return
131
132
self._connector._release(self._key, self._protocol, should_close=True)
133
134
context = {"client_connection": self, "message": "Unclosed connection"}
135
if self._source_traceback is not None:
136
context["source_traceback"] = self._source_traceback
137
self._loop.call_exception_handler(context)
138
139
@property
140
def loop(self) -> asyncio.AbstractEventLoop:
141
warnings.warn(
142
"connector.loop property is deprecated", DeprecationWarning, stacklevel=2
143
)
144
return self._loop
145
146
@property
147
def transport(self) -> Optional[asyncio.Transport]:
148
if self._protocol is None:
149
return None
150
return self._protocol.transport
151
152
@property
153
def protocol(self) -> Optional[ResponseHandler]:
154
return self._protocol
155
156
def add_callback(self, callback: Callable[[], None]) -> None:
157
if callback is not None:
158
self._callbacks.append(callback)
159
160
def _notify_release(self) -> None:
161
callbacks, self._callbacks = self._callbacks[:], []
162
163
for cb in callbacks:
164
with suppress(Exception):
165
cb()
166
167
def close(self) -> None:
168
self._notify_release()
169
170
if self._protocol is not None:
171
self._connector._release(self._key, self._protocol, should_close=True)
172
self._protocol = None
173
174
def release(self) -> None:
175
self._notify_release()
176
177
if self._protocol is not None:
178
self._connector._release(
179
self._key, self._protocol, should_close=self._protocol.should_close
180
)
181
self._protocol = None
182
183
@property
184
def closed(self) -> bool:
185
return self._protocol is None or not self._protocol.is_connected()
186
187
188
class _TransportPlaceholder:
189
"""placeholder for BaseConnector.connect function"""
190
191
def close(self) -> None:
192
pass
193
194
195
class BaseConnector:
196
"""Base connector class.
197
198
keepalive_timeout - (optional) Keep-alive timeout.
199
force_close - Set to True to force close and do reconnect
200
after each request (and between redirects).
201
limit - The total number of simultaneous connections.
202
limit_per_host - Number of simultaneous connections to one host.
203
enable_cleanup_closed - Enables clean-up closed ssl transports.
204
Disabled by default.
205
loop - Optional event loop.
206
"""
207
208
_closed = True # prevent AttributeError in __del__ if ctor was failed
209
_source_traceback = None
210
211
# abort transport after 2 seconds (cleanup broken connections)
212
_cleanup_closed_period = 2.0
213
214
def __init__(
215
self,
216
*,
217
keepalive_timeout: Union[object, None, float] = sentinel,
218
force_close: bool = False,
219
limit: int = 100,
220
limit_per_host: int = 0,
221
enable_cleanup_closed: bool = False,
222
loop: Optional[asyncio.AbstractEventLoop] = None,
223
) -> None:
224
225
if force_close:
226
if keepalive_timeout is not None and keepalive_timeout is not sentinel:
227
raise ValueError(
228
"keepalive_timeout cannot " "be set if force_close is True"
229
)
230
else:
231
if keepalive_timeout is sentinel:
232
keepalive_timeout = 15.0
233
234
loop = get_running_loop(loop)
235
236
self._closed = False
237
if loop.get_debug():
238
self._source_traceback = traceback.extract_stack(sys._getframe(1))
239
240
self._conns = (
241
{}
242
) # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]]
243
self._limit = limit
244
self._limit_per_host = limit_per_host
245
self._acquired = set() # type: Set[ResponseHandler]
246
self._acquired_per_host = defaultdict(
247
set
248
) # type: DefaultDict[ConnectionKey, Set[ResponseHandler]]
249
self._keepalive_timeout = cast(float, keepalive_timeout)
250
self._force_close = force_close
251
252
# {host_key: FIFO list of waiters}
253
self._waiters = defaultdict(deque) # type: ignore[var-annotated]
254
255
self._loop = loop
256
self._factory = functools.partial(ResponseHandler, loop=loop)
257
258
self.cookies = SimpleCookie() # type: SimpleCookie[str]
259
260
# start keep-alive connection cleanup task
261
self._cleanup_handle: Optional[asyncio.TimerHandle] = None
262
263
# start cleanup closed transports task
264
self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
265
self._cleanup_closed_disabled = not enable_cleanup_closed
266
self._cleanup_closed_transports = [] # type: List[Optional[asyncio.Transport]]
267
self._cleanup_closed()
268
269
def __del__(self, _warnings: Any = warnings) -> None:
270
if self._closed:
271
return
272
if not self._conns:
273
return
274
275
conns = [repr(c) for c in self._conns.values()]
276
277
self._close()
278
279
if PY_36:
280
kwargs = {"source": self}
281
else:
282
kwargs = {}
283
_warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
284
context = {
285
"connector": self,
286
"connections": conns,
287
"message": "Unclosed connector",
288
}
289
if self._source_traceback is not None:
290
context["source_traceback"] = self._source_traceback
291
self._loop.call_exception_handler(context)
292
293
def __enter__(self) -> "BaseConnector":
294
warnings.warn(
295
'"witn Connector():" is deprecated, '
296
'use "async with Connector():" instead',
297
DeprecationWarning,
298
)
299
return self
300
301
def __exit__(self, *exc: Any) -> None:
302
self.close()
303
304
async def __aenter__(self) -> "BaseConnector":
305
return self
306
307
async def __aexit__(
308
self,
309
exc_type: Optional[Type[BaseException]] = None,
310
exc_value: Optional[BaseException] = None,
311
exc_traceback: Optional[TracebackType] = None,
312
) -> None:
313
await self.close()
314
315
@property
316
def force_close(self) -> bool:
317
"""Ultimately close connection on releasing if True."""
318
return self._force_close
319
320
@property
321
def limit(self) -> int:
322
"""The total number for simultaneous connections.
323
324
If limit is 0 the connector has no limit.
325
The default limit size is 100.
326
"""
327
return self._limit
328
329
@property
330
def limit_per_host(self) -> int:
331
"""The limit for simultaneous connections to the same endpoint.
332
333
Endpoints are the same if they are have equal
334
(host, port, is_ssl) triple.
335
"""
336
return self._limit_per_host
337
338
def _cleanup(self) -> None:
339
"""Cleanup unused transports."""
340
if self._cleanup_handle:
341
self._cleanup_handle.cancel()
342
# _cleanup_handle should be unset, otherwise _release() will not
343
# recreate it ever!
344
self._cleanup_handle = None
345
346
now = self._loop.time()
347
timeout = self._keepalive_timeout
348
349
if self._conns:
350
connections = {}
351
deadline = now - timeout
352
for key, conns in self._conns.items():
353
alive = []
354
for proto, use_time in conns:
355
if proto.is_connected():
356
if use_time - deadline < 0:
357
transport = proto.transport
358
proto.close()
359
if key.is_ssl and not self._cleanup_closed_disabled:
360
self._cleanup_closed_transports.append(transport)
361
else:
362
alive.append((proto, use_time))
363
else:
364
transport = proto.transport
365
proto.close()
366
if key.is_ssl and not self._cleanup_closed_disabled:
367
self._cleanup_closed_transports.append(transport)
368
369
if alive:
370
connections[key] = alive
371
372
self._conns = connections
373
374
if self._conns:
375
self._cleanup_handle = helpers.weakref_handle(
376
self, "_cleanup", timeout, self._loop
377
)
378
379
def _drop_acquired_per_host(
380
self, key: "ConnectionKey", val: ResponseHandler
381
) -> None:
382
acquired_per_host = self._acquired_per_host
383
if key not in acquired_per_host:
384
return
385
conns = acquired_per_host[key]
386
conns.remove(val)
387
if not conns:
388
del self._acquired_per_host[key]
389
390
def _cleanup_closed(self) -> None:
391
"""Double confirmation for transport close.
392
393
Some broken ssl servers may leave socket open without proper close.
394
"""
395
if self._cleanup_closed_handle:
396
self._cleanup_closed_handle.cancel()
397
398
for transport in self._cleanup_closed_transports:
399
if transport is not None:
400
transport.abort()
401
402
self._cleanup_closed_transports = []
403
404
if not self._cleanup_closed_disabled:
405
self._cleanup_closed_handle = helpers.weakref_handle(
406
self, "_cleanup_closed", self._cleanup_closed_period, self._loop
407
)
408
409
def close(self) -> Awaitable[None]:
410
"""Close all opened transports."""
411
self._close()
412
return _DeprecationWaiter(noop())
413
414
def _close(self) -> None:
415
if self._closed:
416
return
417
418
self._closed = True
419
420
try:
421
if self._loop.is_closed():
422
return
423
424
# cancel cleanup task
425
if self._cleanup_handle:
426
self._cleanup_handle.cancel()
427
428
# cancel cleanup close task
429
if self._cleanup_closed_handle:
430
self._cleanup_closed_handle.cancel()
431
432
for data in self._conns.values():
433
for proto, t0 in data:
434
proto.close()
435
436
for proto in self._acquired:
437
proto.close()
438
439
for transport in self._cleanup_closed_transports:
440
if transport is not None:
441
transport.abort()
442
443
finally:
444
self._conns.clear()
445
self._acquired.clear()
446
self._waiters.clear()
447
self._cleanup_handle = None
448
self._cleanup_closed_transports.clear()
449
self._cleanup_closed_handle = None
450
451
@property
452
def closed(self) -> bool:
453
"""Is connector closed.
454
455
A readonly property.
456
"""
457
return self._closed
458
459
def _available_connections(self, key: "ConnectionKey") -> int:
460
"""
461
Return number of available connections.
462
463
The limit, limit_per_host and the connection key are taken into account.
464
465
If it returns less than 1 means that there are no connections
466
available.
467
"""
468
if self._limit:
469
# total calc available connections
470
available = self._limit - len(self._acquired)
471
472
# check limit per host
473
if (
474
self._limit_per_host
475
and available > 0
476
and key in self._acquired_per_host
477
):
478
acquired = self._acquired_per_host.get(key)
479
assert acquired is not None
480
available = self._limit_per_host - len(acquired)
481
482
elif self._limit_per_host and key in self._acquired_per_host:
483
# check limit per host
484
acquired = self._acquired_per_host.get(key)
485
assert acquired is not None
486
available = self._limit_per_host - len(acquired)
487
else:
488
available = 1
489
490
return available
491
492
async def connect(
493
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
494
) -> Connection:
495
"""Get from pool or create new connection."""
496
key = req.connection_key
497
available = self._available_connections(key)
498
499
# Wait if there are no available connections or if there are/were
500
# waiters (i.e. don't steal connection from a waiter about to wake up)
501
if available <= 0 or key in self._waiters:
502
fut = self._loop.create_future()
503
504
# This connection will now count towards the limit.
505
self._waiters[key].append(fut)
506
507
if traces:
508
for trace in traces:
509
await trace.send_connection_queued_start()
510
511
try:
512
await fut
513
except BaseException as e:
514
if key in self._waiters:
515
# remove a waiter even if it was cancelled, normally it's
516
# removed when it's notified
517
try:
518
self._waiters[key].remove(fut)
519
except ValueError: # fut may no longer be in list
520
pass
521
522
raise e
523
finally:
524
if key in self._waiters and not self._waiters[key]:
525
del self._waiters[key]
526
527
if traces:
528
for trace in traces:
529
await trace.send_connection_queued_end()
530
531
proto = self._get(key)
532
if proto is None:
533
placeholder = cast(ResponseHandler, _TransportPlaceholder())
534
self._acquired.add(placeholder)
535
self._acquired_per_host[key].add(placeholder)
536
537
if traces:
538
for trace in traces:
539
await trace.send_connection_create_start()
540
541
try:
542
proto = await self._create_connection(req, traces, timeout)
543
if self._closed:
544
proto.close()
545
raise ClientConnectionError("Connector is closed.")
546
except BaseException:
547
if not self._closed:
548
self._acquired.remove(placeholder)
549
self._drop_acquired_per_host(key, placeholder)
550
self._release_waiter()
551
raise
552
else:
553
if not self._closed:
554
self._acquired.remove(placeholder)
555
self._drop_acquired_per_host(key, placeholder)
556
557
if traces:
558
for trace in traces:
559
await trace.send_connection_create_end()
560
else:
561
if traces:
562
# Acquire the connection to prevent race conditions with limits
563
placeholder = cast(ResponseHandler, _TransportPlaceholder())
564
self._acquired.add(placeholder)
565
self._acquired_per_host[key].add(placeholder)
566
for trace in traces:
567
await trace.send_connection_reuseconn()
568
self._acquired.remove(placeholder)
569
self._drop_acquired_per_host(key, placeholder)
570
571
self._acquired.add(proto)
572
self._acquired_per_host[key].add(proto)
573
return Connection(self, key, proto, self._loop)
574
575
def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
576
try:
577
conns = self._conns[key]
578
except KeyError:
579
return None
580
581
t1 = self._loop.time()
582
while conns:
583
proto, t0 = conns.pop()
584
if proto.is_connected():
585
if t1 - t0 > self._keepalive_timeout:
586
transport = proto.transport
587
proto.close()
588
# only for SSL transports
589
if key.is_ssl and not self._cleanup_closed_disabled:
590
self._cleanup_closed_transports.append(transport)
591
else:
592
if not conns:
593
# The very last connection was reclaimed: drop the key
594
del self._conns[key]
595
return proto
596
else:
597
transport = proto.transport
598
proto.close()
599
if key.is_ssl and not self._cleanup_closed_disabled:
600
self._cleanup_closed_transports.append(transport)
601
602
# No more connections: drop the key
603
del self._conns[key]
604
return None
605
606
def _release_waiter(self) -> None:
607
"""
608
Iterates over all waiters until one to be released is found.
609
610
The one to be released is not finsihed and
611
belongs to a host that has available connections.
612
"""
613
if not self._waiters:
614
return
615
616
# Having the dict keys ordered this avoids to iterate
617
# at the same order at each call.
618
queues = list(self._waiters.keys())
619
random.shuffle(queues)
620
621
for key in queues:
622
if self._available_connections(key) < 1:
623
continue
624
625
waiters = self._waiters[key]
626
while waiters:
627
waiter = waiters.popleft()
628
if not waiter.done():
629
waiter.set_result(None)
630
return
631
632
def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
633
if self._closed:
634
# acquired connection is already released on connector closing
635
return
636
637
try:
638
self._acquired.remove(proto)
639
self._drop_acquired_per_host(key, proto)
640
except KeyError: # pragma: no cover
641
# this may be result of undetermenistic order of objects
642
# finalization due garbage collection.
643
pass
644
else:
645
self._release_waiter()
646
647
def _release(
648
self,
649
key: "ConnectionKey",
650
protocol: ResponseHandler,
651
*,
652
should_close: bool = False,
653
) -> None:
654
if self._closed:
655
# acquired connection is already released on connector closing
656
return
657
658
self._release_acquired(key, protocol)
659
660
if self._force_close:
661
should_close = True
662
663
if should_close or protocol.should_close:
664
transport = protocol.transport
665
protocol.close()
666
667
if key.is_ssl and not self._cleanup_closed_disabled:
668
self._cleanup_closed_transports.append(transport)
669
else:
670
conns = self._conns.get(key)
671
if conns is None:
672
conns = self._conns[key] = []
673
conns.append((protocol, self._loop.time()))
674
675
if self._cleanup_handle is None:
676
self._cleanup_handle = helpers.weakref_handle(
677
self, "_cleanup", self._keepalive_timeout, self._loop
678
)
679
680
async def _create_connection(
681
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
682
) -> ResponseHandler:
683
raise NotImplementedError()
684
685
686
class _DNSCacheTable:
687
def __init__(self, ttl: Optional[float] = None) -> None:
688
self._addrs_rr = (
689
{}
690
) # type: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]]
691
self._timestamps = {} # type: Dict[Tuple[str, int], float]
692
self._ttl = ttl
693
694
def __contains__(self, host: object) -> bool:
695
return host in self._addrs_rr
696
697
def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
698
self._addrs_rr[key] = (cycle(addrs), len(addrs))
699
700
if self._ttl:
701
self._timestamps[key] = monotonic()
702
703
def remove(self, key: Tuple[str, int]) -> None:
704
self._addrs_rr.pop(key, None)
705
706
if self._ttl:
707
self._timestamps.pop(key, None)
708
709
def clear(self) -> None:
710
self._addrs_rr.clear()
711
self._timestamps.clear()
712
713
def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
714
loop, length = self._addrs_rr[key]
715
addrs = list(islice(loop, length))
716
# Consume one more element to shift internal state of `cycle`
717
next(loop)
718
return addrs
719
720
def expired(self, key: Tuple[str, int]) -> bool:
721
if self._ttl is None:
722
return False
723
724
return self._timestamps[key] + self._ttl < monotonic()
725
726
727
class TCPConnector(BaseConnector):
728
"""TCP connector.
729
730
verify_ssl - Set to True to check ssl certifications.
731
fingerprint - Pass the binary sha256
732
digest of the expected certificate in DER format to verify
733
that the certificate the server presents matches. See also
734
https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
735
resolver - Enable DNS lookups and use this
736
resolver
737
use_dns_cache - Use memory cache for DNS lookups.
738
ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
739
family - socket address family
740
local_addr - local tuple of (host, port) to bind socket to
741
742
keepalive_timeout - (optional) Keep-alive timeout.
743
force_close - Set to True to force close and do reconnect
744
after each request (and between redirects).
745
limit - The total number of simultaneous connections.
746
limit_per_host - Number of simultaneous connections to one host.
747
enable_cleanup_closed - Enables clean-up closed ssl transports.
748
Disabled by default.
749
loop - Optional event loop.
750
"""
751
752
def __init__(
753
self,
754
*,
755
verify_ssl: bool = True,
756
fingerprint: Optional[bytes] = None,
757
use_dns_cache: bool = True,
758
ttl_dns_cache: Optional[int] = 10,
759
family: int = 0,
760
ssl_context: Optional[SSLContext] = None,
761
ssl: Union[None, bool, Fingerprint, SSLContext] = None,
762
local_addr: Optional[Tuple[str, int]] = None,
763
resolver: Optional[AbstractResolver] = None,
764
keepalive_timeout: Union[None, float, object] = sentinel,
765
force_close: bool = False,
766
limit: int = 100,
767
limit_per_host: int = 0,
768
enable_cleanup_closed: bool = False,
769
loop: Optional[asyncio.AbstractEventLoop] = None,
770
):
771
super().__init__(
772
keepalive_timeout=keepalive_timeout,
773
force_close=force_close,
774
limit=limit,
775
limit_per_host=limit_per_host,
776
enable_cleanup_closed=enable_cleanup_closed,
777
loop=loop,
778
)
779
780
self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
781
if resolver is None:
782
resolver = DefaultResolver(loop=self._loop)
783
self._resolver = resolver
784
785
self._use_dns_cache = use_dns_cache
786
self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
787
self._throttle_dns_events = (
788
{}
789
) # type: Dict[Tuple[str, int], EventResultOrError]
790
self._family = family
791
self._local_addr = local_addr
792
793
def close(self) -> Awaitable[None]:
794
"""Close all ongoing DNS calls."""
795
for ev in self._throttle_dns_events.values():
796
ev.cancel()
797
798
return super().close()
799
800
@property
801
def family(self) -> int:
802
"""Socket family like AF_INET."""
803
return self._family
804
805
@property
806
def use_dns_cache(self) -> bool:
807
"""True if local DNS caching is enabled."""
808
return self._use_dns_cache
809
810
def clear_dns_cache(
811
self, host: Optional[str] = None, port: Optional[int] = None
812
) -> None:
813
"""Remove specified host/port or clear all dns local cache."""
814
if host is not None and port is not None:
815
self._cached_hosts.remove((host, port))
816
elif host is not None or port is not None:
817
raise ValueError("either both host and port " "or none of them are allowed")
818
else:
819
self._cached_hosts.clear()
820
821
async def _resolve_host(
822
self, host: str, port: int, traces: Optional[List["Trace"]] = None
823
) -> List[Dict[str, Any]]:
824
if is_ip_address(host):
825
return [
826
{
827
"hostname": host,
828
"host": host,
829
"port": port,
830
"family": self._family,
831
"proto": 0,
832
"flags": 0,
833
}
834
]
835
836
if not self._use_dns_cache:
837
838
if traces:
839
for trace in traces:
840
await trace.send_dns_resolvehost_start(host)
841
842
res = await self._resolver.resolve(host, port, family=self._family)
843
844
if traces:
845
for trace in traces:
846
await trace.send_dns_resolvehost_end(host)
847
848
return res
849
850
key = (host, port)
851
852
if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
853
# get result early, before any await (#4014)
854
result = self._cached_hosts.next_addrs(key)
855
856
if traces:
857
for trace in traces:
858
await trace.send_dns_cache_hit(host)
859
return result
860
861
if key in self._throttle_dns_events:
862
# get event early, before any await (#4014)
863
event = self._throttle_dns_events[key]
864
if traces:
865
for trace in traces:
866
await trace.send_dns_cache_hit(host)
867
await event.wait()
868
else:
869
# update dict early, before any await (#4014)
870
self._throttle_dns_events[key] = EventResultOrError(self._loop)
871
if traces:
872
for trace in traces:
873
await trace.send_dns_cache_miss(host)
874
try:
875
876
if traces:
877
for trace in traces:
878
await trace.send_dns_resolvehost_start(host)
879
880
addrs = await self._resolver.resolve(host, port, family=self._family)
881
if traces:
882
for trace in traces:
883
await trace.send_dns_resolvehost_end(host)
884
885
self._cached_hosts.add(key, addrs)
886
self._throttle_dns_events[key].set()
887
except BaseException as e:
888
# any DNS exception, independently of the implementation
889
# is set for the waiters to raise the same exception.
890
self._throttle_dns_events[key].set(exc=e)
891
raise
892
finally:
893
self._throttle_dns_events.pop(key)
894
895
return self._cached_hosts.next_addrs(key)
896
897
async def _create_connection(
898
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
899
) -> ResponseHandler:
900
"""Create connection.
901
902
Has same keyword arguments as BaseEventLoop.create_connection.
903
"""
904
if req.proxy:
905
_, proto = await self._create_proxy_connection(req, traces, timeout)
906
else:
907
_, proto = await self._create_direct_connection(req, traces, timeout)
908
909
return proto
910
911
@staticmethod
912
@functools.lru_cache(None)
913
def _make_ssl_context(verified: bool) -> SSLContext:
914
if verified:
915
return ssl.create_default_context()
916
else:
917
sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
918
sslcontext.options |= ssl.OP_NO_SSLv2
919
sslcontext.options |= ssl.OP_NO_SSLv3
920
sslcontext.check_hostname = False
921
sslcontext.verify_mode = ssl.CERT_NONE
922
try:
923
sslcontext.options |= ssl.OP_NO_COMPRESSION
924
except AttributeError as attr_err:
925
warnings.warn(
926
"{!s}: The Python interpreter is compiled "
927
"against OpenSSL < 1.0.0. Ref: "
928
"https://docs.python.org/3/library/ssl.html"
929
"#ssl.OP_NO_COMPRESSION".format(attr_err),
930
)
931
sslcontext.set_default_verify_paths()
932
return sslcontext
933
934
def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]:
935
"""Logic to get the correct SSL context
936
937
0. if req.ssl is false, return None
938
939
1. if ssl_context is specified in req, use it
940
2. if _ssl_context is specified in self, use it
941
3. otherwise:
942
1. if verify_ssl is not specified in req, use self.ssl_context
943
(will generate a default context according to self.verify_ssl)
944
2. if verify_ssl is True in req, generate a default SSL context
945
3. if verify_ssl is False in req, generate a SSL context that
946
won't verify
947
"""
948
if req.is_ssl():
949
if ssl is None: # pragma: no cover
950
raise RuntimeError("SSL is not supported.")
951
sslcontext = req.ssl
952
if isinstance(sslcontext, ssl.SSLContext):
953
return sslcontext
954
if sslcontext is not None:
955
# not verified or fingerprinted
956
return self._make_ssl_context(False)
957
sslcontext = self._ssl
958
if isinstance(sslcontext, ssl.SSLContext):
959
return sslcontext
960
if sslcontext is not None:
961
# not verified or fingerprinted
962
return self._make_ssl_context(False)
963
return self._make_ssl_context(True)
964
else:
965
return None
966
967
def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:
968
ret = req.ssl
969
if isinstance(ret, Fingerprint):
970
return ret
971
ret = self._ssl
972
if isinstance(ret, Fingerprint):
973
return ret
974
return None
975
976
async def _wrap_create_connection(
977
self,
978
*args: Any,
979
req: "ClientRequest",
980
timeout: "ClientTimeout",
981
client_error: Type[Exception] = ClientConnectorError,
982
**kwargs: Any,
983
) -> Tuple[asyncio.Transport, ResponseHandler]:
984
try:
985
async with ceil_timeout(timeout.sock_connect):
986
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
987
except cert_errors as exc:
988
raise ClientConnectorCertificateError(req.connection_key, exc) from exc
989
except ssl_errors as exc:
990
raise ClientConnectorSSLError(req.connection_key, exc) from exc
991
except OSError as exc:
992
raise client_error(req.connection_key, exc) from exc
993
994
def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
995
"""Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
996
997
One case is that :py:meth:`asyncio.loop.start_tls` is not yet
998
implemented under Python 3.6. It is necessary for TLS-in-TLS so
999
that it is possible to send HTTPS queries through HTTPS proxies.
1000
1001
This doesn't affect regular HTTP requests, though.
1002
"""
1003
if not req.is_ssl():
1004
return
1005
1006
proxy_url = req.proxy
1007
assert proxy_url is not None
1008
if proxy_url.scheme != "https":
1009
return
1010
1011
self._check_loop_for_start_tls()
1012
1013
def _check_loop_for_start_tls(self) -> None:
1014
try:
1015
self._loop.start_tls
1016
except AttributeError as attr_exc:
1017
raise RuntimeError(
1018
"An HTTPS request is being sent through an HTTPS proxy. "
1019
"This needs support for TLS in TLS but it is not implemented "
1020
"in your runtime for the stdlib asyncio.\n\n"
1021
"Please upgrade to Python 3.7 or higher. For more details, "
1022
"please see:\n"
1023
"* https://bugs.python.org/issue37179\n"
1024
"* https://github.com/python/cpython/pull/28073\n"
1025
"* https://docs.aiohttp.org/en/stable/"
1026
"client_advanced.html#proxy-support\n"
1027
"* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1028
) from attr_exc
1029
1030
def _loop_supports_start_tls(self) -> bool:
1031
try:
1032
self._check_loop_for_start_tls()
1033
except RuntimeError:
1034
return False
1035
else:
1036
return True
1037
1038
def _warn_about_tls_in_tls(
1039
self,
1040
underlying_transport: asyncio.Transport,
1041
req: "ClientRequest",
1042
) -> None:
1043
"""Issue a warning if the requested URL has HTTPS scheme."""
1044
if req.request_info.url.scheme != "https":
1045
return
1046
1047
asyncio_supports_tls_in_tls = getattr(
1048
underlying_transport,
1049
"_start_tls_compatible",
1050
False,
1051
)
1052
1053
if asyncio_supports_tls_in_tls:
1054
return
1055
1056
warnings.warn(
1057
"An HTTPS request is being sent through an HTTPS proxy. "
1058
"This support for TLS in TLS is known to be disabled "
1059
"in the stdlib asyncio. This is why you'll probably see "
1060
"an error in the log below.\n\n"
1061
"It is possible to enable it via monkeypatching under "
1062
"Python 3.7 or higher. For more details, see:\n"
1063
"* https://bugs.python.org/issue37179\n"
1064
"* https://github.com/python/cpython/pull/28073\n\n"
1065
"You can temporarily patch this as follows:\n"
1066
"* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1067
"* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1068
RuntimeWarning,
1069
source=self,
1070
# Why `4`? At least 3 of the calls in the stack originate
1071
# from the methods in this class.
1072
stacklevel=3,
1073
)
1074
1075
async def _start_tls_connection(
1076
self,
1077
underlying_transport: asyncio.Transport,
1078
req: "ClientRequest",
1079
timeout: "ClientTimeout",
1080
client_error: Type[Exception] = ClientConnectorError,
1081
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1082
"""Wrap the raw TCP transport with TLS."""
1083
tls_proto = self._factory() # Create a brand new proto for TLS
1084
1085
# Safety of the `cast()` call here is based on the fact that
1086
# internally `_get_ssl_context()` only returns `None` when
1087
# `req.is_ssl()` evaluates to `False` which is never gonna happen
1088
# in this code path. Of course, it's rather fragile
1089
# maintainability-wise but this is to be solved separately.
1090
sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))
1091
1092
try:
1093
async with ceil_timeout(timeout.sock_connect):
1094
try:
1095
tls_transport = await self._loop.start_tls(
1096
underlying_transport,
1097
tls_proto,
1098
sslcontext,
1099
server_hostname=req.host,
1100
ssl_handshake_timeout=timeout.total,
1101
)
1102
except BaseException:
1103
# We need to close the underlying transport since
1104
# `start_tls()` probably failed before it had a
1105
# chance to do this:
1106
underlying_transport.close()
1107
raise
1108
except cert_errors as exc:
1109
raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1110
except ssl_errors as exc:
1111
raise ClientConnectorSSLError(req.connection_key, exc) from exc
1112
except OSError as exc:
1113
raise client_error(req.connection_key, exc) from exc
1114
except TypeError as type_err:
1115
# Example cause looks like this:
1116
# TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1117
# object at 0x7f760615e460> is not supported by start_tls()
1118
1119
raise ClientConnectionError(
1120
"Cannot initialize a TLS-in-TLS connection to host "
1121
f"{req.host!s}:{req.port:d} through an underlying connection "
1122
f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1123
f"[{type_err!s}]"
1124
) from type_err
1125
else:
1126
tls_proto.connection_made(
1127
tls_transport
1128
) # Kick the state machine of the new TLS protocol
1129
1130
return tls_transport, tls_proto
1131
1132
async def _create_direct_connection(
1133
self,
1134
req: "ClientRequest",
1135
traces: List["Trace"],
1136
timeout: "ClientTimeout",
1137
*,
1138
client_error: Type[Exception] = ClientConnectorError,
1139
) -> Tuple[asyncio.Transport, ResponseHandler]:
1140
sslcontext = self._get_ssl_context(req)
1141
fingerprint = self._get_fingerprint(req)
1142
1143
host = req.url.raw_host
1144
assert host is not None
1145
port = req.port
1146
assert port is not None
1147
host_resolved = asyncio.ensure_future(
1148
self._resolve_host(host, port, traces=traces), loop=self._loop
1149
)
1150
try:
1151
# Cancelling this lookup should not cancel the underlying lookup
1152
# or else the cancel event will get broadcast to all the waiters
1153
# across all connections.
1154
hosts = await asyncio.shield(host_resolved)
1155
except asyncio.CancelledError:
1156
1157
def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
1158
with suppress(Exception, asyncio.CancelledError):
1159
fut.result()
1160
1161
host_resolved.add_done_callback(drop_exception)
1162
raise
1163
except OSError as exc:
1164
# in case of proxy it is not ClientProxyConnectionError
1165
# it is problem of resolving proxy ip itself
1166
raise ClientConnectorError(req.connection_key, exc) from exc
1167
1168
last_exc = None # type: Optional[Exception]
1169
1170
for hinfo in hosts:
1171
host = hinfo["host"]
1172
port = hinfo["port"]
1173
1174
try:
1175
transp, proto = await self._wrap_create_connection(
1176
self._factory,
1177
host,
1178
port,
1179
timeout=timeout,
1180
ssl=sslcontext,
1181
family=hinfo["family"],
1182
proto=hinfo["proto"],
1183
flags=hinfo["flags"],
1184
server_hostname=hinfo["hostname"] if sslcontext else None,
1185
local_addr=self._local_addr,
1186
req=req,
1187
client_error=client_error,
1188
)
1189
except ClientConnectorError as exc:
1190
last_exc = exc
1191
continue
1192
1193
if req.is_ssl() and fingerprint:
1194
try:
1195
fingerprint.check(transp)
1196
except ServerFingerprintMismatch as exc:
1197
transp.close()
1198
if not self._cleanup_closed_disabled:
1199
self._cleanup_closed_transports.append(transp)
1200
last_exc = exc
1201
continue
1202
1203
return transp, proto
1204
else:
1205
assert last_exc is not None
1206
raise last_exc
1207
1208
async def _create_proxy_connection(
1209
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1210
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1211
self._fail_on_no_start_tls(req)
1212
runtime_has_start_tls = self._loop_supports_start_tls()
1213
1214
headers = {} # type: Dict[str, str]
1215
if req.proxy_headers is not None:
1216
headers = req.proxy_headers # type: ignore[assignment]
1217
headers[hdrs.HOST] = req.headers[hdrs.HOST]
1218
1219
url = req.proxy
1220
assert url is not None
1221
proxy_req = ClientRequest(
1222
hdrs.METH_GET,
1223
url,
1224
headers=headers,
1225
auth=req.proxy_auth,
1226
loop=self._loop,
1227
ssl=req.ssl,
1228
)
1229
1230
# create connection to proxy server
1231
transport, proto = await self._create_direct_connection(
1232
proxy_req, [], timeout, client_error=ClientProxyConnectionError
1233
)
1234
1235
# Many HTTP proxies has buggy keepalive support. Let's not
1236
# reuse connection but close it after processing every
1237
# response.
1238
proto.force_close()
1239
1240
auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
1241
if auth is not None:
1242
if not req.is_ssl():
1243
req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1244
else:
1245
proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1246
1247
if req.is_ssl():
1248
if runtime_has_start_tls:
1249
self._warn_about_tls_in_tls(transport, req)
1250
1251
# For HTTPS requests over HTTP proxy
1252
# we must notify proxy to tunnel connection
1253
# so we send CONNECT command:
1254
# CONNECT www.python.org:443 HTTP/1.1
1255
# Host: www.python.org
1256
#
1257
# next we must do TLS handshake and so on
1258
# to do this we must wrap raw socket into secure one
1259
# asyncio handles this perfectly
1260
proxy_req.method = hdrs.METH_CONNECT
1261
proxy_req.url = req.url
1262
key = attr.evolve(
1263
req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
1264
)
1265
conn = Connection(self, key, proto, self._loop)
1266
proxy_resp = await proxy_req.send(conn)
1267
try:
1268
protocol = conn._protocol
1269
assert protocol is not None
1270
1271
# read_until_eof=True will ensure the connection isn't closed
1272
# once the response is received and processed allowing
1273
# START_TLS to work on the connection below.
1274
protocol.set_response_params(read_until_eof=runtime_has_start_tls)
1275
resp = await proxy_resp.start(conn)
1276
except BaseException:
1277
proxy_resp.close()
1278
conn.close()
1279
raise
1280
else:
1281
conn._protocol = None
1282
conn._transport = None
1283
try:
1284
if resp.status != 200:
1285
message = resp.reason
1286
if message is None:
1287
message = RESPONSES[resp.status][0]
1288
raise ClientHttpProxyError(
1289
proxy_resp.request_info,
1290
resp.history,
1291
status=resp.status,
1292
message=message,
1293
headers=resp.headers,
1294
)
1295
if not runtime_has_start_tls:
1296
rawsock = transport.get_extra_info("socket", default=None)
1297
if rawsock is None:
1298
raise RuntimeError(
1299
"Transport does not expose socket instance"
1300
)
1301
# Duplicate the socket, so now we can close proxy transport
1302
rawsock = rawsock.dup()
1303
except BaseException:
1304
# It shouldn't be closed in `finally` because it's fed to
1305
# `loop.start_tls()` and the docs say not to touch it after
1306
# passing there.
1307
transport.close()
1308
raise
1309
finally:
1310
if not runtime_has_start_tls:
1311
transport.close()
1312
1313
if not runtime_has_start_tls:
1314
# HTTP proxy with support for upgrade to HTTPS
1315
sslcontext = self._get_ssl_context(req)
1316
return await self._wrap_create_connection(
1317
self._factory,
1318
timeout=timeout,
1319
ssl=sslcontext,
1320
sock=rawsock,
1321
server_hostname=req.host,
1322
req=req,
1323
)
1324
1325
return await self._start_tls_connection(
1326
# Access the old transport for the last time before it's
1327
# closed and forgotten forever:
1328
transport,
1329
req=req,
1330
timeout=timeout,
1331
)
1332
finally:
1333
proxy_resp.close()
1334
1335
return transport, proto
1336
1337
1338
class UnixConnector(BaseConnector):
1339
"""Unix socket connector.
1340
1341
path - Unix socket path.
1342
keepalive_timeout - (optional) Keep-alive timeout.
1343
force_close - Set to True to force close and do reconnect
1344
after each request (and between redirects).
1345
limit - The total number of simultaneous connections.
1346
limit_per_host - Number of simultaneous connections to one host.
1347
loop - Optional event loop.
1348
"""
1349
1350
def __init__(
1351
self,
1352
path: str,
1353
force_close: bool = False,
1354
keepalive_timeout: Union[object, float, None] = sentinel,
1355
limit: int = 100,
1356
limit_per_host: int = 0,
1357
loop: Optional[asyncio.AbstractEventLoop] = None,
1358
) -> None:
1359
super().__init__(
1360
force_close=force_close,
1361
keepalive_timeout=keepalive_timeout,
1362
limit=limit,
1363
limit_per_host=limit_per_host,
1364
loop=loop,
1365
)
1366
self._path = path
1367
1368
@property
1369
def path(self) -> str:
1370
"""Path to unix socket."""
1371
return self._path
1372
1373
async def _create_connection(
1374
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1375
) -> ResponseHandler:
1376
try:
1377
async with ceil_timeout(timeout.sock_connect):
1378
_, proto = await self._loop.create_unix_connection(
1379
self._factory, self._path
1380
)
1381
except OSError as exc:
1382
raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1383
1384
return cast(ResponseHandler, proto)
1385
1386
1387
class NamedPipeConnector(BaseConnector):
1388
"""Named pipe connector.
1389
1390
Only supported by the proactor event loop.
1391
See also: https://docs.python.org/3.7/library/asyncio-eventloop.html
1392
1393
path - Windows named pipe path.
1394
keepalive_timeout - (optional) Keep-alive timeout.
1395
force_close - Set to True to force close and do reconnect
1396
after each request (and between redirects).
1397
limit - The total number of simultaneous connections.
1398
limit_per_host - Number of simultaneous connections to one host.
1399
loop - Optional event loop.
1400
"""
1401
1402
def __init__(
1403
self,
1404
path: str,
1405
force_close: bool = False,
1406
keepalive_timeout: Union[object, float, None] = sentinel,
1407
limit: int = 100,
1408
limit_per_host: int = 0,
1409
loop: Optional[asyncio.AbstractEventLoop] = None,
1410
) -> None:
1411
super().__init__(
1412
force_close=force_close,
1413
keepalive_timeout=keepalive_timeout,
1414
limit=limit,
1415
limit_per_host=limit_per_host,
1416
loop=loop,
1417
)
1418
if not isinstance(
1419
self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]
1420
):
1421
raise RuntimeError(
1422
"Named Pipes only available in proactor " "loop under windows"
1423
)
1424
self._path = path
1425
1426
@property
1427
def path(self) -> str:
1428
"""Path to the named pipe."""
1429
return self._path
1430
1431
async def _create_connection(
1432
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1433
) -> ResponseHandler:
1434
try:
1435
async with ceil_timeout(timeout.sock_connect):
1436
_, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501
1437
self._factory, self._path
1438
)
1439
# the drain is required so that the connection_made is called
1440
# and transport is set otherwise it is not set before the
1441
# `assert conn.transport is not None`
1442
# in client.py's _request method
1443
await asyncio.sleep(0)
1444
# other option is to manually set transport like
1445
# `proto.transport = trans`
1446
except OSError as exc:
1447
raise ClientConnectorError(req.connection_key, exc) from exc
1448
1449
return cast(ResponseHandler, proto)
1450
1451