Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client_proto.py
7763 views
import asyncio
from contextlib import suppress
from typing import Any, Optional, Tuple

from .base_protocol import BaseProtocol
from .client_exceptions import (
    ClientOSError,
    ClientPayloadError,
    ServerDisconnectedError,
    ServerTimeoutError,
)
from .helpers import BaseTimerContext
from .http import HttpResponseParser, RawResponseMessage
from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader


class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        self._should_close = False

        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        self._payload_parser = None

        self._timer = None

        # Bytes received while no parser could consume them; replayed by
        # set_parser() / set_response_params().
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

    @property
    def upgraded(self) -> bool:
        """Return the ``upgraded`` flag reported by the last parser call."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Return True when the connection must not be reused.

        That is the case when the current payload has not reached EOF, the
        connection was upgraded, closing was requested, an exception is
        stored, a custom payload parser is installed, or unread
        messages / buffered bytes remain.
        """
        if self._payload is not None and not self._payload.is_eof() or self._upgraded:
            return True

        # ``self._upgraded`` is already known to be False here.
        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Mark the connection as one that must be closed."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if any, and drop payload and read timeout."""
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True if a transport is attached and is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Finalise parsers and propagate the disconnect to readers.

        ``exc`` is the reason passed to the protocol (``None`` when there is
        none). An ``OSError`` is re-wrapped as ``ClientOSError`` and ``None``
        becomes ``ServerDisconnectedError``; the original exception is kept
        as ``__cause__`` of the wrapper.
        """
        self._drop_timeout()

        if self._payload_parser is not None:
            # Best effort: a failing custom parser must not stop the cleanup.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as underlying:
                if self._payload is not None:
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    # Keep the parser failure visible in tracebacks.
                    payload_exc.__cause__ = underlying
                    self._payload.set_exception(payload_exc)

        if not self.is_eof():
            if isinstance(exc, OSError):
                original = exc
                exc = ClientOSError(*original.args)
                exc.__cause__ = original
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        # NOTE(review): presumably declared by BaseProtocol -- confirm.
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self) -> None:
        """Cancel the read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading via the base class and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading via the base class and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Store ``exc`` on the queue, flag for close, cancel the timeout."""
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and replay any buffered bytes."""
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2 ** 16,
    ) -> None:
        """Configure parsing of the next response.

        Stores ``skip_payload`` and ``read_timeout``, re-arms the read
        timeout, creates a fresh ``HttpResponseParser`` from the remaining
        arguments, and replays any bytes buffered in ``self._tail``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel and forget the pending read-timeout callback, if any."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Restart the read-timeout timer.

        A falsy ``self._read_timeout`` (``None`` or ``0``) leaves no timer
        scheduled.
        """
        timeout = self._read_timeout
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()

        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )
        else:
            self._read_timeout_handle = None

    def _on_read_timeout(self) -> None:
        """Fail the queue and the current payload with ServerTimeoutError."""
        exc = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser.

        Bytes go to the custom payload parser when one is installed, are
        appended to ``self._tail`` when the connection is upgraded or no HTTP
        parser exists yet, and are otherwise parsed as HTTP response
        messages which are then fed into the queue.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    self.data_received(tail)
            return
        else:
            if self._upgraded or self._parser is None:
                # i.e. websocket connection, websocket parser is not set yet
                self._tail += data
            else:
                # parse http messages
                try:
                    messages, upgraded, tail = self._parser.feed_data(data)
                except BaseException as exc:
                    if self.transport is not None:
                        # connection.release() could be called BEFORE
                        # data_received(), the transport is already
                        # closed in this case
                        self.transport.close()
                    # should_close is True after the call
                    self.set_exception(exc)
                    return

                self._upgraded = upgraded

                payload: Optional[StreamReader] = None
                for message, payload in messages:
                    if message.should_close:
                        self._should_close = True

                    self._payload = payload

                    if self._skip_payload or message.code in (204, 304):
                        self.feed_data((message, EMPTY_PAYLOAD), 0)
                    else:
                        self.feed_data((message, payload), 0)
                if payload is not None:
                    # new message(s) was processed
                    # register timeout handler unsubscribing
                    # either on end-of-stream or immediately for
                    # EMPTY_PAYLOAD
                    if payload is not EMPTY_PAYLOAD:
                        payload.on_eof(self._drop_timeout)
                    else:
                        self._drop_timeout()

                if tail:
                    if upgraded:
                        self.data_received(tail)
                    else:
                        self._tail = tail