Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/payload_streamer.py
7763 views
1
"""
2
Payload implementation for coroutines as data provider.
3
4
As a simple case, you can upload data from file::
5
6
@aiohttp.streamer
7
async def file_sender(writer, file_name=None):
8
with open(file_name, 'rb') as f:
9
chunk = f.read(2**16)
10
while chunk:
11
await writer.write(chunk)
12
13
chunk = f.read(2**16)
14
15
Then you can use `file_sender` like this:
16
17
async with session.post('http://httpbin.org/post',
18
data=file_sender(file_name='huge_file')) as resp:
19
print(await resp.text())
20
21
.. note:: The coroutine must accept `writer` as its first argument.
22
23
"""
24
25
import types
26
import warnings
27
from typing import Any, Awaitable, Callable, Dict, Tuple
28
29
from .abc import AbstractStreamWriter
30
from .payload import Payload, payload_type
31
32
# Only `streamer` is exported by a star-import of this module; the wrapper
# and payload classes defined below are not listed.
__all__ = ("streamer",)
33
34
35
class _stream_wrapper:
    """Pair a streaming coroutine function with its call arguments.

    An instance stores the coroutine function together with the positional
    and keyword arguments supplied by the caller.  Awaiting the instance
    with a writer runs ``coro(writer, *args, **kwargs)``.
    """

    def __init__(
        self,
        coro: Callable[..., Awaitable[None]],
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        """Store *coro* (passed through ``types.coroutine``) and its arguments.

        Args:
            coro: Callable that receives the writer as its first argument.
            args: Extra positional arguments forwarded after the writer.
            kwargs: Extra keyword arguments forwarded to *coro*.
        """
        # Wrap first so nothing is stored if the wrapping itself fails.
        wrapped = types.coroutine(coro)
        self.coro, self.args, self.kwargs = wrapped, args, kwargs

    async def __call__(self, writer: AbstractStreamWriter) -> None:
        """Run the stored coroutine, handing it *writer* as first argument."""
        pending = self.coro(writer, *self.args, **self.kwargs)  # type: ignore[operator]
        await pending
48
49
50
class streamer:
    """Deprecated decorator turning a coroutine function into a data provider.

    Constructing an instance (i.e. applying the decorator) emits a
    ``DeprecationWarning``.  Calling the instance captures the supplied
    arguments in a ``_stream_wrapper``; the writer is provided later, when
    that wrapper is awaited.
    """

    def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
        """Remember *coro* and warn that ``@streamer`` is deprecated."""
        message = "@streamer is deprecated, use async generators instead"
        # stacklevel=2 attributes the warning to the code applying the decorator.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        self.coro = coro

    def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
        """Return a wrapper binding the stored coroutine to these arguments."""
        bound = _stream_wrapper(self.coro, args, kwargs)
        return bound
61
62
63
@payload_type(_stream_wrapper)
class StreamWrapperPayload(Payload):
    """Payload registered for ``_stream_wrapper`` values."""

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the wrapped value, passing it *writer*.

        ``self._value`` is not assigned in this class; presumably the
        ``Payload`` base class stores the wrapped object there -- confirm
        against ``payload.py``.
        """
        producer = self._value
        await producer(writer)
67
68
69
@payload_type(streamer)
class StreamPayload(StreamWrapperPayload):
    """Payload registered for bare ``streamer`` objects.

    The given value is called with no arguments and the result is what gets
    handed to the parent payload class.
    """

    def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
        """Call *value* with no arguments and initialise the parent with the result."""
        wrapper = value()
        super().__init__(wrapper, *args, **kwargs)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the stored value, passing it *writer*."""
        producer = self._value
        await producer(writer)
76
77