Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/cookiejar.py
7763 views
1
import asyncio
2
import contextlib
3
import datetime
4
import os # noqa
5
import pathlib
6
import pickle
7
import re
8
from collections import defaultdict
9
from http.cookies import BaseCookie, Morsel, SimpleCookie
10
from typing import ( # noqa
11
DefaultDict,
12
Dict,
13
Iterable,
14
Iterator,
15
List,
16
Mapping,
17
Optional,
18
Set,
19
Tuple,
20
Union,
21
cast,
22
)
23
24
from yarl import URL
25
26
from .abc import AbstractCookieJar, ClearCookiePredicate
27
from .helpers import is_ip_address, next_whole_second
28
from .typedefs import LooseCookies, PathLike, StrOrURL
29
30
__all__ = ("CookieJar", "DummyCookieJar")
31
32
33
CookieItem = Union[str, "Morsel[str]"]
34
35
36
class CookieJar(AbstractCookieJar):
37
"""Implements cookie storage adhering to RFC 6265."""
38
39
DATE_TOKENS_RE = re.compile(
40
r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
41
r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
42
)
43
44
DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")
45
46
DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")
47
48
DATE_MONTH_RE = re.compile(
49
"(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|" "(aug)|(sep)|(oct)|(nov)|(dec)",
50
re.I,
51
)
52
53
DATE_YEAR_RE = re.compile(r"(\d{2,4})")
54
55
MAX_TIME = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)
56
57
MAX_32BIT_TIME = datetime.datetime.utcfromtimestamp(2 ** 31 - 1)
58
59
def __init__(
60
self,
61
*,
62
unsafe: bool = False,
63
quote_cookie: bool = True,
64
treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None,
65
loop: Optional[asyncio.AbstractEventLoop] = None,
66
) -> None:
67
super().__init__(loop=loop)
68
self._cookies = defaultdict(
69
SimpleCookie
70
) # type: DefaultDict[str, SimpleCookie[str]]
71
self._host_only_cookies = set() # type: Set[Tuple[str, str]]
72
self._unsafe = unsafe
73
self._quote_cookie = quote_cookie
74
if treat_as_secure_origin is None:
75
treat_as_secure_origin = []
76
elif isinstance(treat_as_secure_origin, URL):
77
treat_as_secure_origin = [treat_as_secure_origin.origin()]
78
elif isinstance(treat_as_secure_origin, str):
79
treat_as_secure_origin = [URL(treat_as_secure_origin).origin()]
80
else:
81
treat_as_secure_origin = [
82
URL(url).origin() if isinstance(url, str) else url.origin()
83
for url in treat_as_secure_origin
84
]
85
self._treat_as_secure_origin = treat_as_secure_origin
86
self._next_expiration = next_whole_second()
87
self._expirations = {} # type: Dict[Tuple[str, str], datetime.datetime]
88
# #4515: datetime.max may not be representable on 32-bit platforms
89
self._max_time = self.MAX_TIME
90
try:
91
self._max_time.timestamp()
92
except OverflowError:
93
self._max_time = self.MAX_32BIT_TIME
94
95
def save(self, file_path: PathLike) -> None:
96
file_path = pathlib.Path(file_path)
97
with file_path.open(mode="wb") as f:
98
pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL)
99
100
def load(self, file_path: PathLike) -> None:
101
file_path = pathlib.Path(file_path)
102
with file_path.open(mode="rb") as f:
103
self._cookies = pickle.load(f)
104
105
def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
106
if predicate is None:
107
self._next_expiration = next_whole_second()
108
self._cookies.clear()
109
self._host_only_cookies.clear()
110
self._expirations.clear()
111
return
112
113
to_del = []
114
now = datetime.datetime.now(datetime.timezone.utc)
115
for domain, cookie in self._cookies.items():
116
for name, morsel in cookie.items():
117
key = (domain, name)
118
if (
119
key in self._expirations and self._expirations[key] <= now
120
) or predicate(morsel):
121
to_del.append(key)
122
123
for domain, name in to_del:
124
key = (domain, name)
125
self._host_only_cookies.discard(key)
126
if key in self._expirations:
127
del self._expirations[(domain, name)]
128
self._cookies[domain].pop(name, None)
129
130
next_expiration = min(self._expirations.values(), default=self._max_time)
131
try:
132
self._next_expiration = next_expiration.replace(
133
microsecond=0
134
) + datetime.timedelta(seconds=1)
135
except OverflowError:
136
self._next_expiration = self._max_time
137
138
def clear_domain(self, domain: str) -> None:
139
self.clear(lambda x: self._is_domain_match(domain, x["domain"]))
140
141
def __iter__(self) -> "Iterator[Morsel[str]]":
142
self._do_expiration()
143
for val in self._cookies.values():
144
yield from val.values()
145
146
def __len__(self) -> int:
147
return sum(1 for i in self)
148
149
def _do_expiration(self) -> None:
150
self.clear(lambda x: False)
151
152
def _expire_cookie(self, when: datetime.datetime, domain: str, name: str) -> None:
153
self._next_expiration = min(self._next_expiration, when)
154
self._expirations[(domain, name)] = when
155
156
def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
157
"""Update cookies."""
158
hostname = response_url.raw_host
159
160
if not self._unsafe and is_ip_address(hostname):
161
# Don't accept cookies from IPs
162
return
163
164
if isinstance(cookies, Mapping):
165
cookies = cookies.items()
166
167
for name, cookie in cookies:
168
if not isinstance(cookie, Morsel):
169
tmp = SimpleCookie() # type: SimpleCookie[str]
170
tmp[name] = cookie # type: ignore[assignment]
171
cookie = tmp[name]
172
173
domain = cookie["domain"]
174
175
# ignore domains with trailing dots
176
if domain.endswith("."):
177
domain = ""
178
del cookie["domain"]
179
180
if not domain and hostname is not None:
181
# Set the cookie's domain to the response hostname
182
# and set its host-only-flag
183
self._host_only_cookies.add((hostname, name))
184
domain = cookie["domain"] = hostname
185
186
if domain.startswith("."):
187
# Remove leading dot
188
domain = domain[1:]
189
cookie["domain"] = domain
190
191
if hostname and not self._is_domain_match(domain, hostname):
192
# Setting cookies for different domains is not allowed
193
continue
194
195
path = cookie["path"]
196
if not path or not path.startswith("/"):
197
# Set the cookie's path to the response path
198
path = response_url.path
199
if not path.startswith("/"):
200
path = "/"
201
else:
202
# Cut everything from the last slash to the end
203
path = "/" + path[1 : path.rfind("/")]
204
cookie["path"] = path
205
206
max_age = cookie["max-age"]
207
if max_age:
208
try:
209
delta_seconds = int(max_age)
210
try:
211
max_age_expiration = datetime.datetime.now(
212
datetime.timezone.utc
213
) + datetime.timedelta(seconds=delta_seconds)
214
except OverflowError:
215
max_age_expiration = self._max_time
216
self._expire_cookie(max_age_expiration, domain, name)
217
except ValueError:
218
cookie["max-age"] = ""
219
220
else:
221
expires = cookie["expires"]
222
if expires:
223
expire_time = self._parse_date(expires)
224
if expire_time:
225
self._expire_cookie(expire_time, domain, name)
226
else:
227
cookie["expires"] = ""
228
229
self._cookies[domain][name] = cookie
230
231
self._do_expiration()
232
233
def filter_cookies(
234
self, request_url: URL = URL()
235
) -> Union["BaseCookie[str]", "SimpleCookie[str]"]:
236
"""Returns this jar's cookies filtered by their attributes."""
237
self._do_expiration()
238
request_url = URL(request_url)
239
filtered: Union["SimpleCookie[str]", "BaseCookie[str]"] = (
240
SimpleCookie() if self._quote_cookie else BaseCookie()
241
)
242
hostname = request_url.raw_host or ""
243
request_origin = URL()
244
with contextlib.suppress(ValueError):
245
request_origin = request_url.origin()
246
247
is_not_secure = (
248
request_url.scheme not in ("https", "wss")
249
and request_origin not in self._treat_as_secure_origin
250
)
251
252
for cookie in self:
253
name = cookie.key
254
domain = cookie["domain"]
255
256
# Send shared cookies
257
if not domain:
258
filtered[name] = cookie.value
259
continue
260
261
if not self._unsafe and is_ip_address(hostname):
262
continue
263
264
if (domain, name) in self._host_only_cookies:
265
if domain != hostname:
266
continue
267
elif not self._is_domain_match(domain, hostname):
268
continue
269
270
if not self._is_path_match(request_url.path, cookie["path"]):
271
continue
272
273
if is_not_secure and cookie["secure"]:
274
continue
275
276
# It's critical we use the Morsel so the coded_value
277
# (based on cookie version) is preserved
278
mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel()))
279
mrsl_val.set(cookie.key, cookie.value, cookie.coded_value)
280
filtered[name] = mrsl_val
281
282
return filtered
283
284
@staticmethod
285
def _is_domain_match(domain: str, hostname: str) -> bool:
286
"""Implements domain matching adhering to RFC 6265."""
287
if hostname == domain:
288
return True
289
290
if not hostname.endswith(domain):
291
return False
292
293
non_matching = hostname[: -len(domain)]
294
295
if not non_matching.endswith("."):
296
return False
297
298
return not is_ip_address(hostname)
299
300
@staticmethod
301
def _is_path_match(req_path: str, cookie_path: str) -> bool:
302
"""Implements path matching adhering to RFC 6265."""
303
if not req_path.startswith("/"):
304
req_path = "/"
305
306
if req_path == cookie_path:
307
return True
308
309
if not req_path.startswith(cookie_path):
310
return False
311
312
if cookie_path.endswith("/"):
313
return True
314
315
non_matching = req_path[len(cookie_path) :]
316
317
return non_matching.startswith("/")
318
319
@classmethod
320
def _parse_date(cls, date_str: str) -> Optional[datetime.datetime]:
321
"""Implements date string parsing adhering to RFC 6265."""
322
if not date_str:
323
return None
324
325
found_time = False
326
found_day = False
327
found_month = False
328
found_year = False
329
330
hour = minute = second = 0
331
day = 0
332
month = 0
333
year = 0
334
335
for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
336
337
token = token_match.group("token")
338
339
if not found_time:
340
time_match = cls.DATE_HMS_TIME_RE.match(token)
341
if time_match:
342
found_time = True
343
hour, minute, second = (int(s) for s in time_match.groups())
344
continue
345
346
if not found_day:
347
day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
348
if day_match:
349
found_day = True
350
day = int(day_match.group())
351
continue
352
353
if not found_month:
354
month_match = cls.DATE_MONTH_RE.match(token)
355
if month_match:
356
found_month = True
357
assert month_match.lastindex is not None
358
month = month_match.lastindex
359
continue
360
361
if not found_year:
362
year_match = cls.DATE_YEAR_RE.match(token)
363
if year_match:
364
found_year = True
365
year = int(year_match.group())
366
367
if 70 <= year <= 99:
368
year += 1900
369
elif 0 <= year <= 69:
370
year += 2000
371
372
if False in (found_day, found_month, found_year, found_time):
373
return None
374
375
if not 1 <= day <= 31:
376
return None
377
378
if year < 1601 or hour > 23 or minute > 59 or second > 59:
379
return None
380
381
return datetime.datetime(
382
year, month, day, hour, minute, second, tzinfo=datetime.timezone.utc
383
)
384
385
386
class DummyCookieJar(AbstractCookieJar):
387
"""Implements a dummy cookie storage.
388
389
It can be used with the ClientSession when no cookie processing is needed.
390
391
"""
392
393
def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
394
super().__init__(loop=loop)
395
396
def __iter__(self) -> "Iterator[Morsel[str]]":
397
while False:
398
yield None
399
400
def __len__(self) -> int:
401
return 0
402
403
def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
404
pass
405
406
def clear_domain(self, domain: str) -> None:
407
pass
408
409
def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
410
pass
411
412
def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
413
return SimpleCookie()
414
415