Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/helpers.py
7763 views
1
"""Various helper functions"""
2
3
import asyncio
4
import base64
5
import binascii
6
import cgi
7
import datetime
8
import functools
9
import inspect
10
import netrc
11
import os
12
import platform
13
import re
14
import sys
15
import time
16
import warnings
17
import weakref
18
from collections import namedtuple
19
from contextlib import suppress
20
from email.utils import parsedate
21
from math import ceil
22
from pathlib import Path
23
from types import TracebackType
24
from typing import (
25
Any,
26
Callable,
27
ContextManager,
28
Dict,
29
Generator,
30
Generic,
31
Iterable,
32
Iterator,
33
List,
34
Mapping,
35
Optional,
36
Pattern,
37
Set,
38
Tuple,
39
Type,
40
TypeVar,
41
Union,
42
cast,
43
)
44
from urllib.parse import quote
45
from urllib.request import getproxies, proxy_bypass
46
47
import async_timeout
48
import attr
49
from multidict import MultiDict, MultiDictProxy
50
from yarl import URL
51
52
from . import hdrs
53
from .log import client_logger, internal_logger
54
from .typedefs import PathLike, Protocol # noqa
55
56
# Names exported by ``from aiohttp.helpers import *``.
__all__ = (
    "BasicAuth",
    "ChainMapProxy",
    "ETag",
)
57
58
# Host operating system flags, evaluated once at import time.
IS_MACOS = platform.system() == "Darwin"
IS_WINDOWS = platform.system() == "Windows"

# Interpreter feature gates: True when running on at least that version.
PY_36 = sys.version_info >= (3, 6)
PY_37 = sys.version_info >= (3, 7)
PY_38 = sys.version_info >= (3, 8)
PY_310 = sys.version_info >= (3, 10)
65
66
if sys.version_info >= (3, 7):
    # Modern interpreters expose the helper directly.
    all_tasks = asyncio.all_tasks

else:
    # Python 3.6 only: patch hostname matching through idna_ssl and
    # emulate asyncio.all_tasks() on top of the Task.all_tasks() classmethod.
    import idna_ssl

    idna_ssl.patch_match_hostname()

    def all_tasks(
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> Set["asyncio.Task[Any]"]:
        """Return the set of tasks for *loop* that are not done yet."""
        pending = set()
        for task in list(asyncio.Task.all_tasks(loop)):
            if not task.done():
                pending.add(task)
        return pending
80
81
82
# Generic type variables shared by the helpers in this module.
_T = TypeVar("_T")
_S = TypeVar("_S")

# Unique marker object for "no value supplied"; typed as Any so it can be
# used as a default for any annotation.
sentinel: Any = object()

# True when the AIOHTTP_NO_EXTENSIONS environment variable is set to a
# non-empty value.
NO_EXTENSIONS: bool = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS"))

# N.B. sys.flags.dev_mode is available on Python 3.7+, use getattr
# for compatibility with older versions
DEBUG: bool = getattr(sys.flags, "dev_mode", False) or (
    not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG"))
)
94
95
96
# Character classes used to validate header tokens.
# NOTE(review): the names appear to follow the HTTP/1.1 "basic rules"
# grammar (CHAR / CTL / separators / token) -- confirm against the RFC.

# Every 7-bit ASCII character.
CHAR = {chr(i) for i in range(0, 128)}

# Control characters: 0..31 plus DEL (127).
CTL = {chr(i) for i in range(0, 32)} | {
    chr(127),
}

# Characters that delimit tokens; note that TAB (chr(9)) is also a CTL.
SEPARATORS = {
    "(",
    ")",
    "<",
    ">",
    "@",
    ",",
    ";",
    ":",
    "\\",
    '"',
    "/",
    "[",
    "]",
    "?",
    "=",
    "{",
    "}",
    " ",
    chr(9),
}

# Characters allowed in a token: any CHAR that is neither a control
# character nor a separator.  This must be a set *difference*: the previous
# symmetric difference (``^``) removed TAB via CTL and then re-added it via
# SEPARATORS, so "\t" was wrongly accepted as a token character.
TOKEN = CHAR - CTL - SEPARATORS
122
123
124
class noop:
    """Awaitable that yields control exactly once and produces no value."""

    def __await__(self) -> Generator[None, None, None]:
        # A single bare ``yield`` suspends the awaiting coroutine one time.
        yield
127
128
129
class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])):
    """Http basic authentication helper.

    An immutable ``(login, password, encoding)`` triple that can be converted
    to and from the value of an ``Authorization: Basic ...`` header.
    """

    def __new__(
        cls, login: str, password: str = "", encoding: str = "latin1"
    ) -> "BasicAuth":
        """Validate the credentials and build the tuple.

        Raises ValueError when *login* or *password* is None, or when
        *login* contains a colon (the colon separates login from password
        on the wire).
        """
        if login is None:
            raise ValueError("None is not allowed as login value")

        if password is None:
            raise ValueError("None is not allowed as password value")

        if ":" in login:
            raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')

        return super().__new__(cls, login, password, encoding)

    @classmethod
    def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth":
        """Create a BasicAuth object from an Authorization HTTP header.

        Raises ValueError when the header has no scheme/credentials pair,
        uses a scheme other than ``basic``, is not valid base64, or does not
        contain the ``login:password`` separator.
        """
        try:
            auth_type, encoded_credentials = auth_header.split(" ", 1)
        except ValueError:
            raise ValueError("Could not parse authorization header.") from None

        if auth_type.lower() != "basic":
            raise ValueError("Unknown authorization method %s" % auth_type)

        try:
            decoded = base64.b64decode(
                encoded_credentials.encode("ascii"), validate=True
            ).decode(encoding)
        except binascii.Error as exc:
            raise ValueError("Invalid base64 encoding.") from exc

        try:
            # RFC 2617 HTTP Authentication
            # https://www.ietf.org/rfc/rfc2617.txt
            # the colon must be present, but the username and password may be
            # otherwise blank.
            username, password = decoded.split(":", 1)
        except ValueError:
            raise ValueError("Invalid credentials.") from None

        return cls(username, password, encoding=encoding)

    @classmethod
    def from_url(
        cls, url: "URL", *, encoding: str = "latin1"
    ) -> Optional["BasicAuth"]:
        """Create BasicAuth from url.

        Returns None when the URL carries no user part.  Raises TypeError
        when *url* is not a ``yarl.URL`` instance.
        """
        if not isinstance(url, URL):
            raise TypeError("url should be yarl.URL instance")
        if url.user is None:
            return None
        return cls(url.user, url.password or "", encoding=encoding)

    def encode(self) -> str:
        """Encode credentials as a ``Basic <base64>`` header value."""
        creds = (f"{self.login}:{self.password}").encode(self.encoding)
        # base64 output is always pure ASCII.  Decoding it with
        # ``self.encoding`` corrupts the header for codecs that are not
        # ASCII-compatible (e.g. utf-16), so decode it as ASCII instead.
        return "Basic %s" % base64.b64encode(creds).decode("ascii")
188
189
190
def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Split embedded credentials off *url*.

    Returns ``(url, None)`` unchanged when the URL has no user part,
    otherwise the URL with its user info removed together with the
    extracted BasicAuth.
    """
    credentials = BasicAuth.from_url(url)
    if credentials is not None:
        return url.with_user(None), credentials
    return url, None
196
197
198
def netrc_from_env() -> Optional[netrc.netrc]:
    """Load netrc from file.

    The path comes from the NETRC environment variable when it is set;
    otherwise the default file in the user's home directory is used
    (``_netrc`` on Windows, ``.netrc`` elsewhere).

    Returns None if it couldn't be found or fails to parse.
    """
    env_value = os.environ.get("NETRC")

    if env_value is None:
        try:
            home = Path.home()
        except RuntimeError as exc:  # pragma: no cover
            # if pathlib can't resolve home, it may raise a RuntimeError
            client_logger.debug(
                "Could not resolve home directory when "
                "trying to look for .netrc file: %s",
                exc,
            )
            return None

        netrc_path = home / ("_netrc" if IS_WINDOWS else ".netrc")
    else:
        netrc_path = Path(env_value)

    try:
        return netrc.netrc(str(netrc_path))
    except netrc.NetrcParseError as exc:
        client_logger.warning("Could not parse .netrc file: %s", exc)
    except OSError as exc:
        # we couldn't read the file (doesn't exist, permissions, etc.)
        # only warn if the environment wanted us to load it,
        # or it appears like the default file does actually exist
        if env_value or netrc_path.is_file():
            client_logger.warning("Could not read .netrc file: %s", exc)

    return None
236
237
238
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ProxyInfo:
    # Proxy URL; proxies_from_env() stores it with credentials stripped.
    proxy: URL
    # Credentials for the proxy, or None when none are known.
    proxy_auth: Optional[BasicAuth]
242
243
244
def proxies_from_env() -> Dict[str, ProxyInfo]:
    """Collect proxy settings reported by ``urllib.request.getproxies()``.

    Only the ``http``, ``https``, ``ws`` and ``wss`` entries are considered.
    A proxy whose own URL scheme is ``https`` or ``wss`` is skipped with a
    warning.  When a proxy URL carries no credentials, they are looked up in
    the netrc file by proxy host.

    Returns a mapping from protocol name to ProxyInfo.
    """
    wanted = ("http", "https", "ws", "wss")
    proxy_urls = {
        scheme: URL(raw) for scheme, raw in getproxies().items() if scheme in wanted
    }
    netrc_obj = netrc_from_env()
    stripped = {
        scheme: strip_auth_from_url(proxy_url)
        for scheme, proxy_url in proxy_urls.items()
    }

    ret = {}
    for proto, (proxy, auth) in stripped.items():
        if proxy.scheme in ("https", "wss"):
            client_logger.warning(
                "%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy
            )
            continue

        if netrc_obj and auth is None and proxy.host is not None:
            auth_from_netrc = netrc_obj.authenticators(proxy.host)
            if auth_from_netrc is not None:
                # auth_from_netrc is a (`user`, `account`, `password`) tuple,
                # `user` and `account` both can be username,
                # if `user` is None, use `account`
                *logins, password = auth_from_netrc
                login = logins[0] or logins[-1]
                auth = BasicAuth(cast(str, login), cast(str, password))

        ret[proto] = ProxyInfo(proxy, auth)
    return ret
273
274
275
def current_task(
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> "Optional[asyncio.Task[Any]]":
    """Return the task currently running on *loop*, if any.

    Uses ``asyncio.current_task`` on Python 3.7+ and falls back to the
    ``asyncio.Task.current_task`` classmethod on older interpreters.
    """
    if sys.version_info < (3, 7):
        return asyncio.Task.current_task(loop=loop)
    return asyncio.current_task(loop=loop)
282
283
284
def get_running_loop(
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.AbstractEventLoop:
    """Return *loop*, defaulting to ``asyncio.get_event_loop()``.

    When the resulting loop is not running, a DeprecationWarning is emitted;
    if the loop is additionally in debug mode, the same message is logged
    with stack information.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    if loop.is_running():
        return loop

    warnings.warn(
        "The object should be created within an async function",
        DeprecationWarning,
        stacklevel=3,
    )
    if loop.get_debug():
        internal_logger.warning(
            "The object should be created within an async function", stack_info=True
        )
    return loop
300
301
302
def isasyncgenfunction(obj: Any) -> bool:
    """Report whether *obj* is an async generator function.

    Delegates to ``inspect.isasyncgenfunction`` and returns False when the
    running interpreter does not provide that function.
    """
    checker = getattr(inspect, "isasyncgenfunction", None)
    if checker is None:
        return False
    return checker(obj)  # type: ignore[no-any-return]
308
309
310
def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Get a permitted proxy for the given URL from the env.

    Returns the proxy URL and its optional credentials.  Raises LookupError
    when the host is excluded from proxying or when the environment defines
    no proxy for the URL's scheme.
    """
    host = url.host
    if host is not None and proxy_bypass(host):
        raise LookupError(f"Proxying is disallowed for `{url.host!r}`")

    available = proxies_from_env()
    try:
        chosen = available[url.scheme]
    except KeyError:
        raise LookupError(f"No proxies found for `{url!s}` in the env")

    return chosen.proxy, chosen.proxy_auth
322
323
324
@attr.s(auto_attribs=True, frozen=True, slots=True)
class MimeType:
    # Part before the "/" (e.g. "text").
    type: str
    # Part after the "/" with any "+suffix" removed (e.g. "html").
    subtype: str
    # Text after "+" in the subtype, or "" when there is none.
    suffix: str
    # Read-only view of the ";key=value" parameters.
    parameters: "MultiDictProxy[str]"
330
331
332
@functools.lru_cache(maxsize=56)
def parse_mimetype(mimetype: str) -> MimeType:
    """Parses a MIME type into its components.

    mimetype is a MIME type string.

    Returns a MimeType object.  An empty string yields a MimeType whose
    fields are all empty; a bare ``*`` is treated as ``*/*``.

    Example:

    >>> parse_mimetype('text/html; charset=utf-8')
    MimeType(type='text', subtype='html', suffix='',
             parameters={'charset': 'utf-8'})

    """
    if not mimetype:
        return MimeType(
            type="", subtype="", suffix="", parameters=MultiDictProxy(MultiDict())
        )

    head, *raw_params = mimetype.split(";")

    params = MultiDict()  # type: MultiDict[str]
    for item in raw_params:
        if not item:
            continue
        # partition() leaves the value empty when the item has no "=".
        key, _, value = item.partition("=")
        params.add(key.lower().strip(), value.strip(' "'))

    fulltype = head.strip().lower()
    if fulltype == "*":
        fulltype = "*/*"

    mtype, _, stype = fulltype.partition("/")
    stype, _, suffix = stype.partition("+")

    return MimeType(
        type=mtype, subtype=stype, suffix=suffix, parameters=MultiDictProxy(params)
    )
378
379
380
def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]:
    """Derive a file name from the ``name`` attribute of *obj*.

    Returns the final path component of ``obj.name`` when it is a non-empty
    string that neither starts with "<" nor ends with ">"; otherwise returns
    *default*.
    """
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, str):
        return default
    if name[0] == "<" or name[-1] == ">":
        return default
    return Path(name).name
385
386
387
# Matches every character that is NOT allowed bare inside a quoted-string
# (allowed: 0x21, 0x23-0x5B and 0x5D-0x7E) and therefore needs a backslash.
not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]")
# Characters accepted at all by quoted_string(): printable ASCII plus TAB.
QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"}


def quoted_string(content: str) -> str:
    """Return 7-bit content as quoted-string.

    Format content into a quoted-string as defined in RFC5322 for
    Internet Message Format. Notice that this is not the 8-bit HTTP
    format, but the 7-bit email format. Content must be in usascii or
    a ValueError is raised.

    The returned text does not include the surrounding double quotes.
    """
    # ``>=`` (superset), not ``>`` (proper superset): content that happens
    # to use every permitted character is still valid.
    if not (QCONTENT >= set(content)):
        raise ValueError(f"bad content for quoted-string {content!r}")
    return not_qtext_re.sub(lambda x: "\\" + x.group(0), content)
402
403
404
def content_disposition_header(
    disptype: str, quote_fields: bool = True, _charset: str = "utf-8", **params: str
) -> str:
    """Sets ``Content-Disposition`` header for MIME.

    This is the MIME payload Content-Disposition header from RFC 2183
    and RFC 7578 section 4.2, not the HTTP Content-Disposition from
    RFC 6266.

    disptype is a disposition type: inline, attachment, form-data.
    Should be valid extension token (see RFC 2183)

    quote_fields performs value quoting to 7-bit MIME headers
    according to RFC 7578. Set quote_fields to False if recipient
    can take 8-bit file names and field values.

    _charset specifies the charset to use when quote_fields is True.

    params is a dict with disposition params.

    Returns the header value.  Raises ValueError when disptype or a
    parameter name is empty or contains non-token characters.
    """
    # ``>=`` (superset) rather than ``>`` (proper superset): a name is valid
    # whenever all of its characters are token characters.
    if not disptype or not (TOKEN >= set(disptype)):
        raise ValueError("bad content disposition type {!r}" "".format(disptype))

    value = disptype
    if params:
        lparams = []
        for key, val in params.items():
            if not key or not (TOKEN >= set(key)):
                raise ValueError(
                    "bad content disposition parameter" " {!r}={!r}".format(key, val)
                )
            if quote_fields:
                if key.lower() == "filename":
                    # File names are always percent-encoded.
                    qval = quote(val, "", encoding=_charset)
                    lparams.append((key, '"%s"' % qval))
                else:
                    try:
                        qval = quoted_string(val)
                    except ValueError:
                        # Not representable as a 7-bit quoted-string: emit
                        # the extended ``key*=charset''percent-encoded`` form.
                        qval = "".join(
                            (_charset, "''", quote(val, "", encoding=_charset))
                        )
                        lparams.append((key + "*", qval))
                    else:
                        lparams.append((key, '"%s"' % qval))
            else:
                # Only backslashes and double quotes are escaped here.
                qval = val.replace("\\", "\\\\").replace('"', '\\"')
                lparams.append((key, '"%s"' % qval))
        sparams = "; ".join("=".join(pair) for pair in lparams)
        value = "; ".join((value, sparams))
    return value
455
456
457
class _TSelf(Protocol, Generic[_T]):
    # Structural type for objects usable with ``reify``: they expose a
    # ``_cache`` dict that maps a property name to its computed value.
    _cache: Dict[str, _T]
459
460
461
class reify(Generic[_T]):
    """Use as a class method decorator.

    Behaves much like the Python `@property` decorator, except that the
    result of the first call is stored in the instance's ``_cache`` dict
    under the method's name and served from there on later accesses.
    Assignment is rejected, so the attribute is read-only.
    """

    def __init__(self, wrapped: Callable[..., _T]) -> None:
        self.wrapped = wrapped
        self.__doc__ = wrapped.__doc__
        self.name = wrapped.__name__

    def __get__(self, inst: _TSelf[_T], owner: Optional[Type[Any]] = None) -> _T:
        """Return the cached value, computing and storing it on first use.

        Accessed on the class (``inst`` is None) the descriptor itself is
        returned.
        """
        try:
            try:
                return inst._cache[self.name]
            except KeyError:
                # First access on this instance: compute and remember.
                val = self.wrapped(inst)
                inst._cache[self.name] = val
                return val
        except AttributeError:
            # ``None._cache`` fails this way for class-level access; any
            # other AttributeError is propagated unchanged.
            if inst is None:
                return self
            raise

    def __set__(self, inst: _TSelf[_T], value: _T) -> None:
        """Reject assignment unconditionally."""
        raise AttributeError("reified property is read-only")
491
492
493
# Keep a handle on the pure-Python implementation under a stable name.
reify_py = reify

try:
    from ._helpers import reify as reify_c
except ImportError:
    # Compiled helper not available: keep the pure-Python ``reify``.
    pass
else:
    # Prefer the compiled implementation unless extensions are disabled.
    if not NO_EXTENSIONS:
        reify = reify_c  # type: ignore[misc,assignment]
502
503
# Dotted-quad IPv4 address: four octets, each in the range 0-255.
_ipv4_pattern = (
    r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
    r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
)
# IPv6 address forms (compiled case-insensitively below).
_ipv6_pattern = (
    r"^(?:(?:(?:[A-F0-9]{1,4}:){6}|(?=(?:[A-F0-9]{0,4}:){0,6}"
    r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}$)(([0-9A-F]{1,4}:){0,5}|:)"
    r"((:[0-9A-F]{1,4}){1,5}:|:)|::(?:[A-F0-9]{1,4}:){5})"
    r"(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}"
    r"(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|(?:[A-F0-9]{1,4}:){7}"
    r"[A-F0-9]{1,4}|(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}$)"
    r"(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)|(?:[A-F0-9]{1,4}:){7}"
    r":|:(:[A-F0-9]{1,4}){7})$"
)
# Text and bytes variants of each pattern so both input kinds can be matched.
_ipv4_regex = re.compile(_ipv4_pattern)
_ipv6_regex = re.compile(_ipv6_pattern, flags=re.IGNORECASE)
_ipv4_regexb = re.compile(_ipv4_pattern.encode("ascii"))
_ipv6_regexb = re.compile(_ipv6_pattern.encode("ascii"), flags=re.IGNORECASE)


def _is_ip_address(
    regex: Pattern[str], regexb: Pattern[bytes], host: Optional[Union[str, bytes]]
) -> bool:
    """Match *host* against *regex* (text) or *regexb* (bytes-like).

    Returns False for None.  Raises TypeError for any other unsupported
    type.
    """
    if host is None:
        return False

    if isinstance(host, str):
        matcher = regex
    elif isinstance(host, (bytes, bytearray, memoryview)):
        matcher = regexb
    else:
        raise TypeError(f"{host} [{type(host)}] is not a str or bytes")

    return bool(matcher.match(host))


# Single-argument predicates bound to the matching pair of patterns.
is_ipv4_address = functools.partial(_is_ip_address, _ipv4_regex, _ipv4_regexb)
is_ipv6_address = functools.partial(_is_ip_address, _ipv6_regex, _ipv6_regexb)


def is_ip_address(host: Optional[Union[str, bytes, bytearray, memoryview]]) -> bool:
    """Return True when *host* is an IPv4 or an IPv6 address literal."""
    if is_ipv4_address(host):
        return True
    return is_ipv6_address(host)
542
543
544
def next_whole_second() -> datetime.datetime:
    """Return the current UTC time with the microseconds cleared.

    NOTE(review): despite the name, no second is added -- the value is
    truncated to the current whole second (the code previously added a
    zero-length timedelta, which is a no-op).  Confirm with the callers
    whether rounding up was intended before changing this.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(microsecond=0)
549
550
551
# One-entry cache: the integer timestamp that was formatted last and the
# text produced for it.
_cached_current_datetime = None  # type: Optional[int]
_cached_formatted_datetime = ""


def rfc822_formatted_time() -> str:
    """Return the current GMT time as ``Www, DD Mmm YYYY HH:MM:SS GMT``.

    The text is rebuilt only when the whole-second timestamp differs from
    the cached one; otherwise the cached string is returned.
    """
    global _cached_current_datetime
    global _cached_formatted_datetime

    now = int(time.time())
    if now == _cached_current_datetime:
        return _cached_formatted_datetime

    # Weekday and month names for HTTP date/time formatting;
    # always English!
    # Tuples are constants stored in codeobject!
    _weekdayname = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    _monthname = (
        "",  # Dummy so we can use 1-based month numbers
        "Jan",
        "Feb",
        "Mar",
        "Apr",
        "May",
        "Jun",
        "Jul",
        "Aug",
        "Sep",
        "Oct",
        "Nov",
        "Dec",
    )

    year, month, day, hh, mm, ss, wd, *tail = time.gmtime(now)
    _cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
        _weekdayname[wd],
        day,
        _monthname[month],
        year,
        hh,
        mm,
        ss,
    )
    _cached_current_datetime = now
    return _cached_formatted_datetime
593
594
595
def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:
    """Timer callback: call the named method if its target is still alive.

    *info* is a ``(weak reference, method name)`` pair.  Exceptions raised
    by the method are suppressed.
    """
    ref, name = info
    ob = ref()
    if ob is not None:
        with suppress(Exception):
            getattr(ob, name)()


def weakref_handle(
    ob: object,
    name: str,
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule ``ob.<name>()`` after *timeout* seconds without keeping *ob* alive.

    Only a weak reference to *ob* is handed to the loop.  Returns the timer
    handle, or None when *timeout* is None or not positive.

    timeout_ceil_threshold: timeouts greater than or equal to this many
    seconds have their absolute deadline rounded up to a whole number of
    loop-clock seconds.  Defaults to 5, the value that was previously
    hard-coded.
    """
    if timeout is not None and timeout > 0:
        when = loop.time() + timeout
        if timeout >= timeout_ceil_threshold:
            when = ceil(when)

        return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
    return None
613
614
615
def call_later(
    cb: Callable[[], Any],
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule *cb* on *loop* after *timeout* seconds.

    Returns the timer handle, or None when *timeout* is None or not
    positive (nothing is scheduled in that case).

    timeout_ceil_threshold: timeouts strictly greater than this many seconds
    have their absolute deadline rounded up to a whole number of loop-clock
    seconds.  Defaults to 5, the value that was previously hard-coded.
    """
    if timeout is not None and timeout > 0:
        when = loop.time() + timeout
        if timeout > timeout_ceil_threshold:
            when = ceil(when)
        return loop.call_at(when, cb)
    return None
624
625
626
class TimeoutHandle:
    """Timeout handle

    Collects callbacks and, once started, fires all of them from a single
    loop timer when the timeout elapses.
    """

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        timeout: Optional[float],
        ceil_threshold: float = 5,
    ) -> None:
        """Store the loop and timeout.

        ceil_threshold: timeouts greater than or equal to this many seconds
        have their absolute deadline rounded up to a whole number of
        loop-clock seconds.  Defaults to 5, the value that was previously
        hard-coded.
        """
        self._timeout = timeout
        self._loop = loop
        self._ceil_threshold = ceil_threshold
        self._callbacks = (
            []
        )  # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]]

    def register(
        self, callback: Callable[..., None], *args: Any, **kwargs: Any
    ) -> None:
        """Remember *callback* (with its arguments) for when the timeout fires."""
        self._callbacks.append((callback, args, kwargs))

    def close(self) -> None:
        """Forget every registered callback."""
        self._callbacks.clear()

    def start(self) -> Optional[asyncio.Handle]:
        """Arm the timer.

        Returns the loop handle, or None when the timeout is None or not
        positive (no timer is created in that case).
        """
        timeout = self._timeout
        if timeout is not None and timeout > 0:
            when = self._loop.time() + timeout
            if timeout >= self._ceil_threshold:
                when = ceil(when)
            return self._loop.call_at(when, self.__call__)
        else:
            return None

    def timer(self) -> "BaseTimerContext":
        """Return a timer context tied to this handle.

        With a positive timeout a TimerContext is created and its
        ``timeout`` method registered as a callback; otherwise a TimerNoop
        is returned.
        """
        if self._timeout is not None and self._timeout > 0:
            timer = TimerContext(self._loop)
            self.register(timer.timeout)
            return timer
        else:
            return TimerNoop()

    def __call__(self) -> None:
        """Invoke every registered callback, then drop them all.

        Exceptions raised by a callback are deliberately suppressed so the
        remaining callbacks still run.
        """
        for cb, args, kwargs in self._callbacks:
            with suppress(Exception):
                cb(*args, **kwargs)

        self._callbacks.clear()
670
671
672
class BaseTimerContext(ContextManager["BaseTimerContext"]):
    """Common base type for the timer context managers in this module."""
674
675
676
class TimerNoop(BaseTimerContext):
    """Timer context that does nothing on entry or exit."""

    def __enter__(self) -> BaseTimerContext:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Returning None lets any in-flight exception propagate.
        return None
687
688
689
class TimerContext(BaseTimerContext):
    """Low resolution timeout context manager"""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # Tasks currently inside the ``with`` block (innermost last).
        self._tasks = []  # type: List[asyncio.Task[Any]]
        # Set once timeout() has fired.
        self._cancelled = False

    def __enter__(self) -> BaseTimerContext:
        """Track the current task.

        Raises RuntimeError outside of a task and asyncio.TimeoutError when
        the timeout has already fired.
        """
        task = current_task(loop=self._loop)
        if task is None:
            raise RuntimeError(
                "Timeout context manager should be used " "inside a task"
            )

        if self._cancelled:
            raise asyncio.TimeoutError from None

        self._tasks.append(task)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Stop tracking the task; turn our own cancellation into a timeout."""
        if self._tasks:
            self._tasks.pop()

        timed_out = exc_type is asyncio.CancelledError and self._cancelled
        if timed_out:
            raise asyncio.TimeoutError from None
        return None

    def timeout(self) -> None:
        """Cancel every tracked task once and mark the context as cancelled."""
        if self._cancelled:
            return

        for task in set(self._tasks):
            task.cancel()

        self._cancelled = True
730
731
732
def ceil_timeout(
    delay: Optional[float], ceil_threshold: float = 5
) -> async_timeout.Timeout:
    """Build an ``async_timeout`` context manager for *delay* seconds.

    A *delay* of None or a non-positive value yields a timeout that never
    fires.

    ceil_threshold: delays strictly greater than this many seconds have
    their absolute deadline rounded up to a whole number of loop-clock
    seconds.  Defaults to 5, the value that was previously hard-coded.
    """
    if delay is None or delay <= 0:
        return async_timeout.timeout(None)

    loop = get_running_loop()
    now = loop.time()
    when = now + delay
    if delay > ceil_threshold:
        when = ceil(when)
    return async_timeout.timeout_at(when)
742
743
744
class HeadersMixin:
    """Mixin adding Content-Type / Content-Length accessors.

    The host class is expected to provide a ``_headers`` mapping.
    """

    ATTRS = frozenset(["_content_type", "_content_dict", "_stored_content_type"])

    # Parsed media type, its parameters, and the raw header value they were
    # parsed from (``sentinel`` until the first parse).
    _content_type = None  # type: Optional[str]
    _content_dict = None  # type: Optional[Dict[str, str]]
    _stored_content_type = sentinel

    def _parse_content_type(self, raw: str) -> None:
        """Parse *raw* and remember both the input and the result."""
        self._stored_content_type = raw
        if raw is not None:
            self._content_type, self._content_dict = cgi.parse_header(raw)
            return

        # default value according to RFC 2616
        self._content_type = "application/octet-stream"
        self._content_dict = {}

    @property
    def content_type(self) -> str:
        """The value of content part for Content-Type HTTP header."""
        header = self._headers.get(hdrs.CONTENT_TYPE)  # type: ignore[attr-defined]
        # Re-parse only when the header differs from the one parsed last.
        if self._stored_content_type != header:
            self._parse_content_type(header)
        return self._content_type  # type: ignore[return-value]

    @property
    def charset(self) -> Optional[str]:
        """The value of charset part for Content-Type HTTP header."""
        header = self._headers.get(hdrs.CONTENT_TYPE)  # type: ignore[attr-defined]
        if self._stored_content_type != header:
            self._parse_content_type(header)
        return self._content_dict.get("charset")  # type: ignore[union-attr]

    @property
    def content_length(self) -> Optional[int]:
        """The value of Content-Length HTTP header."""
        header = self._headers.get(  # type: ignore[attr-defined]
            hdrs.CONTENT_LENGTH
        )
        if header is None:
            return None
        return int(header)
788
789
790
def set_result(fut: "asyncio.Future[_T]", result: _T) -> None:
    """Set *result* on *fut* unless the future is already done."""
    if fut.done():
        return
    fut.set_result(result)
793
794
795
def set_exception(fut: "asyncio.Future[_T]", exc: BaseException) -> None:
    """Set *exc* on *fut* unless the future is already done."""
    if fut.done():
        return
    fut.set_exception(exc)
798
799
800
class ChainMapProxy(Mapping[str, Any]):
    """Read-only view over several mappings; earlier mappings win on lookup.

    The class cannot be subclassed.
    """

    __slots__ = ("_maps",)

    def __init__(self, maps: Iterable[Mapping[str, Any]]) -> None:
        self._maps = tuple(maps)

    def __init_subclass__(cls) -> None:
        raise TypeError(
            f"Inheritance class {cls.__name__} from ChainMapProxy is forbidden"
        )

    def __getitem__(self, key: str) -> Any:
        """Return the value from the first mapping containing *key*."""
        for mapping in self._maps:
            try:
                found = mapping[key]
            except KeyError:
                continue
            return found
        raise KeyError(key)

    def get(self, key: str, default: Any = None) -> Any:
        """Return ``self[key]`` when *key* is present, else *default*."""
        if key in self:
            return self[key]
        return default

    def __len__(self) -> int:
        """Number of distinct keys across all mappings."""
        # reuses stored hash values if possible
        return len(set().union(*self._maps))  # type: ignore[arg-type]

    def __iter__(self) -> Iterator[str]:
        """Iterate over the distinct keys of all mappings."""
        merged = {}  # type: Dict[str, Any]
        # Later mappings are merged first, matching the original order.
        for mapping in reversed(self._maps):
            # reuses stored hash values if possible
            merged.update(mapping)
        return iter(merged)

    def __contains__(self, key: object) -> bool:
        for mapping in self._maps:
            if key in mapping:
                return True
        return False

    def __bool__(self) -> bool:
        """True when at least one underlying mapping is non-empty."""
        return any(self._maps)

    def __repr__(self) -> str:
        content = ", ".join(repr(mapping) for mapping in self._maps)
        return f"ChainMapProxy({content})"
843
844
845
# https://tools.ietf.org/html/rfc7232#section-2.3
# etagc = %x21 / %x23-7E / obs-text, i.e. any visible character except the
# double quote, plus bytes 0x80-0xFF.  The upper bound of the range must be
# "~" (0x7E); the previous "#-}" range stopped at 0x7D and rejected "~".
_ETAGC = r"[!#-~\x80-\xff]+"
_ETAGC_RE = re.compile(_ETAGC)
# Optional weak marker (group 1) followed by the quoted opaque tag (group 2).
_QUOTED_ETAG = fr'(W/)?"({_ETAGC})"'
QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG)
# One list item per match: either a quoted etag followed by a comma or the
# end of the string, or (last group) a single unexpected character.
LIST_QUOTED_ETAG_RE = re.compile(fr"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)")

# Wildcard etag value.
ETAG_ANY = "*"
853
854
855
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ETag:
    # The etag text.
    value: str
    # Weak-validator flag; defaults to False.
    is_weak: bool = False
859
860
861
def validate_etag_value(value: str) -> None:
    """Raise ValueError unless *value* is the wildcard or a valid etag body."""
    if value == ETAG_ANY:
        return
    if _ETAGC_RE.fullmatch(value):
        return
    raise ValueError(
        f"Value {value!r} is not a valid etag. Maybe it contains '\"'?"
    )
866
867
868
def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]:
    """Process a date string, return a datetime object

    The result is always tagged as UTC.  Returns None when *date_str* is
    None, cannot be parsed, or describes an impossible date.
    """
    if date_str is None:
        return None

    timetuple = parsedate(date_str)
    if timetuple is None:
        return None

    try:
        return datetime.datetime(*timetuple[:6], tzinfo=datetime.timezone.utc)
    except ValueError:
        return None
876
877