Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/formdata.py
7756 views
import io1from typing import Any, Iterable, List, Optional2from urllib.parse import urlencode34from multidict import MultiDict, MultiDictProxy56from . import hdrs, multipart, payload7from .helpers import guess_filename8from .payload import Payload910__all__ = ("FormData",)111213class FormData:14"""Helper class for form body generation.1516Supports multipart/form-data and application/x-www-form-urlencoded.17"""1819def __init__(20self,21fields: Iterable[Any] = (),22quote_fields: bool = True,23charset: Optional[str] = None,24) -> None:25self._writer = multipart.MultipartWriter("form-data")26self._fields = [] # type: List[Any]27self._is_multipart = False28self._is_processed = False29self._quote_fields = quote_fields30self._charset = charset3132if isinstance(fields, dict):33fields = list(fields.items())34elif not isinstance(fields, (list, tuple)):35fields = (fields,)36self.add_fields(*fields)3738@property39def is_multipart(self) -> bool:40return self._is_multipart4142def add_field(43self,44name: str,45value: Any,46*,47content_type: Optional[str] = None,48filename: Optional[str] = None,49content_transfer_encoding: Optional[str] = None,50) -> None:5152if isinstance(value, io.IOBase):53self._is_multipart = True54elif isinstance(value, (bytes, bytearray, memoryview)):55if filename is None and content_transfer_encoding is None:56filename = name5758type_options = MultiDict({"name": name}) # type: MultiDict[str]59if filename is not None and not isinstance(filename, str):60raise TypeError(61"filename must be an instance of str. " "Got: %s" % filename62)63if filename is None and isinstance(value, io.IOBase):64filename = guess_filename(value, name)65if filename is not None:66type_options["filename"] = filename67self._is_multipart = True6869headers = {}70if content_type is not None:71if not isinstance(content_type, str):72raise TypeError(73"content_type must be an instance of str. " "Got: %s" % content_type74)75headers[hdrs.CONTENT_TYPE] = content_type76self._is_multipart = True77if content_transfer_encoding is not None:78if not isinstance(content_transfer_encoding, str):79raise TypeError(80"content_transfer_encoding must be an instance"81" of str. Got: %s" % content_transfer_encoding82)83headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding84self._is_multipart = True8586self._fields.append((type_options, headers, value))8788def add_fields(self, *fields: Any) -> None:89to_add = list(fields)9091while to_add:92rec = to_add.pop(0)9394if isinstance(rec, io.IOBase):95k = guess_filename(rec, "unknown")96self.add_field(k, rec) # type: ignore[arg-type]9798elif isinstance(rec, (MultiDictProxy, MultiDict)):99to_add.extend(rec.items())100101elif isinstance(rec, (list, tuple)) and len(rec) == 2:102k, fp = rec103self.add_field(k, fp) # type: ignore[arg-type]104105else:106raise TypeError(107"Only io.IOBase, multidict and (name, file) "108"pairs allowed, use .add_field() for passing "109"more complex parameters, got {!r}".format(rec)110)111112def _gen_form_urlencoded(self) -> payload.BytesPayload:113# form data (x-www-form-urlencoded)114data = []115for type_options, _, value in self._fields:116data.append((type_options["name"], value))117118charset = self._charset if self._charset is not None else "utf-8"119120if charset == "utf-8":121content_type = "application/x-www-form-urlencoded"122else:123content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset124125return payload.BytesPayload(126urlencode(data, doseq=True, encoding=charset).encode(),127content_type=content_type,128)129130def _gen_form_data(self) -> multipart.MultipartWriter:131"""Encode a list of fields using the multipart/form-data MIME format"""132if self._is_processed:133raise RuntimeError("Form data has been processed already")134for dispparams, headers, value in self._fields:135try:136if hdrs.CONTENT_TYPE in headers:137part = payload.get_payload(138value,139content_type=headers[hdrs.CONTENT_TYPE],140headers=headers,141encoding=self._charset,142)143else:144part = payload.get_payload(145value, headers=headers, encoding=self._charset146)147except Exception as exc:148raise TypeError(149"Can not serialize value type: %r\n "150"headers: %r\n value: %r" % (type(value), headers, value)151) from exc152153if dispparams:154part.set_content_disposition(155"form-data", quote_fields=self._quote_fields, **dispparams156)157# FIXME cgi.FieldStorage doesn't likes body parts with158# Content-Length which were sent via chunked transfer encoding159assert part.headers is not None160part.headers.popall(hdrs.CONTENT_LENGTH, None)161162self._writer.append_payload(part)163164self._is_processed = True165return self._writer166167def __call__(self) -> Payload:168if self._is_multipart:169return self._gen_form_data()170else:171return self._gen_form_urlencoded()172173174