Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/http_websocket.py
7767 views
"""WebSocket protocol versions 13 and 8."""12import asyncio3import collections4import json5import random6import re7import sys8import zlib9from enum import IntEnum10from struct import Struct11from typing import Any, Callable, List, Optional, Pattern, Set, Tuple, Union, cast1213from .base_protocol import BaseProtocol14from .helpers import NO_EXTENSIONS15from .streams import DataQueue16from .typedefs import Final1718__all__ = (19"WS_CLOSED_MESSAGE",20"WS_CLOSING_MESSAGE",21"WS_KEY",22"WebSocketReader",23"WebSocketWriter",24"WSMessage",25"WebSocketError",26"WSMsgType",27"WSCloseCode",28)293031class WSCloseCode(IntEnum):32OK = 100033GOING_AWAY = 100134PROTOCOL_ERROR = 100235UNSUPPORTED_DATA = 100336ABNORMAL_CLOSURE = 100637INVALID_TEXT = 100738POLICY_VIOLATION = 100839MESSAGE_TOO_BIG = 100940MANDATORY_EXTENSION = 101041INTERNAL_ERROR = 101142SERVICE_RESTART = 101243TRY_AGAIN_LATER = 101344BAD_GATEWAY = 1014454647ALLOWED_CLOSE_CODES: Final[Set[int]] = {int(i) for i in WSCloseCode}484950class WSMsgType(IntEnum):51# websocket spec types52CONTINUATION = 0x053TEXT = 0x154BINARY = 0x255PING = 0x956PONG = 0xA57CLOSE = 0x85859# aiohttp specific types60CLOSING = 0x10061CLOSED = 0x10162ERROR = 0x1026364text = TEXT65binary = BINARY66ping = PING67pong = PONG68close = CLOSE69closing = CLOSING70closed = CLOSED71error = ERROR727374WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"757677UNPACK_LEN2 = Struct("!H").unpack_from78UNPACK_LEN3 = Struct("!Q").unpack_from79UNPACK_CLOSE_CODE = Struct("!H").unpack80PACK_LEN1 = Struct("!BB").pack81PACK_LEN2 = Struct("!BBH").pack82PACK_LEN3 = Struct("!BBQ").pack83PACK_CLOSE_CODE = Struct("!H").pack84MSG_SIZE: Final[int] = 2 ** 1485DEFAULT_LIMIT: Final[int] = 2 ** 16868788_WSMessageBase = collections.namedtuple("_WSMessageBase", ["type", "data", "extra"])899091class WSMessage(_WSMessageBase):92def json(self, *, loads: Callable[[Any], Any] = json.loads) -> Any:93"""Return parsed JSON data.9495.. versionadded:: 0.2296"""97return loads(self.data)9899100WS_CLOSED_MESSAGE = WSMessage(WSMsgType.CLOSED, None, None)101WS_CLOSING_MESSAGE = WSMessage(WSMsgType.CLOSING, None, None)102103104class WebSocketError(Exception):105"""WebSocket protocol parser error."""106107def __init__(self, code: int, message: str) -> None:108self.code = code109super().__init__(code, message)110111def __str__(self) -> str:112return cast(str, self.args[1])113114115class WSHandshakeError(Exception):116"""WebSocket protocol handshake error."""117118119native_byteorder: Final[str] = sys.byteorder120121122# Used by _websocket_mask_python123_XOR_TABLE: Final[List[bytes]] = [bytes(a ^ b for a in range(256)) for b in range(256)]124125126def _websocket_mask_python(mask: bytes, data: bytearray) -> None:127"""Websocket masking function.128129`mask` is a `bytes` object of length 4; `data` is a `bytearray`130object of any length. The contents of `data` are masked with `mask`,131as specified in section 5.3 of RFC 6455.132133Note that this function mutates the `data` argument.134135This pure-python implementation may be replaced by an optimized136version when available.137138"""139assert isinstance(data, bytearray), data140assert len(mask) == 4, mask141142if data:143a, b, c, d = (_XOR_TABLE[n] for n in mask)144data[::4] = data[::4].translate(a)145data[1::4] = data[1::4].translate(b)146data[2::4] = data[2::4].translate(c)147data[3::4] = data[3::4].translate(d)148149150if NO_EXTENSIONS: # pragma: no cover151_websocket_mask = _websocket_mask_python152else:153try:154from ._websocket import _websocket_mask_cython # type: ignore[import]155156_websocket_mask = _websocket_mask_cython157except ImportError: # pragma: no cover158_websocket_mask = _websocket_mask_python159160_WS_DEFLATE_TRAILING: Final[bytes] = bytes([0x00, 0x00, 0xFF, 0xFF])161162163_WS_EXT_RE: Final[Pattern[str]] = re.compile(164r"^(?:;\s*(?:"165r"(server_no_context_takeover)|"166r"(client_no_context_takeover)|"167r"(server_max_window_bits(?:=(\d+))?)|"168r"(client_max_window_bits(?:=(\d+))?)))*$"169)170171_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?")172173174def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]:175if not extstr:176return 0, False177178compress = 0179notakeover = False180for ext in _WS_EXT_RE_SPLIT.finditer(extstr):181defext = ext.group(1)182# Return compress = 15 when get `permessage-deflate`183if not defext:184compress = 15185break186match = _WS_EXT_RE.match(defext)187if match:188compress = 15189if isserver:190# Server never fail to detect compress handshake.191# Server does not need to send max wbit to client192if match.group(4):193compress = int(match.group(4))194# Group3 must match if group4 matches195# Compress wbit 8 does not support in zlib196# If compress level not support,197# CONTINUE to next extension198if compress > 15 or compress < 9:199compress = 0200continue201if match.group(1):202notakeover = True203# Ignore regex group 5 & 6 for client_max_window_bits204break205else:206if match.group(6):207compress = int(match.group(6))208# Group5 must match if group6 matches209# Compress wbit 8 does not support in zlib210# If compress level not support,211# FAIL the parse progress212if compress > 15 or compress < 9:213raise WSHandshakeError("Invalid window size")214if match.group(2):215notakeover = True216# Ignore regex group 5 & 6 for client_max_window_bits217break218# Return Fail if client side and not match219elif not isserver:220raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))221222return compress, notakeover223224225def ws_ext_gen(226compress: int = 15, isserver: bool = False, server_notakeover: bool = False227) -> str:228# client_notakeover=False not used for server229# compress wbit 8 does not support in zlib230if compress < 9 or compress > 15:231raise ValueError(232"Compress wbits must between 9 and 15, " "zlib does not support wbits=8"233)234enabledext = ["permessage-deflate"]235if not isserver:236enabledext.append("client_max_window_bits")237238if compress < 15:239enabledext.append("server_max_window_bits=" + str(compress))240if server_notakeover:241enabledext.append("server_no_context_takeover")242# if client_notakeover:243# enabledext.append('client_no_context_takeover')244return "; ".join(enabledext)245246247class WSParserState(IntEnum):248READ_HEADER = 1249READ_PAYLOAD_LENGTH = 2250READ_PAYLOAD_MASK = 3251READ_PAYLOAD = 4252253254class WebSocketReader:255def __init__(256self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True257) -> None:258self.queue = queue259self._max_msg_size = max_msg_size260261self._exc = None # type: Optional[BaseException]262self._partial = bytearray()263self._state = WSParserState.READ_HEADER264265self._opcode = None # type: Optional[int]266self._frame_fin = False267self._frame_opcode = None # type: Optional[int]268self._frame_payload = bytearray()269270self._tail = b""271self._has_mask = False272self._frame_mask = None # type: Optional[bytes]273self._payload_length = 0274self._payload_length_flag = 0275self._compressed = None # type: Optional[bool]276self._decompressobj = None # type: Any # zlib.decompressobj actually277self._compress = compress278279def feed_eof(self) -> None:280self.queue.feed_eof()281282def feed_data(self, data: bytes) -> Tuple[bool, bytes]:283if self._exc:284return True, data285286try:287return self._feed_data(data)288except Exception as exc:289self._exc = exc290self.queue.set_exception(exc)291return True, b""292293def _feed_data(self, data: bytes) -> Tuple[bool, bytes]:294for fin, opcode, payload, compressed in self.parse_frame(data):295if compressed and not self._decompressobj:296self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)297if opcode == WSMsgType.CLOSE:298if len(payload) >= 2:299close_code = UNPACK_CLOSE_CODE(payload[:2])[0]300if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:301raise WebSocketError(302WSCloseCode.PROTOCOL_ERROR,303f"Invalid close code: {close_code}",304)305try:306close_message = payload[2:].decode("utf-8")307except UnicodeDecodeError as exc:308raise WebSocketError(309WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"310) from exc311msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)312elif payload:313raise WebSocketError(314WSCloseCode.PROTOCOL_ERROR,315f"Invalid close frame: {fin} {opcode} {payload!r}",316)317else:318msg = WSMessage(WSMsgType.CLOSE, 0, "")319320self.queue.feed_data(msg, 0)321322elif opcode == WSMsgType.PING:323self.queue.feed_data(324WSMessage(WSMsgType.PING, payload, ""), len(payload)325)326327elif opcode == WSMsgType.PONG:328self.queue.feed_data(329WSMessage(WSMsgType.PONG, payload, ""), len(payload)330)331332elif (333opcode not in (WSMsgType.TEXT, WSMsgType.BINARY)334and self._opcode is None335):336raise WebSocketError(337WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"338)339else:340# load text/binary341if not fin:342# got partial frame payload343if opcode != WSMsgType.CONTINUATION:344self._opcode = opcode345self._partial.extend(payload)346if self._max_msg_size and len(self._partial) >= self._max_msg_size:347raise WebSocketError(348WSCloseCode.MESSAGE_TOO_BIG,349"Message size {} exceeds limit {}".format(350len(self._partial), self._max_msg_size351),352)353else:354# previous frame was non finished355# we should get continuation opcode356if self._partial:357if opcode != WSMsgType.CONTINUATION:358raise WebSocketError(359WSCloseCode.PROTOCOL_ERROR,360"The opcode in non-fin frame is expected "361"to be zero, got {!r}".format(opcode),362)363364if opcode == WSMsgType.CONTINUATION:365assert self._opcode is not None366opcode = self._opcode367self._opcode = None368369self._partial.extend(payload)370if self._max_msg_size and len(self._partial) >= self._max_msg_size:371raise WebSocketError(372WSCloseCode.MESSAGE_TOO_BIG,373"Message size {} exceeds limit {}".format(374len(self._partial), self._max_msg_size375),376)377378# Decompress process must to be done after all packets379# received.380if compressed:381self._partial.extend(_WS_DEFLATE_TRAILING)382payload_merged = self._decompressobj.decompress(383self._partial, self._max_msg_size384)385if self._decompressobj.unconsumed_tail:386left = len(self._decompressobj.unconsumed_tail)387raise WebSocketError(388WSCloseCode.MESSAGE_TOO_BIG,389"Decompressed message size {} exceeds limit {}".format(390self._max_msg_size + left, self._max_msg_size391),392)393else:394payload_merged = bytes(self._partial)395396self._partial.clear()397398if opcode == WSMsgType.TEXT:399try:400text = payload_merged.decode("utf-8")401self.queue.feed_data(402WSMessage(WSMsgType.TEXT, text, ""), len(text)403)404except UnicodeDecodeError as exc:405raise WebSocketError(406WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"407) from exc408else:409self.queue.feed_data(410WSMessage(WSMsgType.BINARY, payload_merged, ""),411len(payload_merged),412)413414return False, b""415416def parse_frame(417self, buf: bytes418) -> List[Tuple[bool, Optional[int], bytearray, Optional[bool]]]:419"""Return the next frame from the socket."""420frames = []421if self._tail:422buf, self._tail = self._tail + buf, b""423424start_pos = 0425buf_length = len(buf)426427while True:428# read header429if self._state == WSParserState.READ_HEADER:430if buf_length - start_pos >= 2:431data = buf[start_pos : start_pos + 2]432start_pos += 2433first_byte, second_byte = data434435fin = (first_byte >> 7) & 1436rsv1 = (first_byte >> 6) & 1437rsv2 = (first_byte >> 5) & 1438rsv3 = (first_byte >> 4) & 1439opcode = first_byte & 0xF440441# frame-fin = %x0 ; more frames of this message follow442# / %x1 ; final frame of this message443# frame-rsv1 = %x0 ;444# 1 bit, MUST be 0 unless negotiated otherwise445# frame-rsv2 = %x0 ;446# 1 bit, MUST be 0 unless negotiated otherwise447# frame-rsv3 = %x0 ;448# 1 bit, MUST be 0 unless negotiated otherwise449#450# Remove rsv1 from this test for deflate development451if rsv2 or rsv3 or (rsv1 and not self._compress):452raise WebSocketError(453WSCloseCode.PROTOCOL_ERROR,454"Received frame with non-zero reserved bits",455)456457if opcode > 0x7 and fin == 0:458raise WebSocketError(459WSCloseCode.PROTOCOL_ERROR,460"Received fragmented control frame",461)462463has_mask = (second_byte >> 7) & 1464length = second_byte & 0x7F465466# Control frames MUST have a payload467# length of 125 bytes or less468if opcode > 0x7 and length > 125:469raise WebSocketError(470WSCloseCode.PROTOCOL_ERROR,471"Control frame payload cannot be " "larger than 125 bytes",472)473474# Set compress status if last package is FIN475# OR set compress status if this is first fragment476# Raise error if not first fragment with rsv1 = 0x1477if self._frame_fin or self._compressed is None:478self._compressed = True if rsv1 else False479elif rsv1:480raise WebSocketError(481WSCloseCode.PROTOCOL_ERROR,482"Received frame with non-zero reserved bits",483)484485self._frame_fin = bool(fin)486self._frame_opcode = opcode487self._has_mask = bool(has_mask)488self._payload_length_flag = length489self._state = WSParserState.READ_PAYLOAD_LENGTH490else:491break492493# read payload length494if self._state == WSParserState.READ_PAYLOAD_LENGTH:495length = self._payload_length_flag496if length == 126:497if buf_length - start_pos >= 2:498data = buf[start_pos : start_pos + 2]499start_pos += 2500length = UNPACK_LEN2(data)[0]501self._payload_length = length502self._state = (503WSParserState.READ_PAYLOAD_MASK504if self._has_mask505else WSParserState.READ_PAYLOAD506)507else:508break509elif length > 126:510if buf_length - start_pos >= 8:511data = buf[start_pos : start_pos + 8]512start_pos += 8513length = UNPACK_LEN3(data)[0]514self._payload_length = length515self._state = (516WSParserState.READ_PAYLOAD_MASK517if self._has_mask518else WSParserState.READ_PAYLOAD519)520else:521break522else:523self._payload_length = length524self._state = (525WSParserState.READ_PAYLOAD_MASK526if self._has_mask527else WSParserState.READ_PAYLOAD528)529530# read payload mask531if self._state == WSParserState.READ_PAYLOAD_MASK:532if buf_length - start_pos >= 4:533self._frame_mask = buf[start_pos : start_pos + 4]534start_pos += 4535self._state = WSParserState.READ_PAYLOAD536else:537break538539if self._state == WSParserState.READ_PAYLOAD:540length = self._payload_length541payload = self._frame_payload542543chunk_len = buf_length - start_pos544if length >= chunk_len:545self._payload_length = length - chunk_len546payload.extend(buf[start_pos:])547start_pos = buf_length548else:549self._payload_length = 0550payload.extend(buf[start_pos : start_pos + length])551start_pos = start_pos + length552553if self._payload_length == 0:554if self._has_mask:555assert self._frame_mask is not None556_websocket_mask(self._frame_mask, payload)557558frames.append(559(self._frame_fin, self._frame_opcode, payload, self._compressed)560)561562self._frame_payload = bytearray()563self._state = WSParserState.READ_HEADER564else:565break566567self._tail = buf[start_pos:]568569return frames570571572class WebSocketWriter:573def __init__(574self,575protocol: BaseProtocol,576transport: asyncio.Transport,577*,578use_mask: bool = False,579limit: int = DEFAULT_LIMIT,580random: Any = random.Random(),581compress: int = 0,582notakeover: bool = False,583) -> None:584self.protocol = protocol585self.transport = transport586self.use_mask = use_mask587self.randrange = random.randrange588self.compress = compress589self.notakeover = notakeover590self._closing = False591self._limit = limit592self._output_size = 0593self._compressobj = None # type: Any # actually compressobj594595async def _send_frame(596self, message: bytes, opcode: int, compress: Optional[int] = None597) -> None:598"""Send a frame over the websocket with message as its payload."""599if self._closing and not (opcode & WSMsgType.CLOSE):600raise ConnectionResetError("Cannot write to closing transport")601602rsv = 0603604# Only compress larger packets (disabled)605# Does small packet needs to be compressed?606# if self.compress and opcode < 8 and len(message) > 124:607if (compress or self.compress) and opcode < 8:608if compress:609# Do not set self._compress if compressing is for this frame610compressobj = zlib.compressobj(level=zlib.Z_BEST_SPEED, wbits=-compress)611else: # self.compress612if not self._compressobj:613self._compressobj = zlib.compressobj(614level=zlib.Z_BEST_SPEED, wbits=-self.compress615)616compressobj = self._compressobj617618message = compressobj.compress(message)619message = message + compressobj.flush(620zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH621)622if message.endswith(_WS_DEFLATE_TRAILING):623message = message[:-4]624rsv = rsv | 0x40625626msg_length = len(message)627628use_mask = self.use_mask629if use_mask:630mask_bit = 0x80631else:632mask_bit = 0633634if msg_length < 126:635header = PACK_LEN1(0x80 | rsv | opcode, msg_length | mask_bit)636elif msg_length < (1 << 16):637header = PACK_LEN2(0x80 | rsv | opcode, 126 | mask_bit, msg_length)638else:639header = PACK_LEN3(0x80 | rsv | opcode, 127 | mask_bit, msg_length)640if use_mask:641mask = self.randrange(0, 0xFFFFFFFF)642mask = mask.to_bytes(4, "big")643message = bytearray(message)644_websocket_mask(mask, message)645self._write(header + mask + message)646self._output_size += len(header) + len(mask) + len(message)647else:648if len(message) > MSG_SIZE:649self._write(header)650self._write(message)651else:652self._write(header + message)653654self._output_size += len(header) + len(message)655656if self._output_size > self._limit:657self._output_size = 0658await self.protocol._drain_helper()659660def _write(self, data: bytes) -> None:661if self.transport is None or self.transport.is_closing():662raise ConnectionResetError("Cannot write to closing transport")663self.transport.write(data)664665async def pong(self, message: bytes = b"") -> None:666"""Send pong message."""667if isinstance(message, str):668message = message.encode("utf-8")669await self._send_frame(message, WSMsgType.PONG)670671async def ping(self, message: bytes = b"") -> None:672"""Send ping message."""673if isinstance(message, str):674message = message.encode("utf-8")675await self._send_frame(message, WSMsgType.PING)676677async def send(678self,679message: Union[str, bytes],680binary: bool = False,681compress: Optional[int] = None,682) -> None:683"""Send a frame over the websocket with message as its payload."""684if isinstance(message, str):685message = message.encode("utf-8")686if binary:687await self._send_frame(message, WSMsgType.BINARY, compress)688else:689await self._send_frame(message, WSMsgType.TEXT, compress)690691async def close(self, code: int = 1000, message: bytes = b"") -> None:692"""Close the websocket, sending the specified code and message."""693if isinstance(message, str):694message = message.encode("utf-8")695try:696await self._send_frame(697PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.CLOSE698)699finally:700self._closing = True701702703