Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client_proto.py
7763 views
1
import asyncio
2
from contextlib import suppress
3
from typing import Any, Optional, Tuple
4
5
from .base_protocol import BaseProtocol
6
from .client_exceptions import (
7
ClientOSError,
8
ClientPayloadError,
9
ServerDisconnectedError,
10
ServerTimeoutError,
11
)
12
from .helpers import BaseTimerContext
13
from .http import HttpResponseParser, RawResponseMessage
14
from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
15
16
17
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
18
"""Helper class to adapt between Protocol and StreamReader."""
19
20
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
21
BaseProtocol.__init__(self, loop=loop)
22
DataQueue.__init__(self, loop)
23
24
self._should_close = False
25
26
self._payload: Optional[StreamReader] = None
27
self._skip_payload = False
28
self._payload_parser = None
29
30
self._timer = None
31
32
self._tail = b""
33
self._upgraded = False
34
self._parser = None # type: Optional[HttpResponseParser]
35
36
self._read_timeout = None # type: Optional[float]
37
self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle]
38
39
@property
40
def upgraded(self) -> bool:
41
return self._upgraded
42
43
@property
44
def should_close(self) -> bool:
45
if self._payload is not None and not self._payload.is_eof() or self._upgraded:
46
return True
47
48
return (
49
self._should_close
50
or self._upgraded
51
or self.exception() is not None
52
or self._payload_parser is not None
53
or len(self) > 0
54
or bool(self._tail)
55
)
56
57
def force_close(self) -> None:
58
self._should_close = True
59
60
def close(self) -> None:
61
transport = self.transport
62
if transport is not None:
63
transport.close()
64
self.transport = None
65
self._payload = None
66
self._drop_timeout()
67
68
def is_connected(self) -> bool:
69
return self.transport is not None and not self.transport.is_closing()
70
71
def connection_lost(self, exc: Optional[BaseException]) -> None:
72
self._drop_timeout()
73
74
if self._payload_parser is not None:
75
with suppress(Exception):
76
self._payload_parser.feed_eof()
77
78
uncompleted = None
79
if self._parser is not None:
80
try:
81
uncompleted = self._parser.feed_eof()
82
except Exception:
83
if self._payload is not None:
84
self._payload.set_exception(
85
ClientPayloadError("Response payload is not completed")
86
)
87
88
if not self.is_eof():
89
if isinstance(exc, OSError):
90
exc = ClientOSError(*exc.args)
91
if exc is None:
92
exc = ServerDisconnectedError(uncompleted)
93
# assigns self._should_close to True as side effect,
94
# we do it anyway below
95
self.set_exception(exc)
96
97
self._should_close = True
98
self._parser = None
99
self._payload = None
100
self._payload_parser = None
101
self._reading_paused = False
102
103
super().connection_lost(exc)
104
105
def eof_received(self) -> None:
106
# should call parser.feed_eof() most likely
107
self._drop_timeout()
108
109
def pause_reading(self) -> None:
110
super().pause_reading()
111
self._drop_timeout()
112
113
def resume_reading(self) -> None:
114
super().resume_reading()
115
self._reschedule_timeout()
116
117
def set_exception(self, exc: BaseException) -> None:
118
self._should_close = True
119
self._drop_timeout()
120
super().set_exception(exc)
121
122
def set_parser(self, parser: Any, payload: Any) -> None:
123
# TODO: actual types are:
124
# parser: WebSocketReader
125
# payload: FlowControlDataQueue
126
# but they are not generi enough
127
# Need an ABC for both types
128
self._payload = payload
129
self._payload_parser = parser
130
131
self._drop_timeout()
132
133
if self._tail:
134
data, self._tail = self._tail, b""
135
self.data_received(data)
136
137
def set_response_params(
138
self,
139
*,
140
timer: Optional[BaseTimerContext] = None,
141
skip_payload: bool = False,
142
read_until_eof: bool = False,
143
auto_decompress: bool = True,
144
read_timeout: Optional[float] = None,
145
read_bufsize: int = 2 ** 16,
146
) -> None:
147
self._skip_payload = skip_payload
148
149
self._read_timeout = read_timeout
150
self._reschedule_timeout()
151
152
self._parser = HttpResponseParser(
153
self,
154
self._loop,
155
read_bufsize,
156
timer=timer,
157
payload_exception=ClientPayloadError,
158
response_with_body=not skip_payload,
159
read_until_eof=read_until_eof,
160
auto_decompress=auto_decompress,
161
)
162
163
if self._tail:
164
data, self._tail = self._tail, b""
165
self.data_received(data)
166
167
def _drop_timeout(self) -> None:
168
if self._read_timeout_handle is not None:
169
self._read_timeout_handle.cancel()
170
self._read_timeout_handle = None
171
172
def _reschedule_timeout(self) -> None:
173
timeout = self._read_timeout
174
if self._read_timeout_handle is not None:
175
self._read_timeout_handle.cancel()
176
177
if timeout:
178
self._read_timeout_handle = self._loop.call_later(
179
timeout, self._on_read_timeout
180
)
181
else:
182
self._read_timeout_handle = None
183
184
def _on_read_timeout(self) -> None:
185
exc = ServerTimeoutError("Timeout on reading data from socket")
186
self.set_exception(exc)
187
if self._payload is not None:
188
self._payload.set_exception(exc)
189
190
def data_received(self, data: bytes) -> None:
191
self._reschedule_timeout()
192
193
if not data:
194
return
195
196
# custom payload parser
197
if self._payload_parser is not None:
198
eof, tail = self._payload_parser.feed_data(data)
199
if eof:
200
self._payload = None
201
self._payload_parser = None
202
203
if tail:
204
self.data_received(tail)
205
return
206
else:
207
if self._upgraded or self._parser is None:
208
# i.e. websocket connection, websocket parser is not set yet
209
self._tail += data
210
else:
211
# parse http messages
212
try:
213
messages, upgraded, tail = self._parser.feed_data(data)
214
except BaseException as exc:
215
if self.transport is not None:
216
# connection.release() could be called BEFORE
217
# data_received(), the transport is already
218
# closed in this case
219
self.transport.close()
220
# should_close is True after the call
221
self.set_exception(exc)
222
return
223
224
self._upgraded = upgraded
225
226
payload: Optional[StreamReader] = None
227
for message, payload in messages:
228
if message.should_close:
229
self._should_close = True
230
231
self._payload = payload
232
233
if self._skip_payload or message.code in (204, 304):
234
self.feed_data((message, EMPTY_PAYLOAD), 0)
235
else:
236
self.feed_data((message, payload), 0)
237
if payload is not None:
238
# new message(s) was processed
239
# register timeout handler unsubscribing
240
# either on end-of-stream or immediately for
241
# EMPTY_PAYLOAD
242
if payload is not EMPTY_PAYLOAD:
243
payload.on_eof(self._drop_timeout)
244
else:
245
self._drop_timeout()
246
247
if tail:
248
if upgraded:
249
self.data_received(tail)
250
else:
251
self._tail = tail
252
253