Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/formdata.py
7756 views
1
import io
2
from typing import Any, Iterable, List, Optional
3
from urllib.parse import urlencode
4
5
from multidict import MultiDict, MultiDictProxy
6
7
from . import hdrs, multipart, payload
8
from .helpers import guess_filename
9
from .payload import Payload
10
11
__all__ = ("FormData",)
12
13
14
class FormData:
15
"""Helper class for form body generation.
16
17
Supports multipart/form-data and application/x-www-form-urlencoded.
18
"""
19
20
def __init__(
21
self,
22
fields: Iterable[Any] = (),
23
quote_fields: bool = True,
24
charset: Optional[str] = None,
25
) -> None:
26
self._writer = multipart.MultipartWriter("form-data")
27
self._fields = [] # type: List[Any]
28
self._is_multipart = False
29
self._is_processed = False
30
self._quote_fields = quote_fields
31
self._charset = charset
32
33
if isinstance(fields, dict):
34
fields = list(fields.items())
35
elif not isinstance(fields, (list, tuple)):
36
fields = (fields,)
37
self.add_fields(*fields)
38
39
@property
40
def is_multipart(self) -> bool:
41
return self._is_multipart
42
43
def add_field(
44
self,
45
name: str,
46
value: Any,
47
*,
48
content_type: Optional[str] = None,
49
filename: Optional[str] = None,
50
content_transfer_encoding: Optional[str] = None,
51
) -> None:
52
53
if isinstance(value, io.IOBase):
54
self._is_multipart = True
55
elif isinstance(value, (bytes, bytearray, memoryview)):
56
if filename is None and content_transfer_encoding is None:
57
filename = name
58
59
type_options = MultiDict({"name": name}) # type: MultiDict[str]
60
if filename is not None and not isinstance(filename, str):
61
raise TypeError(
62
"filename must be an instance of str. " "Got: %s" % filename
63
)
64
if filename is None and isinstance(value, io.IOBase):
65
filename = guess_filename(value, name)
66
if filename is not None:
67
type_options["filename"] = filename
68
self._is_multipart = True
69
70
headers = {}
71
if content_type is not None:
72
if not isinstance(content_type, str):
73
raise TypeError(
74
"content_type must be an instance of str. " "Got: %s" % content_type
75
)
76
headers[hdrs.CONTENT_TYPE] = content_type
77
self._is_multipart = True
78
if content_transfer_encoding is not None:
79
if not isinstance(content_transfer_encoding, str):
80
raise TypeError(
81
"content_transfer_encoding must be an instance"
82
" of str. Got: %s" % content_transfer_encoding
83
)
84
headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding
85
self._is_multipart = True
86
87
self._fields.append((type_options, headers, value))
88
89
def add_fields(self, *fields: Any) -> None:
90
to_add = list(fields)
91
92
while to_add:
93
rec = to_add.pop(0)
94
95
if isinstance(rec, io.IOBase):
96
k = guess_filename(rec, "unknown")
97
self.add_field(k, rec) # type: ignore[arg-type]
98
99
elif isinstance(rec, (MultiDictProxy, MultiDict)):
100
to_add.extend(rec.items())
101
102
elif isinstance(rec, (list, tuple)) and len(rec) == 2:
103
k, fp = rec
104
self.add_field(k, fp) # type: ignore[arg-type]
105
106
else:
107
raise TypeError(
108
"Only io.IOBase, multidict and (name, file) "
109
"pairs allowed, use .add_field() for passing "
110
"more complex parameters, got {!r}".format(rec)
111
)
112
113
def _gen_form_urlencoded(self) -> payload.BytesPayload:
114
# form data (x-www-form-urlencoded)
115
data = []
116
for type_options, _, value in self._fields:
117
data.append((type_options["name"], value))
118
119
charset = self._charset if self._charset is not None else "utf-8"
120
121
if charset == "utf-8":
122
content_type = "application/x-www-form-urlencoded"
123
else:
124
content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset
125
126
return payload.BytesPayload(
127
urlencode(data, doseq=True, encoding=charset).encode(),
128
content_type=content_type,
129
)
130
131
def _gen_form_data(self) -> multipart.MultipartWriter:
132
"""Encode a list of fields using the multipart/form-data MIME format"""
133
if self._is_processed:
134
raise RuntimeError("Form data has been processed already")
135
for dispparams, headers, value in self._fields:
136
try:
137
if hdrs.CONTENT_TYPE in headers:
138
part = payload.get_payload(
139
value,
140
content_type=headers[hdrs.CONTENT_TYPE],
141
headers=headers,
142
encoding=self._charset,
143
)
144
else:
145
part = payload.get_payload(
146
value, headers=headers, encoding=self._charset
147
)
148
except Exception as exc:
149
raise TypeError(
150
"Can not serialize value type: %r\n "
151
"headers: %r\n value: %r" % (type(value), headers, value)
152
) from exc
153
154
if dispparams:
155
part.set_content_disposition(
156
"form-data", quote_fields=self._quote_fields, **dispparams
157
)
158
# FIXME cgi.FieldStorage doesn't likes body parts with
159
# Content-Length which were sent via chunked transfer encoding
160
assert part.headers is not None
161
part.headers.popall(hdrs.CONTENT_LENGTH, None)
162
163
self._writer.append_payload(part)
164
165
self._is_processed = True
166
return self._writer
167
168
def __call__(self) -> Payload:
169
if self._is_multipart:
170
return self._gen_form_data()
171
else:
172
return self._gen_form_urlencoded()
173
174