Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/connector.py
7763 views
import asyncio1import functools2import random3import sys4import traceback5import warnings6from collections import defaultdict, deque7from contextlib import suppress8from http.cookies import SimpleCookie9from itertools import cycle, islice10from time import monotonic11from types import TracebackType12from typing import (13TYPE_CHECKING,14Any,15Awaitable,16Callable,17DefaultDict,18Dict,19Iterator,20List,21Optional,22Set,23Tuple,24Type,25Union,26cast,27)2829import attr3031from . import hdrs, helpers32from .abc import AbstractResolver33from .client_exceptions import (34ClientConnectionError,35ClientConnectorCertificateError,36ClientConnectorError,37ClientConnectorSSLError,38ClientHttpProxyError,39ClientProxyConnectionError,40ServerFingerprintMismatch,41UnixClientConnectorError,42cert_errors,43ssl_errors,44)45from .client_proto import ResponseHandler46from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params47from .helpers import (48PY_36,49ceil_timeout,50get_running_loop,51is_ip_address,52noop,53sentinel,54)55from .http import RESPONSES56from .locks import EventResultOrError57from .resolver import DefaultResolver5859try:60import ssl6162SSLContext = ssl.SSLContext63except ImportError: # pragma: no cover64ssl = None # type: ignore[assignment]65SSLContext = object # type: ignore[misc,assignment]666768__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")697071if TYPE_CHECKING: # pragma: no cover72from .client import ClientTimeout73from .client_reqrep import ConnectionKey74from .tracing import Trace757677class _DeprecationWaiter:78__slots__ = ("_awaitable", "_awaited")7980def __init__(self, awaitable: Awaitable[Any]) -> None:81self._awaitable = awaitable82self._awaited = False8384def __await__(self) -> Any:85self._awaited = True86return self._awaitable.__await__()8788def __del__(self) -> None:89if not self._awaited:90warnings.warn(91"Connector.close() is a coroutine, "92"please use await connector.close()",93DeprecationWarning,94)959697class Connection:9899_source_traceback = None100_transport = None101102def __init__(103self,104connector: "BaseConnector",105key: "ConnectionKey",106protocol: ResponseHandler,107loop: asyncio.AbstractEventLoop,108) -> None:109self._key = key110self._connector = connector111self._loop = loop112self._protocol = protocol # type: Optional[ResponseHandler]113self._callbacks = [] # type: List[Callable[[], None]]114115if loop.get_debug():116self._source_traceback = traceback.extract_stack(sys._getframe(1))117118def __repr__(self) -> str:119return f"Connection<{self._key}>"120121def __del__(self, _warnings: Any = warnings) -> None:122if self._protocol is not None:123if PY_36:124kwargs = {"source": self}125else:126kwargs = {}127_warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)128if self._loop.is_closed():129return130131self._connector._release(self._key, self._protocol, should_close=True)132133context = {"client_connection": self, "message": "Unclosed connection"}134if self._source_traceback is not None:135context["source_traceback"] = self._source_traceback136self._loop.call_exception_handler(context)137138@property139def loop(self) -> asyncio.AbstractEventLoop:140warnings.warn(141"connector.loop property is deprecated", DeprecationWarning, stacklevel=2142)143return self._loop144145@property146def transport(self) -> Optional[asyncio.Transport]:147if self._protocol is None:148return None149return self._protocol.transport150151@property152def protocol(self) -> Optional[ResponseHandler]:153return self._protocol154155def add_callback(self, callback: Callable[[], None]) -> None:156if callback is not None:157self._callbacks.append(callback)158159def _notify_release(self) -> None:160callbacks, self._callbacks = self._callbacks[:], []161162for cb in callbacks:163with suppress(Exception):164cb()165166def close(self) -> None:167self._notify_release()168169if self._protocol is not None:170self._connector._release(self._key, self._protocol, should_close=True)171self._protocol = None172173def release(self) -> None:174self._notify_release()175176if self._protocol is not None:177self._connector._release(178self._key, self._protocol, should_close=self._protocol.should_close179)180self._protocol = None181182@property183def closed(self) -> bool:184return self._protocol is None or not self._protocol.is_connected()185186187class _TransportPlaceholder:188"""placeholder for BaseConnector.connect function"""189190def close(self) -> None:191pass192193194class BaseConnector:195"""Base connector class.196197keepalive_timeout - (optional) Keep-alive timeout.198force_close - Set to True to force close and do reconnect199after each request (and between redirects).200limit - The total number of simultaneous connections.201limit_per_host - Number of simultaneous connections to one host.202enable_cleanup_closed - Enables clean-up closed ssl transports.203Disabled by default.204loop - Optional event loop.205"""206207_closed = True # prevent AttributeError in __del__ if ctor was failed208_source_traceback = None209210# abort transport after 2 seconds (cleanup broken connections)211_cleanup_closed_period = 2.0212213def __init__(214self,215*,216keepalive_timeout: Union[object, None, float] = sentinel,217force_close: bool = False,218limit: int = 100,219limit_per_host: int = 0,220enable_cleanup_closed: bool = False,221loop: Optional[asyncio.AbstractEventLoop] = None,222) -> None:223224if force_close:225if keepalive_timeout is not None and keepalive_timeout is not sentinel:226raise ValueError(227"keepalive_timeout cannot " "be set if force_close is True"228)229else:230if keepalive_timeout is sentinel:231keepalive_timeout = 15.0232233loop = get_running_loop(loop)234235self._closed = False236if loop.get_debug():237self._source_traceback = traceback.extract_stack(sys._getframe(1))238239self._conns = (240{}241) # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]]242self._limit = limit243self._limit_per_host = limit_per_host244self._acquired = set() # type: Set[ResponseHandler]245self._acquired_per_host = defaultdict(246set247) # type: DefaultDict[ConnectionKey, Set[ResponseHandler]]248self._keepalive_timeout = cast(float, keepalive_timeout)249self._force_close = force_close250251# {host_key: FIFO list of waiters}252self._waiters = defaultdict(deque) # type: ignore[var-annotated]253254self._loop = loop255self._factory = functools.partial(ResponseHandler, loop=loop)256257self.cookies = SimpleCookie() # type: SimpleCookie[str]258259# start keep-alive connection cleanup task260self._cleanup_handle: Optional[asyncio.TimerHandle] = None261262# start cleanup closed transports task263self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None264self._cleanup_closed_disabled = not enable_cleanup_closed265self._cleanup_closed_transports = [] # type: List[Optional[asyncio.Transport]]266self._cleanup_closed()267268def __del__(self, _warnings: Any = warnings) -> None:269if self._closed:270return271if not self._conns:272return273274conns = [repr(c) for c in self._conns.values()]275276self._close()277278if PY_36:279kwargs = {"source": self}280else:281kwargs = {}282_warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)283context = {284"connector": self,285"connections": conns,286"message": "Unclosed connector",287}288if self._source_traceback is not None:289context["source_traceback"] = self._source_traceback290self._loop.call_exception_handler(context)291292def __enter__(self) -> "BaseConnector":293warnings.warn(294'"witn Connector():" is deprecated, '295'use "async with Connector():" instead',296DeprecationWarning,297)298return self299300def __exit__(self, *exc: Any) -> None:301self.close()302303async def __aenter__(self) -> "BaseConnector":304return self305306async def __aexit__(307self,308exc_type: Optional[Type[BaseException]] = None,309exc_value: Optional[BaseException] = None,310exc_traceback: Optional[TracebackType] = None,311) -> None:312await self.close()313314@property315def force_close(self) -> bool:316"""Ultimately close connection on releasing if True."""317return self._force_close318319@property320def limit(self) -> int:321"""The total number for simultaneous connections.322323If limit is 0 the connector has no limit.324The default limit size is 100.325"""326return self._limit327328@property329def limit_per_host(self) -> int:330"""The limit for simultaneous connections to the same endpoint.331332Endpoints are the same if they are have equal333(host, port, is_ssl) triple.334"""335return self._limit_per_host336337def _cleanup(self) -> None:338"""Cleanup unused transports."""339if self._cleanup_handle:340self._cleanup_handle.cancel()341# _cleanup_handle should be unset, otherwise _release() will not342# recreate it ever!343self._cleanup_handle = None344345now = self._loop.time()346timeout = self._keepalive_timeout347348if self._conns:349connections = {}350deadline = now - timeout351for key, conns in self._conns.items():352alive = []353for proto, use_time in conns:354if proto.is_connected():355if use_time - deadline < 0:356transport = proto.transport357proto.close()358if key.is_ssl and not self._cleanup_closed_disabled:359self._cleanup_closed_transports.append(transport)360else:361alive.append((proto, use_time))362else:363transport = proto.transport364proto.close()365if key.is_ssl and not self._cleanup_closed_disabled:366self._cleanup_closed_transports.append(transport)367368if alive:369connections[key] = alive370371self._conns = connections372373if self._conns:374self._cleanup_handle = helpers.weakref_handle(375self, "_cleanup", timeout, self._loop376)377378def _drop_acquired_per_host(379self, key: "ConnectionKey", val: ResponseHandler380) -> None:381acquired_per_host = self._acquired_per_host382if key not in acquired_per_host:383return384conns = acquired_per_host[key]385conns.remove(val)386if not conns:387del self._acquired_per_host[key]388389def _cleanup_closed(self) -> None:390"""Double confirmation for transport close.391392Some broken ssl servers may leave socket open without proper close.393"""394if self._cleanup_closed_handle:395self._cleanup_closed_handle.cancel()396397for transport in self._cleanup_closed_transports:398if transport is not None:399transport.abort()400401self._cleanup_closed_transports = []402403if not self._cleanup_closed_disabled:404self._cleanup_closed_handle = helpers.weakref_handle(405self, "_cleanup_closed", self._cleanup_closed_period, self._loop406)407408def close(self) -> Awaitable[None]:409"""Close all opened transports."""410self._close()411return _DeprecationWaiter(noop())412413def _close(self) -> None:414if self._closed:415return416417self._closed = True418419try:420if self._loop.is_closed():421return422423# cancel cleanup task424if self._cleanup_handle:425self._cleanup_handle.cancel()426427# cancel cleanup close task428if self._cleanup_closed_handle:429self._cleanup_closed_handle.cancel()430431for data in self._conns.values():432for proto, t0 in data:433proto.close()434435for proto in self._acquired:436proto.close()437438for transport in self._cleanup_closed_transports:439if transport is not None:440transport.abort()441442finally:443self._conns.clear()444self._acquired.clear()445self._waiters.clear()446self._cleanup_handle = None447self._cleanup_closed_transports.clear()448self._cleanup_closed_handle = None449450@property451def closed(self) -> bool:452"""Is connector closed.453454A readonly property.455"""456return self._closed457458def _available_connections(self, key: "ConnectionKey") -> int:459"""460Return number of available connections.461462The limit, limit_per_host and the connection key are taken into account.463464If it returns less than 1 means that there are no connections465available.466"""467if self._limit:468# total calc available connections469available = self._limit - len(self._acquired)470471# check limit per host472if (473self._limit_per_host474and available > 0475and key in self._acquired_per_host476):477acquired = self._acquired_per_host.get(key)478assert acquired is not None479available = self._limit_per_host - len(acquired)480481elif self._limit_per_host and key in self._acquired_per_host:482# check limit per host483acquired = self._acquired_per_host.get(key)484assert acquired is not None485available = self._limit_per_host - len(acquired)486else:487available = 1488489return available490491async def connect(492self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"493) -> Connection:494"""Get from pool or create new connection."""495key = req.connection_key496available = self._available_connections(key)497498# Wait if there are no available connections or if there are/were499# waiters (i.e. don't steal connection from a waiter about to wake up)500if available <= 0 or key in self._waiters:501fut = self._loop.create_future()502503# This connection will now count towards the limit.504self._waiters[key].append(fut)505506if traces:507for trace in traces:508await trace.send_connection_queued_start()509510try:511await fut512except BaseException as e:513if key in self._waiters:514# remove a waiter even if it was cancelled, normally it's515# removed when it's notified516try:517self._waiters[key].remove(fut)518except ValueError: # fut may no longer be in list519pass520521raise e522finally:523if key in self._waiters and not self._waiters[key]:524del self._waiters[key]525526if traces:527for trace in traces:528await trace.send_connection_queued_end()529530proto = self._get(key)531if proto is None:532placeholder = cast(ResponseHandler, _TransportPlaceholder())533self._acquired.add(placeholder)534self._acquired_per_host[key].add(placeholder)535536if traces:537for trace in traces:538await trace.send_connection_create_start()539540try:541proto = await self._create_connection(req, traces, timeout)542if self._closed:543proto.close()544raise ClientConnectionError("Connector is closed.")545except BaseException:546if not self._closed:547self._acquired.remove(placeholder)548self._drop_acquired_per_host(key, placeholder)549self._release_waiter()550raise551else:552if not self._closed:553self._acquired.remove(placeholder)554self._drop_acquired_per_host(key, placeholder)555556if traces:557for trace in traces:558await trace.send_connection_create_end()559else:560if traces:561# Acquire the connection to prevent race conditions with limits562placeholder = cast(ResponseHandler, _TransportPlaceholder())563self._acquired.add(placeholder)564self._acquired_per_host[key].add(placeholder)565for trace in traces:566await trace.send_connection_reuseconn()567self._acquired.remove(placeholder)568self._drop_acquired_per_host(key, placeholder)569570self._acquired.add(proto)571self._acquired_per_host[key].add(proto)572return Connection(self, key, proto, self._loop)573574def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:575try:576conns = self._conns[key]577except KeyError:578return None579580t1 = self._loop.time()581while conns:582proto, t0 = conns.pop()583if proto.is_connected():584if t1 - t0 > self._keepalive_timeout:585transport = proto.transport586proto.close()587# only for SSL transports588if key.is_ssl and not self._cleanup_closed_disabled:589self._cleanup_closed_transports.append(transport)590else:591if not conns:592# The very last connection was reclaimed: drop the key593del self._conns[key]594return proto595else:596transport = proto.transport597proto.close()598if key.is_ssl and not self._cleanup_closed_disabled:599self._cleanup_closed_transports.append(transport)600601# No more connections: drop the key602del self._conns[key]603return None604605def _release_waiter(self) -> None:606"""607Iterates over all waiters until one to be released is found.608609The one to be released is not finsihed and610belongs to a host that has available connections.611"""612if not self._waiters:613return614615# Having the dict keys ordered this avoids to iterate616# at the same order at each call.617queues = list(self._waiters.keys())618random.shuffle(queues)619620for key in queues:621if self._available_connections(key) < 1:622continue623624waiters = self._waiters[key]625while waiters:626waiter = waiters.popleft()627if not waiter.done():628waiter.set_result(None)629return630631def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:632if self._closed:633# acquired connection is already released on connector closing634return635636try:637self._acquired.remove(proto)638self._drop_acquired_per_host(key, proto)639except KeyError: # pragma: no cover640# this may be result of undetermenistic order of objects641# finalization due garbage collection.642pass643else:644self._release_waiter()645646def _release(647self,648key: "ConnectionKey",649protocol: ResponseHandler,650*,651should_close: bool = False,652) -> None:653if self._closed:654# acquired connection is already released on connector closing655return656657self._release_acquired(key, protocol)658659if self._force_close:660should_close = True661662if should_close or protocol.should_close:663transport = protocol.transport664protocol.close()665666if key.is_ssl and not self._cleanup_closed_disabled:667self._cleanup_closed_transports.append(transport)668else:669conns = self._conns.get(key)670if conns is None:671conns = self._conns[key] = []672conns.append((protocol, self._loop.time()))673674if self._cleanup_handle is None:675self._cleanup_handle = helpers.weakref_handle(676self, "_cleanup", self._keepalive_timeout, self._loop677)678679async def _create_connection(680self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"681) -> ResponseHandler:682raise NotImplementedError()683684685class _DNSCacheTable:686def __init__(self, ttl: Optional[float] = None) -> None:687self._addrs_rr = (688{}689) # type: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]]690self._timestamps = {} # type: Dict[Tuple[str, int], float]691self._ttl = ttl692693def __contains__(self, host: object) -> bool:694return host in self._addrs_rr695696def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:697self._addrs_rr[key] = (cycle(addrs), len(addrs))698699if self._ttl:700self._timestamps[key] = monotonic()701702def remove(self, key: Tuple[str, int]) -> None:703self._addrs_rr.pop(key, None)704705if self._ttl:706self._timestamps.pop(key, None)707708def clear(self) -> None:709self._addrs_rr.clear()710self._timestamps.clear()711712def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:713loop, length = self._addrs_rr[key]714addrs = list(islice(loop, length))715# Consume one more element to shift internal state of `cycle`716next(loop)717return addrs718719def expired(self, key: Tuple[str, int]) -> bool:720if self._ttl is None:721return False722723return self._timestamps[key] + self._ttl < monotonic()724725726class TCPConnector(BaseConnector):727"""TCP connector.728729verify_ssl - Set to True to check ssl certifications.730fingerprint - Pass the binary sha256731digest of the expected certificate in DER format to verify732that the certificate the server presents matches. See also733https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning734resolver - Enable DNS lookups and use this735resolver736use_dns_cache - Use memory cache for DNS lookups.737ttl_dns_cache - Max seconds having cached a DNS entry, None forever.738family - socket address family739local_addr - local tuple of (host, port) to bind socket to740741keepalive_timeout - (optional) Keep-alive timeout.742force_close - Set to True to force close and do reconnect743after each request (and between redirects).744limit - The total number of simultaneous connections.745limit_per_host - Number of simultaneous connections to one host.746enable_cleanup_closed - Enables clean-up closed ssl transports.747Disabled by default.748loop - Optional event loop.749"""750751def __init__(752self,753*,754verify_ssl: bool = True,755fingerprint: Optional[bytes] = None,756use_dns_cache: bool = True,757ttl_dns_cache: Optional[int] = 10,758family: int = 0,759ssl_context: Optional[SSLContext] = None,760ssl: Union[None, bool, Fingerprint, SSLContext] = None,761local_addr: Optional[Tuple[str, int]] = None,762resolver: Optional[AbstractResolver] = None,763keepalive_timeout: Union[None, float, object] = sentinel,764force_close: bool = False,765limit: int = 100,766limit_per_host: int = 0,767enable_cleanup_closed: bool = False,768loop: Optional[asyncio.AbstractEventLoop] = None,769):770super().__init__(771keepalive_timeout=keepalive_timeout,772force_close=force_close,773limit=limit,774limit_per_host=limit_per_host,775enable_cleanup_closed=enable_cleanup_closed,776loop=loop,777)778779self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)780if resolver is None:781resolver = DefaultResolver(loop=self._loop)782self._resolver = resolver783784self._use_dns_cache = use_dns_cache785self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)786self._throttle_dns_events = (787{}788) # type: Dict[Tuple[str, int], EventResultOrError]789self._family = family790self._local_addr = local_addr791792def close(self) -> Awaitable[None]:793"""Close all ongoing DNS calls."""794for ev in self._throttle_dns_events.values():795ev.cancel()796797return super().close()798799@property800def family(self) -> int:801"""Socket family like AF_INET."""802return self._family803804@property805def use_dns_cache(self) -> bool:806"""True if local DNS caching is enabled."""807return self._use_dns_cache808809def clear_dns_cache(810self, host: Optional[str] = None, port: Optional[int] = None811) -> None:812"""Remove specified host/port or clear all dns local cache."""813if host is not None and port is not None:814self._cached_hosts.remove((host, port))815elif host is not None or port is not None:816raise ValueError("either both host and port " "or none of them are allowed")817else:818self._cached_hosts.clear()819820async def _resolve_host(821self, host: str, port: int, traces: Optional[List["Trace"]] = None822) -> List[Dict[str, Any]]:823if is_ip_address(host):824return [825{826"hostname": host,827"host": host,828"port": port,829"family": self._family,830"proto": 0,831"flags": 0,832}833]834835if not self._use_dns_cache:836837if traces:838for trace in traces:839await trace.send_dns_resolvehost_start(host)840841res = await self._resolver.resolve(host, port, family=self._family)842843if traces:844for trace in traces:845await trace.send_dns_resolvehost_end(host)846847return res848849key = (host, port)850851if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):852# get result early, before any await (#4014)853result = self._cached_hosts.next_addrs(key)854855if traces:856for trace in traces:857await trace.send_dns_cache_hit(host)858return result859860if key in self._throttle_dns_events:861# get event early, before any await (#4014)862event = self._throttle_dns_events[key]863if traces:864for trace in traces:865await trace.send_dns_cache_hit(host)866await event.wait()867else:868# update dict early, before any await (#4014)869self._throttle_dns_events[key] = EventResultOrError(self._loop)870if traces:871for trace in traces:872await trace.send_dns_cache_miss(host)873try:874875if traces:876for trace in traces:877await trace.send_dns_resolvehost_start(host)878879addrs = await self._resolver.resolve(host, port, family=self._family)880if traces:881for trace in traces:882await trace.send_dns_resolvehost_end(host)883884self._cached_hosts.add(key, addrs)885self._throttle_dns_events[key].set()886except BaseException as e:887# any DNS exception, independently of the implementation888# is set for the waiters to raise the same exception.889self._throttle_dns_events[key].set(exc=e)890raise891finally:892self._throttle_dns_events.pop(key)893894return self._cached_hosts.next_addrs(key)895896async def _create_connection(897self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"898) -> ResponseHandler:899"""Create connection.900901Has same keyword arguments as BaseEventLoop.create_connection.902"""903if req.proxy:904_, proto = await self._create_proxy_connection(req, traces, timeout)905else:906_, proto = await self._create_direct_connection(req, traces, timeout)907908return proto909910@staticmethod911@functools.lru_cache(None)912def _make_ssl_context(verified: bool) -> SSLContext:913if verified:914return ssl.create_default_context()915else:916sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)917sslcontext.options |= ssl.OP_NO_SSLv2918sslcontext.options |= ssl.OP_NO_SSLv3919sslcontext.check_hostname = False920sslcontext.verify_mode = ssl.CERT_NONE921try:922sslcontext.options |= ssl.OP_NO_COMPRESSION923except AttributeError as attr_err:924warnings.warn(925"{!s}: The Python interpreter is compiled "926"against OpenSSL < 1.0.0. Ref: "927"https://docs.python.org/3/library/ssl.html"928"#ssl.OP_NO_COMPRESSION".format(attr_err),929)930sslcontext.set_default_verify_paths()931return sslcontext932933def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]:934"""Logic to get the correct SSL context9359360. if req.ssl is false, return None9379381. if ssl_context is specified in req, use it9392. if _ssl_context is specified in self, use it9403. otherwise:9411. if verify_ssl is not specified in req, use self.ssl_context942(will generate a default context according to self.verify_ssl)9432. if verify_ssl is True in req, generate a default SSL context9443. if verify_ssl is False in req, generate a SSL context that945won't verify946"""947if req.is_ssl():948if ssl is None: # pragma: no cover949raise RuntimeError("SSL is not supported.")950sslcontext = req.ssl951if isinstance(sslcontext, ssl.SSLContext):952return sslcontext953if sslcontext is not None:954# not verified or fingerprinted955return self._make_ssl_context(False)956sslcontext = self._ssl957if isinstance(sslcontext, ssl.SSLContext):958return sslcontext959if sslcontext is not None:960# not verified or fingerprinted961return self._make_ssl_context(False)962return self._make_ssl_context(True)963else:964return None965966def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:967ret = req.ssl968if isinstance(ret, Fingerprint):969return ret970ret = self._ssl971if isinstance(ret, Fingerprint):972return ret973return None974975async def _wrap_create_connection(976self,977*args: Any,978req: "ClientRequest",979timeout: "ClientTimeout",980client_error: Type[Exception] = ClientConnectorError,981**kwargs: Any,982) -> Tuple[asyncio.Transport, ResponseHandler]:983try:984async with ceil_timeout(timeout.sock_connect):985return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa986except cert_errors as exc:987raise ClientConnectorCertificateError(req.connection_key, exc) from exc988except ssl_errors as exc:989raise ClientConnectorSSLError(req.connection_key, exc) from exc990except OSError as exc:991raise client_error(req.connection_key, exc) from exc992993def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:994"""Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.995996One case is that :py:meth:`asyncio.loop.start_tls` is not yet997implemented under Python 3.6. It is necessary for TLS-in-TLS so998that it is possible to send HTTPS queries through HTTPS proxies.9991000This doesn't affect regular HTTP requests, though.1001"""1002if not req.is_ssl():1003return10041005proxy_url = req.proxy1006assert proxy_url is not None1007if proxy_url.scheme != "https":1008return10091010self._check_loop_for_start_tls()10111012def _check_loop_for_start_tls(self) -> None:1013try:1014self._loop.start_tls1015except AttributeError as attr_exc:1016raise RuntimeError(1017"An HTTPS request is being sent through an HTTPS proxy. "1018"This needs support for TLS in TLS but it is not implemented "1019"in your runtime for the stdlib asyncio.\n\n"1020"Please upgrade to Python 3.7 or higher. For more details, "1021"please see:\n"1022"* https://bugs.python.org/issue37179\n"1023"* https://github.com/python/cpython/pull/28073\n"1024"* https://docs.aiohttp.org/en/stable/"1025"client_advanced.html#proxy-support\n"1026"* https://github.com/aio-libs/aiohttp/discussions/6044\n",1027) from attr_exc10281029def _loop_supports_start_tls(self) -> bool:1030try:1031self._check_loop_for_start_tls()1032except RuntimeError:1033return False1034else:1035return True10361037def _warn_about_tls_in_tls(1038self,1039underlying_transport: asyncio.Transport,1040req: "ClientRequest",1041) -> None:1042"""Issue a warning if the requested URL has HTTPS scheme."""1043if req.request_info.url.scheme != "https":1044return10451046asyncio_supports_tls_in_tls = getattr(1047underlying_transport,1048"_start_tls_compatible",1049False,1050)10511052if asyncio_supports_tls_in_tls:1053return10541055warnings.warn(1056"An HTTPS request is being sent through an HTTPS proxy. "1057"This support for TLS in TLS is known to be disabled "1058"in the stdlib asyncio. This is why you'll probably see "1059"an error in the log below.\n\n"1060"It is possible to enable it via monkeypatching under "1061"Python 3.7 or higher. For more details, see:\n"1062"* https://bugs.python.org/issue37179\n"1063"* https://github.com/python/cpython/pull/28073\n\n"1064"You can temporarily patch this as follows:\n"1065"* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"1066"* https://github.com/aio-libs/aiohttp/discussions/6044\n",1067RuntimeWarning,1068source=self,1069# Why `4`? At least 3 of the calls in the stack originate1070# from the methods in this class.1071stacklevel=3,1072)10731074async def _start_tls_connection(1075self,1076underlying_transport: asyncio.Transport,1077req: "ClientRequest",1078timeout: "ClientTimeout",1079client_error: Type[Exception] = ClientConnectorError,1080) -> Tuple[asyncio.BaseTransport, ResponseHandler]:1081"""Wrap the raw TCP transport with TLS."""1082tls_proto = self._factory() # Create a brand new proto for TLS10831084# Safety of the `cast()` call here is based on the fact that1085# internally `_get_ssl_context()` only returns `None` when1086# `req.is_ssl()` evaluates to `False` which is never gonna happen1087# in this code path. Of course, it's rather fragile1088# maintainability-wise but this is to be solved separately.1089sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))10901091try:1092async with ceil_timeout(timeout.sock_connect):1093try:1094tls_transport = await self._loop.start_tls(1095underlying_transport,1096tls_proto,1097sslcontext,1098server_hostname=req.host,1099ssl_handshake_timeout=timeout.total,1100)1101except BaseException:1102# We need to close the underlying transport since1103# `start_tls()` probably failed before it had a1104# chance to do this:1105underlying_transport.close()1106raise1107except cert_errors as exc:1108raise ClientConnectorCertificateError(req.connection_key, exc) from exc1109except ssl_errors as exc:1110raise ClientConnectorSSLError(req.connection_key, exc) from exc1111except OSError as exc:1112raise client_error(req.connection_key, exc) from exc1113except TypeError as type_err:1114# Example cause looks like this:1115# TypeError: transport <asyncio.sslproto._SSLProtocolTransport1116# object at 0x7f760615e460> is not supported by start_tls()11171118raise ClientConnectionError(1119"Cannot initialize a TLS-in-TLS connection to host "1120f"{req.host!s}:{req.port:d} through an underlying connection "1121f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "1122f"[{type_err!s}]"1123) from type_err1124else:1125tls_proto.connection_made(1126tls_transport1127) # Kick the state machine of the new TLS protocol11281129return tls_transport, tls_proto11301131async def _create_direct_connection(1132self,1133req: "ClientRequest",1134traces: List["Trace"],1135timeout: "ClientTimeout",1136*,1137client_error: Type[Exception] = ClientConnectorError,1138) -> Tuple[asyncio.Transport, ResponseHandler]:1139sslcontext = self._get_ssl_context(req)1140fingerprint = self._get_fingerprint(req)11411142host = req.url.raw_host1143assert host is not None1144port = req.port1145assert port is not None1146host_resolved = asyncio.ensure_future(1147self._resolve_host(host, port, traces=traces), loop=self._loop1148)1149try:1150# Cancelling this lookup should not cancel the underlying lookup1151# or else the cancel event will get broadcast to all the waiters1152# across all connections.1153hosts = await asyncio.shield(host_resolved)1154except asyncio.CancelledError:11551156def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:1157with suppress(Exception, asyncio.CancelledError):1158fut.result()11591160host_resolved.add_done_callback(drop_exception)1161raise1162except OSError as exc:1163# in case of proxy it is not ClientProxyConnectionError1164# it is problem of resolving proxy ip itself1165raise ClientConnectorError(req.connection_key, exc) from exc11661167last_exc = None # type: Optional[Exception]11681169for hinfo in hosts:1170host = hinfo["host"]1171port = hinfo["port"]11721173try:1174transp, proto = await self._wrap_create_connection(1175self._factory,1176host,1177port,1178timeout=timeout,1179ssl=sslcontext,1180family=hinfo["family"],1181proto=hinfo["proto"],1182flags=hinfo["flags"],1183server_hostname=hinfo["hostname"] if sslcontext else None,1184local_addr=self._local_addr,1185req=req,1186client_error=client_error,1187)1188except ClientConnectorError as exc:1189last_exc = exc1190continue11911192if req.is_ssl() and fingerprint:1193try:1194fingerprint.check(transp)1195except ServerFingerprintMismatch as exc:1196transp.close()1197if not self._cleanup_closed_disabled:1198self._cleanup_closed_transports.append(transp)1199last_exc = exc1200continue12011202return transp, proto1203else:1204assert last_exc is not None1205raise last_exc12061207async def _create_proxy_connection(1208self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"1209) -> Tuple[asyncio.BaseTransport, ResponseHandler]:1210self._fail_on_no_start_tls(req)1211runtime_has_start_tls = self._loop_supports_start_tls()12121213headers = {} # type: Dict[str, str]1214if req.proxy_headers is not None:1215headers = req.proxy_headers # type: ignore[assignment]1216headers[hdrs.HOST] = req.headers[hdrs.HOST]12171218url = req.proxy1219assert url is not None1220proxy_req = ClientRequest(1221hdrs.METH_GET,1222url,1223headers=headers,1224auth=req.proxy_auth,1225loop=self._loop,1226ssl=req.ssl,1227)12281229# create connection to proxy server1230transport, proto = await self._create_direct_connection(1231proxy_req, [], timeout, client_error=ClientProxyConnectionError1232)12331234# Many HTTP proxies has buggy keepalive support. Let's not1235# reuse connection but close it after processing every1236# response.1237proto.force_close()12381239auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)1240if auth is not None:1241if not req.is_ssl():1242req.headers[hdrs.PROXY_AUTHORIZATION] = auth1243else:1244proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth12451246if req.is_ssl():1247if runtime_has_start_tls:1248self._warn_about_tls_in_tls(transport, req)12491250# For HTTPS requests over HTTP proxy1251# we must notify proxy to tunnel connection1252# so we send CONNECT command:1253# CONNECT www.python.org:443 HTTP/1.11254# Host: www.python.org1255#1256# next we must do TLS handshake and so on1257# to do this we must wrap raw socket into secure one1258# asyncio handles this perfectly1259proxy_req.method = hdrs.METH_CONNECT1260proxy_req.url = req.url1261key = attr.evolve(1262req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None1263)1264conn = Connection(self, key, proto, self._loop)1265proxy_resp = await proxy_req.send(conn)1266try:1267protocol = conn._protocol1268assert protocol is not None12691270# read_until_eof=True will ensure the connection isn't closed1271# once the response is received and processed allowing1272# START_TLS to work on the connection below.1273protocol.set_response_params(read_until_eof=runtime_has_start_tls)1274resp = await proxy_resp.start(conn)1275except BaseException:1276proxy_resp.close()1277conn.close()1278raise1279else:1280conn._protocol = None1281conn._transport = None1282try:1283if resp.status != 200:1284message = resp.reason1285if message is None:1286message = RESPONSES[resp.status][0]1287raise ClientHttpProxyError(1288proxy_resp.request_info,1289resp.history,1290status=resp.status,1291message=message,1292headers=resp.headers,1293)1294if not runtime_has_start_tls:1295rawsock = transport.get_extra_info("socket", default=None)1296if rawsock is None:1297raise RuntimeError(1298"Transport does not expose socket instance"1299)1300# Duplicate the socket, so now we can close proxy transport1301rawsock = rawsock.dup()1302except BaseException:1303# It shouldn't be closed in `finally` because it's fed to1304# `loop.start_tls()` and the docs say not to touch it after1305# passing there.1306transport.close()1307raise1308finally:1309if not runtime_has_start_tls:1310transport.close()13111312if not runtime_has_start_tls:1313# HTTP proxy with support for upgrade to HTTPS1314sslcontext = self._get_ssl_context(req)1315return await self._wrap_create_connection(1316self._factory,1317timeout=timeout,1318ssl=sslcontext,1319sock=rawsock,1320server_hostname=req.host,1321req=req,1322)13231324return await self._start_tls_connection(1325# Access the old transport for the last time before it's1326# closed and forgotten forever:1327transport,1328req=req,1329timeout=timeout,1330)1331finally:1332proxy_resp.close()13331334return transport, proto133513361337class UnixConnector(BaseConnector):1338"""Unix socket connector.13391340path - Unix socket path.1341keepalive_timeout - (optional) Keep-alive timeout.1342force_close - Set to True to force close and do reconnect1343after each request (and between redirects).1344limit - The total number of simultaneous connections.1345limit_per_host - Number of simultaneous connections to one host.1346loop - Optional event loop.1347"""13481349def __init__(1350self,1351path: str,1352force_close: bool = False,1353keepalive_timeout: Union[object, float, None] = sentinel,1354limit: int = 100,1355limit_per_host: int = 0,1356loop: Optional[asyncio.AbstractEventLoop] = None,1357) -> None:1358super().__init__(1359force_close=force_close,1360keepalive_timeout=keepalive_timeout,1361limit=limit,1362limit_per_host=limit_per_host,1363loop=loop,1364)1365self._path = path13661367@property1368def path(self) -> str:1369"""Path to unix socket."""1370return self._path13711372async def _create_connection(1373self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"1374) -> ResponseHandler:1375try:1376async with ceil_timeout(timeout.sock_connect):1377_, proto = await self._loop.create_unix_connection(1378self._factory, self._path1379)1380except OSError as exc:1381raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc13821383return cast(ResponseHandler, proto)138413851386class NamedPipeConnector(BaseConnector):1387"""Named pipe connector.13881389Only supported by the proactor event loop.1390See also: https://docs.python.org/3.7/library/asyncio-eventloop.html13911392path - Windows named pipe path.1393keepalive_timeout - (optional) Keep-alive timeout.1394force_close - Set to True to force close and do reconnect1395after each request (and between redirects).1396limit - The total number of simultaneous connections.1397limit_per_host - Number of simultaneous connections to one host.1398loop - Optional event loop.1399"""14001401def __init__(1402self,1403path: str,1404force_close: bool = False,1405keepalive_timeout: Union[object, float, None] = sentinel,1406limit: int = 100,1407limit_per_host: int = 0,1408loop: Optional[asyncio.AbstractEventLoop] = None,1409) -> None:1410super().__init__(1411force_close=force_close,1412keepalive_timeout=keepalive_timeout,1413limit=limit,1414limit_per_host=limit_per_host,1415loop=loop,1416)1417if not isinstance(1418self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]1419):1420raise RuntimeError(1421"Named Pipes only available in proactor " "loop under windows"1422)1423self._path = path14241425@property1426def path(self) -> str:1427"""Path to the named pipe."""1428return self._path14291430async def _create_connection(1431self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"1432) -> ResponseHandler:1433try:1434async with ceil_timeout(timeout.sock_connect):1435_, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E5011436self._factory, self._path1437)1438# the drain is required so that the connection_made is called1439# and transport is set otherwise it is not set before the1440# `assert conn.transport is not None`1441# in client.py's _request method1442await asyncio.sleep(0)1443# other option is to manually set transport like1444# `proto.transport = trans`1445except OSError as exc:1446raise ClientConnectorError(req.connection_key, exc) from exc14471448return cast(ResponseHandler, proto)144914501451