Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/payload.py
7762 views
import asyncio
import enum
import io
import json
import mimetypes
import os
import warnings
from abc import ABC, abstractmethod
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    ByteString,
    Dict,
    Iterable,
    Optional,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    PY_36,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import Final, JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

# Bodies larger than this trigger a ResourceWarning in BytesPayload.
TOO_LARGE_BYTES_BODY: Final[int] = 2 ** 20  # 1 MB

if TYPE_CHECKING:  # pragma: no cover
    from typing import List


class LookupError(Exception):
    """Raised by ``PayloadRegistry.get`` when no factory matches the data.

    NOTE: this name shadows the builtin ``LookupError`` inside this module
    and derives from ``Exception`` directly; it is kept as-is so existing
    ``except payload.LookupError`` handlers keep working.
    """


class Order(str, enum.Enum):
    """Priority bucket a payload factory is registered in."""

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"


def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Return a ``Payload`` for *data* using the module-level registry.

    Extra positional and keyword arguments are forwarded to the matching
    factory.  Raises this module's ``LookupError`` if nothing matches.
    """
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for instances of *type* in the module registry."""
    PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:
    """Class decorator that registers the decorated payload factory.

    ``@payload_type(SomeType)`` is equivalent to calling
    ``register_payload(factory, SomeType, order=...)`` and returns the
    factory unchanged.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory


PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]


class PayloadRegistry:
    """Payload registry.

    Maps Python types to payload factories.  Lookup walks the
    ``try_first`` bucket, then ``normal``, then ``try_last``; within a
    bucket, entries are tried in registration order.

    note: we need zope.interface for more efficient adapter search
    """

    def __init__(self) -> None:
        self._first = []  # type: List[_PayloadRegistryItem]
        self._normal = []  # type: List[_PayloadRegistryItem]
        self._last = []  # type: List[_PayloadRegistryItem]

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a payload wrapping *data*.

        A ``Payload`` instance is returned unchanged.  Otherwise the first
        registered ``(factory, type)`` pair for which
        ``isinstance(data, type)`` holds is used and ``factory(data, *args,
        **kwargs)`` is returned.  Raises this module's ``LookupError`` when
        no pair matches.
        """
        if isinstance(data, Payload):
            return data
        for factory, type in _CHAIN(self._first, self._normal, self._last):
            if isinstance(data, type):
                return factory(data, *args, **kwargs)

        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Append ``(factory, type)`` to the bucket selected by *order*.

        Raises ``ValueError`` if *order* is not one of the ``Order`` members.
        """
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")


class Payload(ABC):
    """Abstract base for objects that can write themselves to a stream writer."""

    _default_content_type = "application/octet-stream"  # type: str
    _size = None  # type: Optional[int]

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Optional[str] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Store *value* and build the payload headers.

        The Content-Type header is chosen in this order: an explicit
        *content_type* (neither ``sentinel`` nor ``None``), a type guessed
        from *filename* via ``mimetypes``, and finally
        ``_default_content_type``.  Entries in *headers* are applied last
        and therefore override the computed Content-Type.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict()  # type: _CIMultiDict
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            content_type = mimetypes.guess_type(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        self._headers.update(headers or {})

    @property
    def size(self) -> Optional[int]:
        """Size of the payload."""
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers as UTF-8 ``key: value`` lines, ended by a blank CRLF line."""
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write payload.

        writer is an AbstractStreamWriter instance:
        """


class BytesPayload(Payload):
    """Payload backed by an in-memory ``bytes``/``bytearray``/``memoryview``."""

    def __init__(self, value: ByteString, *args: Any, **kwargs: Any) -> None:
        """Wrap *value*; raise ``TypeError`` if it is not byte-ish.

        Emits a ``ResourceWarning`` when the body exceeds
        ``TOO_LARGE_BYTES_BODY`` bytes.
        """
        if not isinstance(value, (bytes, bytearray, memoryview)):
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        # len() of a memoryview counts items, not bytes, so use nbytes there.
        if isinstance(value, memoryview):
            self._size = value.nbytes
        else:
            self._size = len(value)

        if self._size > TOO_LARGE_BYTES_BODY:
            # warnings.warn() only accepts ``source`` on Python 3.6+.
            warn_kwargs = {"source": self} if PY_36 else {}
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                **warn_kwargs,
            )

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole value to *writer* in a single call."""
        await writer.write(self._value)


class StringPayload(BytesPayload):
    """Payload for a ``str`` value, encoded to bytes at construction time."""

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Encode *value* and derive matching encoding / Content-Type.

        With no *encoding*: use the ``charset`` parameter of *content_type*
        if given (defaulting to utf-8), otherwise utf-8 with
        ``text/plain; charset=utf-8``.  With an *encoding* but no
        *content_type*: ``text/plain; charset=<encoding>`` is used.
        """
        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )


class StringIOPayload(StringPayload):
    """Payload for a text stream that is read fully at construction time."""

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)


class IOBasePayload(Payload):
    """Payload that streams a file-like object in chunks."""

    _value: IO[Any]

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value* and, when a filename is known, set Content-Disposition.

        If ``filename`` is not passed it is taken from
        ``guess_filename(value)``.  The Content-Disposition header is only
        set when a filename is available, *disposition* is not ``None`` and
        the header was not already supplied by the caller.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Stream the file to *writer* in ``2 ** 16``-sized reads.

        Reads and the final ``close()`` run in the loop's default executor
        so blocking file I/O does not run on the event loop thread.  The
        file is closed even if reading or writing fails.
        """
        loop = asyncio.get_event_loop()
        try:
            chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16)
            while chunk:
                await writer.write(chunk)
                chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16)
        finally:
            await loop.run_in_executor(None, self._value.close)


class TextIOPayload(IOBasePayload):
    """Payload that streams a text file, encoding each chunk before writing."""

    _value: TextIO

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Wrap *value* and derive encoding / Content-Type.

        Uses the same defaulting rules as ``StringPayload``: charset from
        *content_type* when only that is given, utf-8 when neither is
        given, and ``text/plain; charset=<encoding>`` when only *encoding*
        is given.
        """
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    @property
    def size(self) -> Optional[int]:
        """On-disk size minus ``tell()``, or ``None`` if that raises OSError.

        NOTE(review): for text streams ``tell()`` returns an opaque
        position, so this may not be an exact count of bytes left to send.
        """
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except OSError:
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Stream the text file to *writer*, encoding each chunk.

        Chunks are read with ``read(2 ** 16)`` in the default executor and
        encoded with ``self._encoding`` (or ``str.encode()``'s default when
        it is falsy).  The file is closed even on failure.
        """
        loop = asyncio.get_event_loop()
        try:
            chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16)
            while chunk:
                data = (
                    chunk.encode(encoding=self._encoding)
                    if self._encoding
                    else chunk.encode()
                )
                await writer.write(data)
                chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16)
        finally:
            await loop.run_in_executor(None, self._value.close)


class BytesIOPayload(IOBasePayload):
    """Payload for seekable in-memory binary streams."""

    @property
    def size(self) -> int:
        """Number of bytes between the current position and end of stream.

        The stream position is restored before returning.
        """
        position = self._value.tell()
        end = self._value.seek(0, os.SEEK_END)
        self._value.seek(position)
        return end - position


class BufferedReaderPayload(IOBasePayload):
    """Payload for buffered binary readers."""

    @property
    def size(self) -> Optional[int]:
        """``fstat`` size minus current position, or ``None`` on OSError."""
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except OSError:
            # data.fileno() is not supported, e.g.
            # io.BufferedReader(io.BytesIO(b'data'))
            return None


class JsonPayload(BytesPayload):
    """Payload that serializes a value with *dumps* and sends it as bytes."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Serialize *value* with *dumps* and encode it using *encoding*."""
        super().__init__(
            dumps(value).encode(encoding),
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )


if TYPE_CHECKING:  # pragma: no cover
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable


class AsyncIterablePayload(Payload):
    """Payload that writes every chunk produced by an async iterable."""

    _iter = None  # type: Optional[_AsyncIterator]

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        """Wrap *value*; raise ``TypeError`` if it is not an ``AsyncIterable``."""
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write each chunk from the iterator to *writer* until exhausted.

        Once the iterator raises ``StopAsyncIteration`` it is dropped, so a
        later call to ``write`` does nothing.
        """
        if self._iter:
            try:
                # The ``self._iter`` check above guards against the rare
                # case where the same iterable is written twice.
                while True:
                    chunk = await self._iter.__anext__()
                    await writer.write(chunk)
            except StopAsyncIteration:
                self._iter = None


class StreamReaderPayload(AsyncIterablePayload):
    """Payload that streams from a ``StreamReader`` via ``iter_any()``."""

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)


# Registration order matters: within a bucket the first matching type wins,
# so more specific types are registered before their base classes.
PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last for giving a chance to more specialized async iterables like
# multidict.BodyPartReaderPayload override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)