Path: blob/master/invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/streams.py
import asyncio
import collections
import warnings
from typing import Awaitable, Callable, Deque, Generic, List, Optional, Tuple, TypeVar

from .base_protocol import BaseProtocol
from .helpers import BaseTimerContext, set_exception, set_result
from .log import internal_logger
from .typedefs import Final

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):
    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv


class ChunkTupleAsyncStreamIterator:
    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:
    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n.

        Available for Python 3.5+ only.
        """
        return AsyncStreamIterator(
            lambda: self.read(n)  # type: ignore[attr-defined,no-any-return]
        )

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received.

        Available for Python 3.5+ only.
        """
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.

        Available for Python 3.5+ only.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
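

# --- Editor's usage sketch (hypothetical helper, not part of the original
# module): illustrates the three iteration styles the mixin exposes, assuming
# `reader` is a StreamReader that a protocol is feeding. Defined only, never
# called, from this file.
async def _iteration_styles_example(reader: "StreamReader") -> None:
    async for line in reader:  # newline-delimited, via readline()
        print(line)
    async for chunk in reader.iter_chunked(1024):  # fixed-size pieces
        print(chunk)
    async for data, end_of_http_chunk in reader.iter_chunks():  # chunk bounds
        print(data, end_of_http_chunk)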


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits = None  # type: Optional[List[int]]
        self._buffer = collections.deque()  # type: Deque[bytes]
        self._buffer_offset = 0
        self._eof = False
        self._waiter = None  # type: Optional[asyncio.Future[None]]
        self._eof_waiter = None  # type: Optional[asyncio.Future[None]]
        self._exception = None  # type: Optional[BaseException]
        self._timer = timer
        self._eof_callbacks = []  # type: List[Callable[[], None]]

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2 ** 16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """Rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()
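
    # Editor's note (an assumption about intended use, not original code):
    # aiohttp's HTTP payload parser brackets each chunk of a chunked-encoded
    # body with begin_http_chunk_receiving()/end_http_chunk_receiving(); the
    # offsets recorded in _http_chunk_splits let readchunk() report chunk
    # boundaries to callers.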

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            if self._timer:
                with self._timer:
                    await waiter
            else:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(ichar - offset if ichar else -1)
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk
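
    # Editor's usage sketch (illustrative only, not original code): readline()
    # is readuntil() with the default separator, and the separator is included
    # in the returned bytes:
    #
    #     line = await reader.readline()            # up to and incl. b"\n"
    #     frame = await reader.readuntil(b"\r\n")   # custom separator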

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch the EofStream
        # exception, so the common pattern is to run payload.read() inside an
        # infinite loop, which can become a real infinite loop with
        # StreamReader. Let's keep this code for one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer encoding is used, end_of_http_chunk is a
        boolean indicating if the end of the data corresponds to the end
        of an HTTP chunk; otherwise it is always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks = []  # type: List[bytes]
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe most users don't know about the method and
        # are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        chunks = []

        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
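

# --- Editor's demo (hypothetical helper, not part of the original module) ---
# A minimal, self-contained sketch of the feed/read cycle. It assumes a
# unittest.mock.Mock is an acceptable stand-in for a real BaseProtocol here
# (only _reading_paused and pause_reading()/resume_reading() are touched).
async def _stream_reader_demo() -> bytes:
    from unittest import mock

    protocol = mock.Mock(_reading_paused=False)
    reader = StreamReader(protocol, 2 ** 16)
    reader.feed_data(b"hello ")
    reader.feed_data(b"world\n")
    reader.feed_eof()
    return await reader.readline()  # b"hello world\n"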


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    def __init__(self) -> None:
        pass

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter = None  # type: Optional[asyncio.Future[None]]
        self._exception = None  # type: Optional[BaseException]
        self._size = 0
        self._buffer = collections.deque()  # type: Deque[Tuple[_T, int]]

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
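

# Editor's sketch (hypothetical helper, not original code): a DataQueue
# consumer sees items in feed order and exhaustion as EofStream, not as a
# sentinel value.
async def _drain_data_queue(queue: DataQueue[bytes]) -> List[bytes]:
    items = []  # type: List[bytes]
    try:
        while True:
            items.append(await queue.read())
    except EofStream:
        pass  # the producer called feed_eof()
    return items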


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
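

# Editor's note: optional self-check for the demo above; runs only when the
# file is executed directly and is not part of the original module.
if __name__ == "__main__":
    print(asyncio.run(_stream_reader_demo()))  # expected: b'hello world\n'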