Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/payload_streamer.py
7763 views
"""1Payload implemenation for coroutines as data provider.23As a simple case, you can upload data from file::45@aiohttp.streamer6async def file_sender(writer, file_name=None):7with open(file_name, 'rb') as f:8chunk = f.read(2**16)9while chunk:10await writer.write(chunk)1112chunk = f.read(2**16)1314Then you can use `file_sender` like this:1516async with session.post('http://httpbin.org/post',17data=file_sender(file_name='huge_file')) as resp:18print(await resp.text())1920..note:: Coroutine must accept `writer` as first argument2122"""2324import types25import warnings26from typing import Any, Awaitable, Callable, Dict, Tuple2728from .abc import AbstractStreamWriter29from .payload import Payload, payload_type3031__all__ = ("streamer",)323334class _stream_wrapper:35def __init__(36self,37coro: Callable[..., Awaitable[None]],38args: Tuple[Any, ...],39kwargs: Dict[str, Any],40) -> None:41self.coro = types.coroutine(coro)42self.args = args43self.kwargs = kwargs4445async def __call__(self, writer: AbstractStreamWriter) -> None:46await self.coro(writer, *self.args, **self.kwargs) # type: ignore[operator]474849class streamer:50def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:51warnings.warn(52"@streamer is deprecated, use async generators instead",53DeprecationWarning,54stacklevel=2,55)56self.coro = coro5758def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:59return _stream_wrapper(self.coro, args, kwargs)606162@payload_type(_stream_wrapper)63class StreamWrapperPayload(Payload):64async def write(self, writer: AbstractStreamWriter) -> None:65await self._value(writer)666768@payload_type(streamer)69class StreamPayload(StreamWrapperPayload):70def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:71super().__init__(value(), *args, **kwargs)7273async def write(self, writer: AbstractStreamWriter) -> None:74await self._value(writer)757677