# Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client_ws.py
# 7763 views
"""WebSocket client for asyncio."""

import asyncio
from typing import Any, Optional, Set, cast

import async_timeout

from .client_exceptions import ClientError
from .client_reqrep import ClientResponse
from .helpers import call_later, set_result
from .http import (
    WS_CLOSED_MESSAGE,
    WS_CLOSING_MESSAGE,
    WebSocketError,
    WSCloseCode,
    WSMessage,
    WSMsgType,
)
from .http_websocket import WebSocketWriter  # WSMessage
from .streams import EofStream, FlowControlDataQueue
from .typedefs import (
    DEFAULT_JSON_DECODER,
    DEFAULT_JSON_ENCODER,
    JSONDecoder,
    JSONEncoder,
)


class ClientWebSocketResponse:
    """Client-side handle for an established WebSocket connection.

    Wraps a message reader and a frame writer, tracks the closing/closed
    state and the close code, and optionally runs a heartbeat that sends
    PING frames and closes the connection when no message arrives in time.
    Instances can be iterated with ``async for`` to receive messages until
    the connection closes.
    """

    def __init__(
        self,
        reader: "FlowControlDataQueue[WSMessage]",
        writer: WebSocketWriter,
        protocol: Optional[str],
        response: ClientResponse,
        timeout: float,
        autoclose: bool,
        autoping: bool,
        loop: asyncio.AbstractEventLoop,
        *,
        receive_timeout: Optional[float] = None,
        heartbeat: Optional[float] = None,
        compress: int = 0,
        client_notakeover: bool = False,
    ) -> None:
        """Store the connection parts and start the heartbeat timer.

        Args:
            reader: Queue that incoming messages are read from.
            writer: Writer used to send data, ping, pong and close frames.
            protocol: Value exposed through the ``protocol`` property.
            response: Underlying HTTP response; closed when the WebSocket
                is torn down.
            timeout: Seconds to wait for each read while ``close()`` waits
                for the peer's CLOSE message.
            autoclose: If true, ``receive()`` calls ``close()`` itself when
                a CLOSE message arrives.
            autoping: If true, ``receive()`` answers PING with PONG and
                swallows PING/PONG messages instead of returning them.
            loop: Event loop used for timers, futures and the ping task.
            receive_timeout: Default timeout for ``receive()`` when the call
                does not pass its own.
            heartbeat: Interval in seconds between PING frames; ``None``
                disables the heartbeat.
            compress: Value exposed through the ``compress`` property.
            client_notakeover: Value exposed through the
                ``client_notakeover`` property.
        """
        self._response = response
        self._conn = response.connection

        self._writer = writer
        self._reader = reader
        self._protocol = protocol
        self._closed = False
        self._closing = False
        self._close_code: Optional[int] = None
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
        # Always defined so the attribute exists even without a heartbeat;
        # it is only read when a heartbeat is configured.
        self._pong_heartbeat = heartbeat / 2.0 if heartbeat is not None else 0.0
        self._pong_response_cb: Optional[asyncio.TimerHandle] = None
        # Strong references to in-flight ping tasks (see _send_heartbeat).
        self._ping_tasks: Set["asyncio.Task[None]"] = set()
        self._loop = loop
        # Future resolved when a pending receive() finishes its read.
        self._waiting: Optional["asyncio.Future[bool]"] = None
        self._exception: Optional[BaseException] = None
        self._compress = compress
        self._client_notakeover = client_notakeover

        self._reset_heartbeat()

    def _cancel_heartbeat(self) -> None:
        """Cancel the pending pong-timeout and heartbeat timers, if any."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None

    def _reset_heartbeat(self) -> None:
        """Restart the heartbeat timer (no-op when heartbeat is disabled)."""
        self._cancel_heartbeat()

        if self._heartbeat is not None:
            self._heartbeat_cb = call_later(
                self._send_heartbeat, self._heartbeat, self._loop
            )

    def _send_heartbeat(self) -> None:
        """Timer callback: send a PING and arm the pong-timeout timer."""
        if self._heartbeat is not None and not self._closed:
            # The ping is sent from a short-lived task rather than a
            # long-living heartbeat task.  The event loop keeps only weak
            # references to tasks, so hold a strong one until the task is
            # done; otherwise it could be garbage-collected mid-execution.
            ping_task = self._loop.create_task(self._writer.ping())
            self._ping_tasks.add(ping_task)
            ping_task.add_done_callback(self._ping_tasks.discard)

            if self._pong_response_cb is not None:
                self._pong_response_cb.cancel()
            self._pong_response_cb = call_later(
                self._pong_not_received, self._pong_heartbeat, self._loop
            )

    def _pong_not_received(self) -> None:
        """Timer callback: no message arrived in time, so drop the connection.

        Marks the socket closed with ``ABNORMAL_CLOSURE``, records an
        ``asyncio.TimeoutError`` as the exception and closes the response.
        """
        if not self._closed:
            self._closed = True
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = asyncio.TimeoutError()
            self._response.close()

    @property
    def closed(self) -> bool:
        """Whether the connection has been closed."""
        return self._closed

    @property
    def close_code(self) -> Optional[int]:
        """Close code recorded so far, or ``None`` if none was set."""
        return self._close_code

    @property
    def protocol(self) -> Optional[str]:
        """The ``protocol`` value passed to the constructor."""
        return self._protocol

    @property
    def compress(self) -> int:
        """The ``compress`` value passed to the constructor."""
        return self._compress

    @property
    def client_notakeover(self) -> bool:
        """The ``client_notakeover`` value passed to the constructor."""
        return self._client_notakeover

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Return extra info from the connection transport.

        Returns *default* when the response has no connection or the
        connection has no transport.
        """
        conn = self._response.connection
        if conn is None:
            return default
        transport = conn.transport
        if transport is None:
            return default
        return transport.get_extra_info(name, default)

    def exception(self) -> Optional[BaseException]:
        """Return the exception recorded for this connection, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING frame carrying *message*."""
        await self._writer.ping(message)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG frame carrying *message*."""
        await self._writer.pong(message)

    async def send_str(self, data: str, compress: Optional[int] = None) -> None:
        """Send *data* as a text message.

        Raises:
            TypeError: If *data* is not a ``str``.
        """
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send(data, binary=False, compress=compress)

    async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
        """Send *data* as a binary message.

        Raises:
            TypeError: If *data* is not ``bytes``, ``bytearray`` or
                ``memoryview``.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send(data, binary=True, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: Optional[int] = None,
        *,
        dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
    ) -> None:
        """Serialize *data* with *dumps* and send it as a text message."""
        await self.send_str(dumps(data), compress=compress)

    async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
        """Close the connection, sending a close frame with *code*/*message*.

        Returns:
            ``True`` if this call closed the connection (including the case
            where closing failed and the error was recorded), ``False`` if
            the connection was already closed.

        Raises:
            asyncio.CancelledError: Re-raised after the response is closed
                when the call is cancelled.
        """
        # we need to break `receive()` cycle first,
        # `close()` may be called from different task
        if self._waiting is not None and not self._closed:
            self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
            await self._waiting

        if not self._closed:
            self._cancel_heartbeat()
            self._closed = True
            try:
                await self._writer.close(code, message)
            except asyncio.CancelledError:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._response.close()
                raise
            except Exception as exc:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._exception = exc
                self._response.close()
                return True

            # Closing was already in progress (set by receive()), so there
            # is no need to wait for the peer's CLOSE message.
            if self._closing:
                self._response.close()
                return True

            # Read messages until the peer's CLOSE arrives; each read is
            # bounded by the close timeout.
            while True:
                try:
                    async with async_timeout.timeout(self._timeout):
                        msg = await self._reader.read()
                except asyncio.CancelledError:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._response.close()
                    raise
                except Exception as exc:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._exception = exc
                    self._response.close()
                    return True

                if msg.type == WSMsgType.CLOSE:
                    self._close_code = msg.data
                    self._response.close()
                    return True
        else:
            return False

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        """Wait for and return the next message.

        PING and PONG messages are handled internally when ``autoping`` is
        enabled.  Read failures are reported as CLOSED or ERROR messages
        rather than raised, except for cancellation and timeout.

        Args:
            timeout: Per-call timeout in seconds; a falsy value falls back
                to the ``receive_timeout`` given to the constructor.

        Raises:
            RuntimeError: If another ``receive()`` call is already waiting.
            asyncio.CancelledError: If the call is cancelled.
            asyncio.TimeoutError: If no message arrives within the timeout.
        """
        while True:
            if self._waiting is not None:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE

            try:
                self._waiting = self._loop.create_future()
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    self._reset_heartbeat()
                finally:
                    # Wake up a close() call that may be waiting for this
                    # read to finish.
                    waiter = self._waiting
                    self._waiting = None
                    set_result(waiter, True)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)

            if msg.type == WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type == WSMsgType.CLOSING:
                self._closing = True
            elif msg.type == WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type == WSMsgType.PONG and self._autoping:
                continue

            return msg

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        """Receive the next message and return its text payload.

        Raises:
            TypeError: If the received message is not of type TEXT.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        """Receive the next message and return its binary payload.

        Raises:
            TypeError: If the received message is not of type BINARY.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        """Receive a text message and decode it with *loads*."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        """Return self so the connection can be used with ``async for``."""
        return self

    async def __anext__(self) -> WSMessage:
        """Return the next message; stop on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg