# Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/http_writer.py
# 7795 views
"""Http related parsers and protocol."""

import asyncio
import zlib
from typing import Any, Awaitable, Callable, NamedTuple, Optional, Union  # noqa

from multidict import CIMultiDict

from .abc import AbstractStreamWriter
from .base_protocol import BaseProtocol
from .helpers import NO_EXTENSIONS

__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11")


class HttpVersion(NamedTuple):
    """HTTP protocol version as a ``(major, minor)`` pair."""

    major: int
    minor: int


HttpVersion10 = HttpVersion(1, 0)
HttpVersion11 = HttpVersion(1, 1)


# Optional async callbacks: one awaited with each payload chunk, one awaited
# with the headers mapping before it is serialized.
_T_OnChunkSent = Optional[Callable[[bytes], Awaitable[None]]]
_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]


class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (headers and payload) to a protocol's transport.

    The payload can optionally be zlib-compressed (``enable_compression``),
    framed as ``<hex size>\\r\\n<data>\\r\\n`` chunks (``enable_chunking``)
    and/or capped to ``length`` bytes.
    """

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        """Bind the writer to *protocol* and its current transport.

        *on_chunk_sent* and *on_headers_sent*, when given, are awaited by
        ``write()``/``write_eof()`` and ``write_headers()`` respectively
        before any bytes are written.
        """
        self._protocol = protocol
        self._transport = protocol.transport

        self.loop = loop
        # Remaining number of payload bytes write() lets through;
        # None disables the cap.
        self.length = None
        self.chunked = False
        # Bytes passed to _write() since write() last triggered a drain.
        self.buffer_size = 0
        # Total bytes passed to _write() over the writer's lifetime.
        self.output_size = 0

        self._eof = False
        self._compress = None  # type: Any
        # Not referenced anywhere else in this module.
        self._drain_waiter = None

        self._on_chunk_sent = on_chunk_sent  # type: _T_OnChunkSent
        self._on_headers_sent = on_headers_sent  # type: _T_OnHeadersSent

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Transport being written to; ``None`` once ``write_eof()`` finished."""
        return self._transport

    @property
    def protocol(self) -> BaseProtocol:
        """Protocol this writer was created for."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Frame all subsequently written payload as sized chunks."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY
    ) -> None:
        """Compress all subsequently written payload with zlib.

        ``encoding == "gzip"`` selects ``wbits = 16 + zlib.MAX_WBITS``; any
        other value (including the default ``"deflate"``) selects
        ``wbits = zlib.MAX_WBITS``.
        """
        zlib_mode = 16 + zlib.MAX_WBITS if encoding == "gzip" else zlib.MAX_WBITS
        self._compress = zlib.compressobj(wbits=zlib_mode, strategy=strategy)

    def _write(self, chunk: bytes) -> None:
        """Hand *chunk* to the transport and update the size counters.

        Raises:
            ConnectionResetError: if there is no transport or it is closing.
            Note that the counters are updated before this check.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size

        if self._transport is None or self._transport.is_closing():
            raise ConnectionResetError("Cannot write to closing transport")
        self._transport.write(chunk)

    async def write(
        self, chunk: bytes, *, drain: bool = True, LIMIT: int = 0x10000
    ) -> None:
        """Write a chunk of payload data to the stream.

        The chunk is passed to the ``on_chunk_sent`` callback (if any),
        then compressed, truncated to the remaining ``length`` and
        chunk-framed as configured before being written.

        When *drain* is true and more than *LIMIT* bytes have been written
        since the last such drain, the call also awaits ``drain()``.

        ``write_eof()`` marks the end of the stream; it clears the
        transport, so a later ``write()`` that has data to send raises
        ``ConnectionResetError``.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview):
            if chunk.nbytes != len(chunk):
                # Reshape so that len(chunk) equals the number of bytes.
                chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = self._compress.compress(chunk)
            if not chunk:
                # The compressor produced no output for this input.
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                # Drop everything beyond the remaining allowance.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        if chunk:
            if self.chunked:
                chunk_len_pre = ("%x\r\n" % len(chunk)).encode("ascii")
                chunk = chunk_len_pre + chunk + b"\r\n"

            self._write(chunk)

            if self.buffer_size > LIMIT and drain:
                self.buffer_size = 0
                await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Write request/response status and headers."""
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)

        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._write(buf)

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Write an optional final *chunk*, finish the stream and drain.

        Flushes the compressor when compression is enabled and appends the
        terminating ``0\\r\\n\\r\\n`` chunk in chunked mode. Calling it
        again after it has completed is a no-op. On completion the writer
        drops its transport reference.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if self._compress:
            if chunk:
                chunk = self._compress.compress(chunk)

            # Append whatever the compressor still holds.
            chunk = chunk + self._compress.flush()
            if chunk and self.chunked:
                chunk_len = ("%x\r\n" % len(chunk)).encode("ascii")
                chunk = chunk_len + chunk + b"\r\n0\r\n\r\n"
        else:
            if self.chunked:
                if chunk:
                    chunk_len = ("%x\r\n" % len(chunk)).encode("ascii")
                    chunk = chunk_len + chunk + b"\r\n0\r\n\r\n"
                else:
                    # No data left: only the terminating chunk is sent.
                    chunk = b"0\r\n\r\n"

        if chunk:
            self._write(chunk)

        await self.drain()

        self._eof = True
        self._transport = None

    async def drain(self) -> None:
        """Flush the write buffer.

        The intended use is to write

          await w.write(data)
          await w.drain()
        """
        if self._protocol.transport is not None:
            await self._protocol._drain_helper()


def _safe_header(string: str) -> str:
    """Return *string* unchanged, rejecting CR/LF characters.

    Raises:
        ValueError: if *string* contains ``"\\r"`` or ``"\\n"``.
    """
    if "\r" in string or "\n" in string:
        raise ValueError(
            "Newline or carriage return detected in headers. "
            "Potential header injection attack."
        )
    return string


def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes:
    """Serialize the status line and headers into a UTF-8 header block.

    Every header name and value is validated with ``_safe_header``;
    *status_line* itself is not validated here.
    """
    headers_gen = (_safe_header(k) + ": " + _safe_header(v) for k, v in headers.items())
    line = status_line + "\r\n" + "\r\n".join(headers_gen) + "\r\n\r\n"
    return line.encode("utf-8")


_serialize_headers = _py_serialize_headers

# Use the compiled serializer when it can be imported, unless extensions
# are disabled via NO_EXTENSIONS.
try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import]

    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers
except ImportError:
    pass