Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/helpers.py
7763 views
"""Various helper functions"""12import asyncio3import base644import binascii5import cgi6import datetime7import functools8import inspect9import netrc10import os11import platform12import re13import sys14import time15import warnings16import weakref17from collections import namedtuple18from contextlib import suppress19from email.utils import parsedate20from math import ceil21from pathlib import Path22from types import TracebackType23from typing import (24Any,25Callable,26ContextManager,27Dict,28Generator,29Generic,30Iterable,31Iterator,32List,33Mapping,34Optional,35Pattern,36Set,37Tuple,38Type,39TypeVar,40Union,41cast,42)43from urllib.parse import quote44from urllib.request import getproxies, proxy_bypass4546import async_timeout47import attr48from multidict import MultiDict, MultiDictProxy49from yarl import URL5051from . import hdrs52from .log import client_logger, internal_logger53from .typedefs import PathLike, Protocol # noqa5455__all__ = ("BasicAuth", "ChainMapProxy", "ETag")5657IS_MACOS = platform.system() == "Darwin"58IS_WINDOWS = platform.system() == "Windows"5960PY_36 = sys.version_info >= (3, 6)61PY_37 = sys.version_info >= (3, 7)62PY_38 = sys.version_info >= (3, 8)63PY_310 = sys.version_info >= (3, 10)6465if sys.version_info < (3, 7):66import idna_ssl6768idna_ssl.patch_match_hostname()6970def all_tasks(71loop: Optional[asyncio.AbstractEventLoop] = None,72) -> Set["asyncio.Task[Any]"]:73tasks = list(asyncio.Task.all_tasks(loop))74return {t for t in tasks if not t.done()}757677else:78all_tasks = asyncio.all_tasks798081_T = TypeVar("_T")82_S = TypeVar("_S")838485sentinel = object() # type: Any86NO_EXTENSIONS = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS")) # type: bool8788# N.B. sys.flags.dev_mode is available on Python 3.7+, use getattr89# for compatibility with older versions90DEBUG = getattr(sys.flags, "dev_mode", False) or (91not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG"))92) # type: bool939495CHAR = {chr(i) for i in range(0, 128)}96CTL = {chr(i) for i in range(0, 32)} | {97chr(127),98}99SEPARATORS = {100"(",101")",102"<",103">",104"@",105",",106";",107":",108"\\",109'"',110"/",111"[",112"]",113"?",114"=",115"{",116"}",117" ",118chr(9),119}120TOKEN = CHAR ^ CTL ^ SEPARATORS121122123class noop:124def __await__(self) -> Generator[None, None, None]:125yield126127128class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])):129"""Http basic authentication helper."""130131def __new__(132cls, login: str, password: str = "", encoding: str = "latin1"133) -> "BasicAuth":134if login is None:135raise ValueError("None is not allowed as login value")136137if password is None:138raise ValueError("None is not allowed as password value")139140if ":" in login:141raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')142143return super().__new__(cls, login, password, encoding)144145@classmethod146def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth":147"""Create a BasicAuth object from an Authorization HTTP header."""148try:149auth_type, encoded_credentials = auth_header.split(" ", 1)150except ValueError:151raise ValueError("Could not parse authorization header.")152153if auth_type.lower() != "basic":154raise ValueError("Unknown authorization method %s" % auth_type)155156try:157decoded = base64.b64decode(158encoded_credentials.encode("ascii"), validate=True159).decode(encoding)160except binascii.Error:161raise ValueError("Invalid base64 encoding.")162163try:164# RFC 2617 HTTP Authentication165# https://www.ietf.org/rfc/rfc2617.txt166# the colon must be present, but the username and password may be167# otherwise blank.168username, password = decoded.split(":", 1)169except ValueError:170raise ValueError("Invalid credentials.")171172return cls(username, password, encoding=encoding)173174@classmethod175def from_url(cls, url: URL, *, encoding: str = "latin1") -> Optional["BasicAuth"]:176"""Create BasicAuth from url."""177if not isinstance(url, URL):178raise TypeError("url should be yarl.URL instance")179if url.user is None:180return None181return cls(url.user, url.password or "", encoding=encoding)182183def encode(self) -> str:184"""Encode credentials."""185creds = (f"{self.login}:{self.password}").encode(self.encoding)186return "Basic %s" % base64.b64encode(creds).decode(self.encoding)187188189def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:190auth = BasicAuth.from_url(url)191if auth is None:192return url, None193else:194return url.with_user(None), auth195196197def netrc_from_env() -> Optional[netrc.netrc]:198"""Load netrc from file.199200Attempt to load it from the path specified by the env-var201NETRC or in the default location in the user's home directory.202203Returns None if it couldn't be found or fails to parse.204"""205netrc_env = os.environ.get("NETRC")206207if netrc_env is not None:208netrc_path = Path(netrc_env)209else:210try:211home_dir = Path.home()212except RuntimeError as e: # pragma: no cover213# if pathlib can't resolve home, it may raise a RuntimeError214client_logger.debug(215"Could not resolve home directory when "216"trying to look for .netrc file: %s",217e,218)219return None220221netrc_path = home_dir / ("_netrc" if IS_WINDOWS else ".netrc")222223try:224return netrc.netrc(str(netrc_path))225except netrc.NetrcParseError as e:226client_logger.warning("Could not parse .netrc file: %s", e)227except OSError as e:228# we couldn't read the file (doesn't exist, permissions, etc.)229if netrc_env or netrc_path.is_file():230# only warn if the environment wanted us to load it,231# or it appears like the default file does actually exist232client_logger.warning("Could not read .netrc file: %s", e)233234return None235236237@attr.s(auto_attribs=True, frozen=True, slots=True)238class ProxyInfo:239proxy: URL240proxy_auth: Optional[BasicAuth]241242243def proxies_from_env() -> Dict[str, ProxyInfo]:244proxy_urls = {245k: URL(v)246for k, v in getproxies().items()247if k in ("http", "https", "ws", "wss")248}249netrc_obj = netrc_from_env()250stripped = {k: strip_auth_from_url(v) for k, v in proxy_urls.items()}251ret = {}252for proto, val in stripped.items():253proxy, auth = val254if proxy.scheme in ("https", "wss"):255client_logger.warning(256"%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy257)258continue259if netrc_obj and auth is None:260auth_from_netrc = None261if proxy.host is not None:262auth_from_netrc = netrc_obj.authenticators(proxy.host)263if auth_from_netrc is not None:264# auth_from_netrc is a (`user`, `account`, `password`) tuple,265# `user` and `account` both can be username,266# if `user` is None, use `account`267*logins, password = auth_from_netrc268login = logins[0] if logins[0] else logins[-1]269auth = BasicAuth(cast(str, login), cast(str, password))270ret[proto] = ProxyInfo(proxy, auth)271return ret272273274def current_task(275loop: Optional[asyncio.AbstractEventLoop] = None,276) -> "Optional[asyncio.Task[Any]]":277if sys.version_info >= (3, 7):278return asyncio.current_task(loop=loop)279else:280return asyncio.Task.current_task(loop=loop)281282283def get_running_loop(284loop: Optional[asyncio.AbstractEventLoop] = None,285) -> asyncio.AbstractEventLoop:286if loop is None:287loop = asyncio.get_event_loop()288if not loop.is_running():289warnings.warn(290"The object should be created within an async function",291DeprecationWarning,292stacklevel=3,293)294if loop.get_debug():295internal_logger.warning(296"The object should be created within an async function", stack_info=True297)298return loop299300301def isasyncgenfunction(obj: Any) -> bool:302func = getattr(inspect, "isasyncgenfunction", None)303if func is not None:304return func(obj) # type: ignore[no-any-return]305else:306return False307308309def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:310"""Get a permitted proxy for the given URL from the env."""311if url.host is not None and proxy_bypass(url.host):312raise LookupError(f"Proxying is disallowed for `{url.host!r}`")313314proxies_in_env = proxies_from_env()315try:316proxy_info = proxies_in_env[url.scheme]317except KeyError:318raise LookupError(f"No proxies found for `{url!s}` in the env")319else:320return proxy_info.proxy, proxy_info.proxy_auth321322323@attr.s(auto_attribs=True, frozen=True, slots=True)324class MimeType:325type: str326subtype: str327suffix: str328parameters: "MultiDictProxy[str]"329330331@functools.lru_cache(maxsize=56)332def parse_mimetype(mimetype: str) -> MimeType:333"""Parses a MIME type into its components.334335mimetype is a MIME type string.336337Returns a MimeType object.338339Example:340341>>> parse_mimetype('text/html; charset=utf-8')342MimeType(type='text', subtype='html', suffix='',343parameters={'charset': 'utf-8'})344345"""346if not mimetype:347return MimeType(348type="", subtype="", suffix="", parameters=MultiDictProxy(MultiDict())349)350351parts = mimetype.split(";")352params = MultiDict() # type: MultiDict[str]353for item in parts[1:]:354if not item:355continue356key, value = cast(357Tuple[str, str], item.split("=", 1) if "=" in item else (item, "")358)359params.add(key.lower().strip(), value.strip(' "'))360361fulltype = parts[0].strip().lower()362if fulltype == "*":363fulltype = "*/*"364365mtype, stype = (366cast(Tuple[str, str], fulltype.split("/", 1))367if "/" in fulltype368else (fulltype, "")369)370stype, suffix = (371cast(Tuple[str, str], stype.split("+", 1)) if "+" in stype else (stype, "")372)373374return MimeType(375type=mtype, subtype=stype, suffix=suffix, parameters=MultiDictProxy(params)376)377378379def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]:380name = getattr(obj, "name", None)381if name and isinstance(name, str) and name[0] != "<" and name[-1] != ">":382return Path(name).name383return default384385386not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]")387QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"}388389390def quoted_string(content: str) -> str:391"""Return 7-bit content as quoted-string.392393Format content into a quoted-string as defined in RFC5322 for394Internet Message Format. Notice that this is not the 8-bit HTTP395format, but the 7-bit email format. Content must be in usascii or396a ValueError is raised.397"""398if not (QCONTENT > set(content)):399raise ValueError(f"bad content for quoted-string {content!r}")400return not_qtext_re.sub(lambda x: "\\" + x.group(0), content)401402403def content_disposition_header(404disptype: str, quote_fields: bool = True, _charset: str = "utf-8", **params: str405) -> str:406"""Sets ``Content-Disposition`` header for MIME.407408This is the MIME payload Content-Disposition header from RFC 2183409and RFC 7579 section 4.2, not the HTTP Content-Disposition from410RFC 6266.411412disptype is a disposition type: inline, attachment, form-data.413Should be valid extension token (see RFC 2183)414415quote_fields performs value quoting to 7-bit MIME headers416according to RFC 7578. Set to quote_fields to False if recipient417can take 8-bit file names and field values.418419_charset specifies the charset to use when quote_fields is True.420421params is a dict with disposition params.422"""423if not disptype or not (TOKEN > set(disptype)):424raise ValueError("bad content disposition type {!r}" "".format(disptype))425426value = disptype427if params:428lparams = []429for key, val in params.items():430if not key or not (TOKEN > set(key)):431raise ValueError(432"bad content disposition parameter" " {!r}={!r}".format(key, val)433)434if quote_fields:435if key.lower() == "filename":436qval = quote(val, "", encoding=_charset)437lparams.append((key, '"%s"' % qval))438else:439try:440qval = quoted_string(val)441except ValueError:442qval = "".join(443(_charset, "''", quote(val, "", encoding=_charset))444)445lparams.append((key + "*", qval))446else:447lparams.append((key, '"%s"' % qval))448else:449qval = val.replace("\\", "\\\\").replace('"', '\\"')450lparams.append((key, '"%s"' % qval))451sparams = "; ".join("=".join(pair) for pair in lparams)452value = "; ".join((value, sparams))453return value454455456class _TSelf(Protocol, Generic[_T]):457_cache: Dict[str, _T]458459460class reify(Generic[_T]):461"""Use as a class method decorator.462463It operates almost exactly like464the Python `@property` decorator, but it puts the result of the465method it decorates into the instance dict after the first call,466effectively replacing the function it decorates with an instance467variable. It is, in Python parlance, a data descriptor.468"""469470def __init__(self, wrapped: Callable[..., _T]) -> None:471self.wrapped = wrapped472self.__doc__ = wrapped.__doc__473self.name = wrapped.__name__474475def __get__(self, inst: _TSelf[_T], owner: Optional[Type[Any]] = None) -> _T:476try:477try:478return inst._cache[self.name]479except KeyError:480val = self.wrapped(inst)481inst._cache[self.name] = val482return val483except AttributeError:484if inst is None:485return self486raise487488def __set__(self, inst: _TSelf[_T], value: _T) -> None:489raise AttributeError("reified property is read-only")490491492reify_py = reify493494try:495from ._helpers import reify as reify_c496497if not NO_EXTENSIONS:498reify = reify_c # type: ignore[misc,assignment]499except ImportError:500pass501502_ipv4_pattern = (503r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"504r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"505)506_ipv6_pattern = (507r"^(?:(?:(?:[A-F0-9]{1,4}:){6}|(?=(?:[A-F0-9]{0,4}:){0,6}"508r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}$)(([0-9A-F]{1,4}:){0,5}|:)"509r"((:[0-9A-F]{1,4}){1,5}:|:)|::(?:[A-F0-9]{1,4}:){5})"510r"(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}"511r"(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|(?:[A-F0-9]{1,4}:){7}"512r"[A-F0-9]{1,4}|(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}$)"513r"(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)|(?:[A-F0-9]{1,4}:){7}"514r":|:(:[A-F0-9]{1,4}){7})$"515)516_ipv4_regex = re.compile(_ipv4_pattern)517_ipv6_regex = re.compile(_ipv6_pattern, flags=re.IGNORECASE)518_ipv4_regexb = re.compile(_ipv4_pattern.encode("ascii"))519_ipv6_regexb = re.compile(_ipv6_pattern.encode("ascii"), flags=re.IGNORECASE)520521522def _is_ip_address(523regex: Pattern[str], regexb: Pattern[bytes], host: Optional[Union[str, bytes]]524) -> bool:525if host is None:526return False527if isinstance(host, str):528return bool(regex.match(host))529elif isinstance(host, (bytes, bytearray, memoryview)):530return bool(regexb.match(host))531else:532raise TypeError(f"{host} [{type(host)}] is not a str or bytes")533534535is_ipv4_address = functools.partial(_is_ip_address, _ipv4_regex, _ipv4_regexb)536is_ipv6_address = functools.partial(_is_ip_address, _ipv6_regex, _ipv6_regexb)537538539def is_ip_address(host: Optional[Union[str, bytes, bytearray, memoryview]]) -> bool:540return is_ipv4_address(host) or is_ipv6_address(host)541542543def next_whole_second() -> datetime.datetime:544"""Return current time rounded up to the next whole second."""545return datetime.datetime.now(datetime.timezone.utc).replace(546microsecond=0547) + datetime.timedelta(seconds=0)548549550_cached_current_datetime = None # type: Optional[int]551_cached_formatted_datetime = ""552553554def rfc822_formatted_time() -> str:555global _cached_current_datetime556global _cached_formatted_datetime557558now = int(time.time())559if now != _cached_current_datetime:560# Weekday and month names for HTTP date/time formatting;561# always English!562# Tuples are constants stored in codeobject!563_weekdayname = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")564_monthname = (565"", # Dummy so we can use 1-based month numbers566"Jan",567"Feb",568"Mar",569"Apr",570"May",571"Jun",572"Jul",573"Aug",574"Sep",575"Oct",576"Nov",577"Dec",578)579580year, month, day, hh, mm, ss, wd, *tail = time.gmtime(now)581_cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (582_weekdayname[wd],583day,584_monthname[month],585year,586hh,587mm,588ss,589)590_cached_current_datetime = now591return _cached_formatted_datetime592593594def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:595ref, name = info596ob = ref()597if ob is not None:598with suppress(Exception):599getattr(ob, name)()600601602def weakref_handle(603ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop604) -> Optional[asyncio.TimerHandle]:605if timeout is not None and timeout > 0:606when = loop.time() + timeout607if timeout >= 5:608when = ceil(when)609610return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))611return None612613614def call_later(615cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop616) -> Optional[asyncio.TimerHandle]:617if timeout is not None and timeout > 0:618when = loop.time() + timeout619if timeout > 5:620when = ceil(when)621return loop.call_at(when, cb)622return None623624625class TimeoutHandle:626"""Timeout handle"""627628def __init__(629self, loop: asyncio.AbstractEventLoop, timeout: Optional[float]630) -> None:631self._timeout = timeout632self._loop = loop633self._callbacks = (634[]635) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]]636637def register(638self, callback: Callable[..., None], *args: Any, **kwargs: Any639) -> None:640self._callbacks.append((callback, args, kwargs))641642def close(self) -> None:643self._callbacks.clear()644645def start(self) -> Optional[asyncio.Handle]:646timeout = self._timeout647if timeout is not None and timeout > 0:648when = self._loop.time() + timeout649if timeout >= 5:650when = ceil(when)651return self._loop.call_at(when, self.__call__)652else:653return None654655def timer(self) -> "BaseTimerContext":656if self._timeout is not None and self._timeout > 0:657timer = TimerContext(self._loop)658self.register(timer.timeout)659return timer660else:661return TimerNoop()662663def __call__(self) -> None:664for cb, args, kwargs in self._callbacks:665with suppress(Exception):666cb(*args, **kwargs)667668self._callbacks.clear()669670671class BaseTimerContext(ContextManager["BaseTimerContext"]):672pass673674675class TimerNoop(BaseTimerContext):676def __enter__(self) -> BaseTimerContext:677return self678679def __exit__(680self,681exc_type: Optional[Type[BaseException]],682exc_val: Optional[BaseException],683exc_tb: Optional[TracebackType],684) -> None:685return686687688class TimerContext(BaseTimerContext):689"""Low resolution timeout context manager"""690691def __init__(self, loop: asyncio.AbstractEventLoop) -> None:692self._loop = loop693self._tasks = [] # type: List[asyncio.Task[Any]]694self._cancelled = False695696def __enter__(self) -> BaseTimerContext:697task = current_task(loop=self._loop)698699if task is None:700raise RuntimeError(701"Timeout context manager should be used " "inside a task"702)703704if self._cancelled:705raise asyncio.TimeoutError from None706707self._tasks.append(task)708return self709710def __exit__(711self,712exc_type: Optional[Type[BaseException]],713exc_val: Optional[BaseException],714exc_tb: Optional[TracebackType],715) -> Optional[bool]:716if self._tasks:717self._tasks.pop()718719if exc_type is asyncio.CancelledError and self._cancelled:720raise asyncio.TimeoutError from None721return None722723def timeout(self) -> None:724if not self._cancelled:725for task in set(self._tasks):726task.cancel()727728self._cancelled = True729730731def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout:732if delay is None or delay <= 0:733return async_timeout.timeout(None)734735loop = get_running_loop()736now = loop.time()737when = now + delay738if delay > 5:739when = ceil(when)740return async_timeout.timeout_at(when)741742743class HeadersMixin:744745ATTRS = frozenset(["_content_type", "_content_dict", "_stored_content_type"])746747_content_type = None # type: Optional[str]748_content_dict = None # type: Optional[Dict[str, str]]749_stored_content_type = sentinel750751def _parse_content_type(self, raw: str) -> None:752self._stored_content_type = raw753if raw is None:754# default value according to RFC 2616755self._content_type = "application/octet-stream"756self._content_dict = {}757else:758self._content_type, self._content_dict = cgi.parse_header(raw)759760@property761def content_type(self) -> str:762"""The value of content part for Content-Type HTTP header."""763raw = self._headers.get(hdrs.CONTENT_TYPE) # type: ignore[attr-defined]764if self._stored_content_type != raw:765self._parse_content_type(raw)766return self._content_type # type: ignore[return-value]767768@property769def charset(self) -> Optional[str]:770"""The value of charset part for Content-Type HTTP header."""771raw = self._headers.get(hdrs.CONTENT_TYPE) # type: ignore[attr-defined]772if self._stored_content_type != raw:773self._parse_content_type(raw)774return self._content_dict.get("charset") # type: ignore[union-attr]775776@property777def content_length(self) -> Optional[int]:778"""The value of Content-Length HTTP header."""779content_length = self._headers.get( # type: ignore[attr-defined]780hdrs.CONTENT_LENGTH781)782783if content_length is not None:784return int(content_length)785else:786return None787788789def set_result(fut: "asyncio.Future[_T]", result: _T) -> None:790if not fut.done():791fut.set_result(result)792793794def set_exception(fut: "asyncio.Future[_T]", exc: BaseException) -> None:795if not fut.done():796fut.set_exception(exc)797798799class ChainMapProxy(Mapping[str, Any]):800__slots__ = ("_maps",)801802def __init__(self, maps: Iterable[Mapping[str, Any]]) -> None:803self._maps = tuple(maps)804805def __init_subclass__(cls) -> None:806raise TypeError(807"Inheritance class {} from ChainMapProxy "808"is forbidden".format(cls.__name__)809)810811def __getitem__(self, key: str) -> Any:812for mapping in self._maps:813try:814return mapping[key]815except KeyError:816pass817raise KeyError(key)818819def get(self, key: str, default: Any = None) -> Any:820return self[key] if key in self else default821822def __len__(self) -> int:823# reuses stored hash values if possible824return len(set().union(*self._maps)) # type: ignore[arg-type]825826def __iter__(self) -> Iterator[str]:827d = {} # type: Dict[str, Any]828for mapping in reversed(self._maps):829# reuses stored hash values if possible830d.update(mapping)831return iter(d)832833def __contains__(self, key: object) -> bool:834return any(key in m for m in self._maps)835836def __bool__(self) -> bool:837return any(self._maps)838839def __repr__(self) -> str:840content = ", ".join(map(repr, self._maps))841return f"ChainMapProxy({content})"842843844# https://tools.ietf.org/html/rfc7232#section-2.3845_ETAGC = r"[!#-}\x80-\xff]+"846_ETAGC_RE = re.compile(_ETAGC)847_QUOTED_ETAG = fr'(W/)?"({_ETAGC})"'848QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG)849LIST_QUOTED_ETAG_RE = re.compile(fr"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)")850851ETAG_ANY = "*"852853854@attr.s(auto_attribs=True, frozen=True, slots=True)855class ETag:856value: str857is_weak: bool = False858859860def validate_etag_value(value: str) -> None:861if value != ETAG_ANY and not _ETAGC_RE.fullmatch(value):862raise ValueError(863f"Value {value!r} is not a valid etag. Maybe it contains '\"'?"864)865866867def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]:868"""Process a date string, return a datetime object"""869if date_str is not None:870timetuple = parsedate(date_str)871if timetuple is not None:872with suppress(ValueError):873return datetime.datetime(*timetuple[:6], tzinfo=datetime.timezone.utc)874return None875876877