Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/http_writer.py
7795 views
1
"""HTTP stream writer and header serialization helpers."""
2
3
import asyncio
4
import zlib
5
from typing import Any, Awaitable, Callable, NamedTuple, Optional, Union # noqa
6
7
from multidict import CIMultiDict
8
9
from .abc import AbstractStreamWriter
10
from .base_protocol import BaseProtocol
11
from .helpers import NO_EXTENSIONS
12
13
# Public names of this module (what ``from <module> import *`` exposes).
__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11")
14
15
16
class HttpVersion(NamedTuple):
    """HTTP protocol version as a ``(major, minor)`` named tuple.

    Being a tuple, instances compare and order element-wise, so
    ``HttpVersion(1, 0) < HttpVersion(1, 1)``.
    """

    major: int
    minor: int
19
20
21
# Version tuple (major=1, minor=0), i.e. HTTP/1.0.
HttpVersion10 = HttpVersion(1, 0)
22
# Version tuple (major=1, minor=1), i.e. HTTP/1.1.
HttpVersion11 = HttpVersion(1, 1)
23
24
25
# Optional async callback receiving a body chunk. StreamWriter awaits it with
# the chunk as passed by the caller, before compression or chunk framing.
_T_OnChunkSent = Optional[Callable[[bytes], Awaitable[None]]]
26
# Optional async callback receiving the headers mapping. StreamWriter awaits
# it in write_headers() before the headers are serialized and written.
_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]
27
28
29
class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (status line, headers, body) to a transport.

    Body data passed to ``write()`` / ``write_eof()`` can optionally be
    zlib-compressed (``enable_compression()``), framed with chunked
    transfer encoding (``enable_chunking()``), and, in ``write()``, truncated
    to the remaining ``length`` bytes when ``length`` is not ``None``.
    """

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        """Bind the writer to *protocol* and its current transport.

        Args:
            protocol: protocol whose ``transport`` is written to and whose
                ``_drain_helper()`` is awaited by ``drain()``.
            loop: event loop, stored as ``self.loop``.
            on_chunk_sent: optional async callback awaited with every
                non-transformed body chunk.
            on_headers_sent: optional async callback awaited with the
                headers mapping in ``write_headers()``.
        """
        self._protocol = protocol
        self._transport = protocol.transport

        self.loop = loop
        # Remaining number of body bytes write() may emit; None = unlimited.
        self.length = None  # type: Optional[int]
        # When True, body chunks are framed with chunked transfer encoding.
        self.chunked = False
        # Bytes written since the last drain triggered by write().
        self.buffer_size = 0
        # Total number of bytes handed to _write().
        self.output_size = 0

        self._eof = False
        # zlib compression object, or None when compression is disabled.
        self._compress = None  # type: Any
        # Not referenced anywhere else in this class.
        self._drain_waiter = None

        self._on_chunk_sent = on_chunk_sent  # type: _T_OnChunkSent
        self._on_headers_sent = on_headers_sent  # type: _T_OnHeadersSent

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Transport in use; ``None`` once ``write_eof()`` has completed."""
        return self._transport

    @property
    def protocol(self) -> BaseProtocol:
        """Protocol this writer was created with."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Frame all subsequently written body data as HTTP chunks."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY
    ) -> None:
        """Compress all subsequently written body data with zlib.

        Args:
            encoding: ``"gzip"`` selects the gzip container
                (``wbits = 16 + MAX_WBITS``); any other value selects the
                zlib container (``wbits = MAX_WBITS``).
            strategy: zlib compression strategy passed to ``compressobj``.
        """
        zlib_mode = 16 + zlib.MAX_WBITS if encoding == "gzip" else zlib.MAX_WBITS
        self._compress = zlib.compressobj(wbits=zlib_mode, strategy=strategy)

    def _write(self, chunk: bytes) -> None:
        """Hand *chunk* to the transport and update the size counters.

        The counters are updated before the transport is checked, so they
        also include a chunk whose write is rejected.

        Raises:
            ConnectionResetError: if there is no transport or it is closing.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size

        if self._transport is None or self._transport.is_closing():
            raise ConnectionResetError("Cannot write to closing transport")
        self._transport.write(chunk)

    async def write(
        self, chunk: bytes, *, drain: bool = True, LIMIT: int = 0x10000
    ) -> None:
        """Write a chunk of body data to the stream.

        The chunk is first reported to the ``on_chunk_sent`` callback (if
        any), then compressed, truncated to the remaining ``length`` and
        chunk-framed as configured. Nothing is written when the resulting
        data is empty (e.g. the compressor buffered everything).

        ``write_eof()`` marks the end of the stream; the writer cannot be
        used after it has been called.

        Args:
            chunk: data to send.
            drain: when true, await ``drain()`` once more than *LIMIT* bytes
                have been written since the previous such drain.
            LIMIT: buffered-bytes threshold that triggers the drain.

        Raises:
            ConnectionResetError: if the transport is missing or closing.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview):
            if chunk.nbytes != len(chunk):
                # Multi-byte item format: reshape so len() counts bytes.
                chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = self._compress.compress(chunk)
            if not chunk:
                # The compressor buffered the input; nothing to send yet.
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                # Never emit more than the remaining allowed length.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        if chunk:
            if self.chunked:
                chunk_len_pre = ("%x\r\n" % len(chunk)).encode("ascii")
                chunk = chunk_len_pre + chunk + b"\r\n"

            self._write(chunk)

            if self.buffer_size > LIMIT and drain:
                self.buffer_size = 0
                await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Write request/response status line and headers.

        The ``on_headers_sent`` callback (if any) is awaited with *headers*
        before they are serialized.
        """
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)

        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._write(buf)

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Write the optional final *chunk*, finish the body and drain.

        Flushes the compressor (if enabled) and, for chunked encoding,
        appends the terminating zero-length chunk. Afterwards the writer is
        marked finished and detached from its transport; further calls are
        no-ops.

        Raises:
            ConnectionResetError: if data has to be written and the
                transport is missing or closing.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if self._compress is not None:
            if chunk:
                chunk = self._compress.compress(chunk)
            # flush() returns whatever the compressor still holds back.
            chunk = chunk + self._compress.flush()

        if self.chunked:
            # A chunked body must always end with the zero-length chunk,
            # whether or not there is trailing data to frame before it.
            if chunk:
                chunk_len = ("%x\r\n" % len(chunk)).encode("ascii")
                chunk = chunk_len + chunk + b"\r\n0\r\n\r\n"
            else:
                chunk = b"0\r\n\r\n"

        if chunk:
            self._write(chunk)

        await self.drain()

        self._eof = True
        self._transport = None

    async def drain(self) -> None:
        """Flush the write buffer.

        The intended use is to write

          await w.write(data)
          await w.drain()

        Does nothing when the protocol has no transport.
        """
        if self._protocol.transport is not None:
            await self._protocol._drain_helper()
174
175
176
def _safe_header(string: str) -> str:
    """Return *string* unchanged after checking it cannot break a header line.

    Raises:
        ValueError: if *string* contains a carriage return or a line feed,
            which could be used to inject extra header lines.
    """
    if "\r" in string or "\n" in string:
        raise ValueError(
            "Newline or carriage return detected in headers. "
            "Potential header injection attack."
        )
    return string
183
184
185
def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes:
    """Serialize a status line and headers into the bytes of a message head.

    The result is ``status_line CRLF`` followed by one ``name: value CRLF``
    line per header and a final empty ``CRLF`` line, encoded as UTF-8.
    Every line is terminated individually so that an empty *headers*
    mapping yields exactly one blank line after the status line instead of
    an extra CRLF that would end up in the message body.

    Raises:
        ValueError: if a header name or value contains CR or LF.
    """
    parts = [status_line, "\r\n"]
    for name, value in headers.items():
        parts.append(_safe_header(name) + ": " + _safe_header(value) + "\r\n")
    parts.append("\r\n")
    return "".join(parts).encode("utf-8")
189
190
191
# Serializer used by StreamWriter.write_headers(). Defaults to the pure-Python
# implementation; the import block below may rebind it to the serializer from
# the optional aiohttp._http_writer extension module.
_serialize_headers = _py_serialize_headers
192
193
# Prefer the serializer from the optional ``aiohttp._http_writer`` extension
# module when it is importable and extensions are not disabled.
try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import]
except ImportError:
    # Extension module not available: keep the pure-Python serializer.
    pass
else:
    # Always expose the extension's serializer under its own name, but only
    # make it the active one when NO_EXTENSIONS is false.
    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers
201
202