Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/cookiejar.py
7763 views
import asyncio
import contextlib
import datetime
import os  # noqa
import pathlib
import pickle
import re
from collections import defaultdict
from http.cookies import BaseCookie, Morsel, SimpleCookie
from typing import (  # noqa
    DefaultDict,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

from yarl import URL

from .abc import AbstractCookieJar, ClearCookiePredicate
from .helpers import is_ip_address, next_whole_second
from .typedefs import LooseCookies, PathLike, StrOrURL

__all__ = ("CookieJar", "DummyCookieJar")


CookieItem = Union[str, "Morsel[str]"]


class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265."""

    # Splits a cookie date string into tokens, skipping leading delimiters.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    # One capture group per month, so ``match.lastindex`` is the month number.
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|" "(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    MAX_TIME = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

    # Must be timezone-aware like MAX_TIME: it is compared (``min``/``<=``)
    # with the aware datetimes kept in ``_expirations``, and mixing naive and
    # aware datetimes raises TypeError.
    MAX_32BIT_TIME = datetime.datetime.fromtimestamp(
        2 ** 31 - 1, datetime.timezone.utc
    )

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create an empty jar.

        unsafe -- when true, cookies are accepted from and sent to IP hosts.
        quote_cookie -- selects SimpleCookie (true) or BaseCookie (false) as
            the container returned by filter_cookies().
        treat_as_secure_origin -- a URL (or str), or a list of them, whose
            origins may receive "secure" cookies even over a non-https/wss
            scheme.
        loop -- forwarded unchanged to the base class constructor.
        """
        super().__init__(loop=loop)
        # domain -> cookies set for that domain ("" holds shared cookies)
        self._cookies = defaultdict(
            SimpleCookie
        )  # type: DefaultDict[str, SimpleCookie[str]]
        # (domain, name) pairs that must match the request host exactly
        self._host_only_cookies = set()  # type: Set[Tuple[str, str]]
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        # Normalise every accepted input shape to a list of origin URLs.
        if treat_as_secure_origin is None:
            treat_as_secure_origin = []
        elif isinstance(treat_as_secure_origin, URL):
            treat_as_secure_origin = [treat_as_secure_origin.origin()]
        elif isinstance(treat_as_secure_origin, str):
            treat_as_secure_origin = [URL(treat_as_secure_origin).origin()]
        else:
            treat_as_secure_origin = [
                URL(url).origin() if isinstance(url, str) else url.origin()
                for url in treat_as_secure_origin
            ]
        self._treat_as_secure_origin = treat_as_secure_origin
        self._next_expiration = next_whole_second()
        # (domain, name) -> moment at which that cookie expires
        self._expirations = {}  # type: Dict[Tuple[str, str], datetime.datetime]
        # #4515: datetime.max may not be representable on 32-bit platforms
        self._max_time = self.MAX_TIME
        try:
            self._max_time.timestamp()
        except OverflowError:
            self._max_time = self.MAX_32BIT_TIME

    def save(self, file_path: PathLike) -> None:
        """Pickle the stored cookies to *file_path*."""
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="wb") as f:
            pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL)

    def load(self, file_path: PathLike) -> None:
        """Replace the stored cookies with the pickle read from *file_path*.

        Only the cookie mapping is restored; the host-only flags and the
        expiration table are left as they were.
        """
        # SECURITY: pickle.load() can execute arbitrary code while
        # unpickling. Only load files this application wrote itself (for
        # example with save()); never load a file from an untrusted source.
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="rb") as f:
            self._cookies = pickle.load(f)

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        """Remove cookies from the jar.

        Without a predicate everything is dropped. With a predicate, a
        cookie is removed when it has expired or when ``predicate(morsel)``
        is true.
        """
        if predicate is None:
            self._next_expiration = next_whole_second()
            self._cookies.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        # Collect first, delete afterwards: the mappings must not change
        # size while they are being iterated.
        to_del = []
        now = datetime.datetime.now(datetime.timezone.utc)
        for domain, cookie in self._cookies.items():
            for name, morsel in cookie.items():
                key = (domain, name)
                if (
                    key in self._expirations and self._expirations[key] <= now
                ) or predicate(morsel):
                    to_del.append(key)

        for domain, name in to_del:
            key = (domain, name)
            self._host_only_cookies.discard(key)
            if key in self._expirations:
                del self._expirations[(domain, name)]
            self._cookies[domain].pop(name, None)

        # Recompute the earliest remaining expiration, rounded up to the
        # following whole second.
        next_expiration = min(self._expirations.values(), default=self._max_time)
        try:
            self._next_expiration = next_expiration.replace(
                microsecond=0
            ) + datetime.timedelta(seconds=1)
        except OverflowError:
            self._next_expiration = self._max_time

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute domain-matches *domain*."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Yield every stored morsel after dropping the expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return the number of non-expired cookies."""
        return sum(1 for i in self)

    def _do_expiration(self) -> None:
        """Drop expired cookies (the predicate itself never matches)."""
        self.clear(lambda x: False)

    def _expire_cookie(self, when: datetime.datetime, domain: str, name: str) -> None:
        """Record that the cookie (*domain*, *name*) expires at *when*."""
        self._next_expiration = min(self._next_expiration, when)
        self._expirations[(domain, name)] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies."""
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                # Wrap plain values so that every entry is handled as a Morsel.
                tmp = SimpleCookie()  # type: SimpleCookie[str]
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain.endswith("."):
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain.startswith("."):
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or not path.startswith("/"):
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path

            # Max-Age takes precedence: Expires is only looked at when
            # Max-Age is empty.
            max_age = cookie["max-age"]
            if max_age:
                try:
                    delta_seconds = int(max_age)
                    try:
                        max_age_expiration = datetime.datetime.now(
                            datetime.timezone.utc
                        ) + datetime.timedelta(seconds=delta_seconds)
                    except OverflowError:
                        max_age_expiration = self._max_time
                    self._expire_cookie(max_age_expiration, domain, name)
                except ValueError:
                    # Not an integer: blank the attribute and keep the cookie
                    # without an expiration.
                    cookie["max-age"] = ""

            else:
                expires = cookie["expires"]
                if expires:
                    expire_time = self._parse_date(expires)
                    if expire_time:
                        self._expire_cookie(expire_time, domain, name)
                    else:
                        cookie["expires"] = ""

            self._cookies[domain][name] = cookie

        self._do_expiration()

    def filter_cookies(
        self, request_url: URL = URL()
    ) -> Union["BaseCookie[str]", "SimpleCookie[str]"]:
        """Returns this jar's cookies filtered by their attributes."""
        self._do_expiration()
        request_url = URL(request_url)
        filtered: Union["SimpleCookie[str]", "BaseCookie[str]"] = (
            SimpleCookie() if self._quote_cookie else BaseCookie()
        )
        hostname = request_url.raw_host or ""
        # origin() raises ValueError for URLs that have no origin; fall back
        # to an empty URL in that case.
        request_origin = URL()
        with contextlib.suppress(ValueError):
            request_origin = request_url.origin()

        is_not_secure = (
            request_url.scheme not in ("https", "wss")
            and request_origin not in self._treat_as_secure_origin
        )

        for cookie in self:
            name = cookie.key
            domain = cookie["domain"]

            # Send shared cookies
            if not domain:
                filtered[name] = cookie.value
                continue

            if not self._unsafe and is_ip_address(hostname):
                continue

            if (domain, name) in self._host_only_cookies:
                if domain != hostname:
                    continue
            elif not self._is_domain_match(domain, hostname):
                continue

            if not self._is_path_match(request_url.path, cookie["path"]):
                continue

            if is_not_secure and cookie["secure"]:
                continue

            # It's critical we use the Morsel so the coded_value
            # (based on cookie version) is preserved
            mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel()))
            mrsl_val.set(cookie.key, cookie.value, cookie.coded_value)
            filtered[name] = mrsl_val

        return filtered

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265."""
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        # The part of the hostname in front of the domain must end at a
        # label boundary ("a.example.com" matches "example.com",
        # "badexample.com" does not).
        non_matching = hostname[: -len(domain)]

        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @staticmethod
    def _is_path_match(req_path: str, cookie_path: str) -> bool:
        """Implements path matching adhering to RFC 6265."""
        if not req_path.startswith("/"):
            req_path = "/"

        if req_path == cookie_path:
            return True

        if not req_path.startswith(cookie_path):
            return False

        if cookie_path.endswith("/"):
            return True

        # The prefix only counts when it ends at a path-segment boundary.
        non_matching = req_path[len(cookie_path) :]

        return non_matching.startswith("/")

    @classmethod
    def _parse_date(cls, date_str: str) -> Optional[datetime.datetime]:
        """Implements date string parsing adhering to RFC 6265.

        Returns a timezone-aware (UTC) datetime, or None when the string is
        empty, incomplete, out of range, or names a date that does not exist
        (for example 31 February).
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token fills the first still-missing field it matches, tried
        # in the order time, day-of-month, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):

            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 mean 19xx, 00-69 mean 20xx.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        try:
            return datetime.datetime(
                year, month, day, hour, minute, second, tzinfo=datetime.timezone.utc
            )
        except ValueError:
            # The fields passed the range checks but do not form a real
            # calendar date (e.g. "31 Feb"). RFC 6265 section 5.1.1 requires
            # parsing to fail here rather than raise to the caller.
            return None


class DummyCookieJar(AbstractCookieJar):
    """Implements a dummy cookie storage.

    It can be used with the ClientSession when no cookie processing is needed.

    """

    def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        super().__init__(loop=loop)

    def __iter__(self) -> "Iterator[Morsel[str]]":
        # The unreachable ``yield`` makes this a generator that yields nothing.
        while False:
            yield None

    def __len__(self) -> int:
        return 0

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        pass

    def clear_domain(self, domain: str) -> None:
        pass

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        pass

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        return SimpleCookie()