Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/client_ws.py
7763 views
1
"""WebSocket client for asyncio."""
2
3
import asyncio
4
from typing import Any, Optional, cast
5
6
import async_timeout
7
8
from .client_exceptions import ClientError
9
from .client_reqrep import ClientResponse
10
from .helpers import call_later, set_result
11
from .http import (
12
WS_CLOSED_MESSAGE,
13
WS_CLOSING_MESSAGE,
14
WebSocketError,
15
WSCloseCode,
16
WSMessage,
17
WSMsgType,
18
)
19
from .http_websocket import WebSocketWriter # WSMessage
20
from .streams import EofStream, FlowControlDataQueue
21
from .typedefs import (
22
DEFAULT_JSON_DECODER,
23
DEFAULT_JSON_ENCODER,
24
JSONDecoder,
25
JSONEncoder,
26
)
27
28
29
class ClientWebSocketResponse:
    """Client side of a WebSocket connection.

    Incoming messages are pulled from ``reader`` and outgoing frames are
    sent through ``writer``.  The object tracks the closing state, an
    optional heartbeat (periodic ping plus a pong deadline) and the last
    exception that terminated the connection.

    The instance is an async iterator: iteration yields messages from
    :meth:`receive` and stops on a CLOSE, CLOSING or CLOSED message.
    """

    def __init__(
        self,
        reader: "FlowControlDataQueue[WSMessage]",
        writer: WebSocketWriter,
        protocol: Optional[str],
        response: ClientResponse,
        timeout: float,
        autoclose: bool,
        autoping: bool,
        loop: asyncio.AbstractEventLoop,
        *,
        receive_timeout: Optional[float] = None,
        heartbeat: Optional[float] = None,
        compress: int = 0,
        client_notakeover: bool = False,
    ) -> None:
        """Store the connection parts and arm the heartbeat timer.

        :param reader: queue that incoming messages are read from.
        :param writer: object used to send ping/pong/data/close frames.
        :param protocol: value exposed by the ``protocol`` property
            (presumably the negotiated subprotocol -- confirm with caller).
        :param response: HTTP response owning the connection; it is closed
            when the WebSocket terminates.
        :param timeout: timeout applied to each read while ``close()``
            waits for the peer's CLOSE message.
        :param autoclose: when true, ``receive()`` calls ``close()`` as soon
            as a CLOSE message arrives.
        :param autoping: when true, ``receive()`` answers PING with a pong
            and silently skips PONG messages.
        :param loop: event loop used for timers, futures and tasks.
        :param receive_timeout: default read timeout for ``receive()``.
        :param heartbeat: delay before a heartbeat ping is sent; ``None``
            disables the heartbeat.
        :param compress: value exposed by the ``compress`` property.
        :param client_notakeover: value exposed by the
            ``client_notakeover`` property.
        """
        self._response = response
        self._conn = response.connection

        self._writer = writer
        self._reader = reader
        self._protocol = protocol
        self._closed = False
        self._closing = False
        self._close_code = None  # type: Optional[int]
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
        if heartbeat is not None:
            # After a heartbeat ping, a message must be read within half
            # of the heartbeat delay or the connection is dropped.
            self._pong_heartbeat = heartbeat / 2.0
        self._pong_response_cb: Optional[asyncio.TimerHandle] = None
        # Strong reference to the in-flight heartbeat ping task so that it
        # cannot be garbage-collected before it completes.
        self._ping_task: "Optional[asyncio.Task[None]]" = None
        self._loop = loop
        self._waiting = None  # type: Optional[asyncio.Future[bool]]
        self._exception = None  # type: Optional[BaseException]
        self._compress = compress
        self._client_notakeover = client_notakeover

        self._reset_heartbeat()

    def _cancel_heartbeat(self) -> None:
        """Cancel the pending pong-deadline and heartbeat timers, if any."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None

    def _reset_heartbeat(self) -> None:
        """Restart the heartbeat timer (no-op when heartbeat is disabled)."""
        self._cancel_heartbeat()

        if self._heartbeat is not None:
            self._heartbeat_cb = call_later(
                self._send_heartbeat, self._heartbeat, self._loop
            )

    def _send_heartbeat(self) -> None:
        """Timer callback: send a ping and arm the pong deadline."""
        if self._heartbeat is not None and not self._closed:
            # The ping is sent from a background task because this is a
            # plain (non-async) timer callback.  The event loop keeps only
            # weak references to tasks, so hold a reference until the task
            # finishes.  Exceptions raised by the ping are not handled here.
            ping_task = self._loop.create_task(self._writer.ping())
            self._ping_task = ping_task
            ping_task.add_done_callback(self._ping_task_done)

            if self._pong_response_cb is not None:
                self._pong_response_cb.cancel()
            self._pong_response_cb = call_later(
                self._pong_not_received, self._pong_heartbeat, self._loop
            )

    def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
        """Drop the reference to a finished heartbeat ping task."""
        # A newer ping may already have replaced the stored task.
        if self._ping_task is task:
            self._ping_task = None

    def _pong_not_received(self) -> None:
        """Timer callback: the pong deadline expired, drop the connection."""
        if not self._closed:
            self._closed = True
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = asyncio.TimeoutError()
            self._response.close()

    @property
    def closed(self) -> bool:
        """Whether the connection has been closed."""
        return self._closed

    @property
    def close_code(self) -> Optional[int]:
        """Close code recorded so far, or ``None`` if none was set."""
        return self._close_code

    @property
    def protocol(self) -> Optional[str]:
        """The ``protocol`` value passed to the constructor."""
        return self._protocol

    @property
    def compress(self) -> int:
        """The ``compress`` value passed to the constructor."""
        return self._compress

    @property
    def client_notakeover(self) -> bool:
        """The ``client_notakeover`` value passed to the constructor."""
        return self._client_notakeover

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Return extra info from the connection transport.

        ``default`` is returned when the response has no connection or the
        connection has no transport.
        """
        conn = self._response.connection
        if conn is None:
            return default
        transport = conn.transport
        if transport is None:
            return default
        return transport.get_extra_info(name, default)

    def exception(self) -> Optional[BaseException]:
        """Return the exception recorded for this connection, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a ping with the optional ``message`` payload."""
        await self._writer.ping(message)

    async def pong(self, message: bytes = b"") -> None:
        """Send a pong with the optional ``message`` payload."""
        await self._writer.pong(message)

    async def send_str(self, data: str, compress: Optional[int] = None) -> None:
        """Send ``data`` as a text message.

        :raises TypeError: if ``data`` is not a ``str``.
        """
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send(data, binary=False, compress=compress)

    async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
        """Send ``data`` as a binary message.

        :raises TypeError: if ``data`` is not bytes, bytearray or memoryview.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send(data, binary=True, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: Optional[int] = None,
        *,
        dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
    ) -> None:
        """Serialize ``data`` with ``dumps`` and send it as a text message."""
        await self.send_str(dumps(data), compress=compress)

    async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
        """Close the connection.

        Sends a close frame with ``code`` and ``message`` and, unless a
        CLOSE was already received, reads messages until the peer's CLOSE
        arrives or a read fails or times out.

        :returns: ``True`` if this call closed the connection, ``False`` if
            it was already closed.
        :raises asyncio.CancelledError: re-raised after the underlying
            response has been closed.
        """
        # we need to break `receive()` cycle first,
        # `close()` may be called from different task
        if self._waiting is not None and not self._closed:
            self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
            await self._waiting

        if not self._closed:
            self._cancel_heartbeat()
            self._closed = True
            try:
                await self._writer.close(code, message)
            except asyncio.CancelledError:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._response.close()
                raise
            except Exception as exc:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._exception = exc
                self._response.close()
                return True

            if self._closing:
                # The peer's CLOSE has already been seen; nothing to wait for.
                self._response.close()
                return True

            # Drain incoming messages until the peer's CLOSE shows up.
            while True:
                try:
                    async with async_timeout.timeout(self._timeout):
                        msg = await self._reader.read()
                except asyncio.CancelledError:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._response.close()
                    raise
                except Exception as exc:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._exception = exc
                    self._response.close()
                    return True

                if msg.type == WSMsgType.CLOSE:
                    self._close_code = msg.data
                    self._response.close()
                    return True
        else:
            return False

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        """Wait for and return the next message.

        PING and PONG messages are handled internally when ``autoping`` is
        enabled.  Read failures are reported as CLOSED or ERROR messages
        rather than raised, except for cancellation and timeout.

        :param timeout: read timeout; a falsy value falls back to the
            ``receive_timeout`` given to the constructor.
        :raises RuntimeError: if another ``receive()`` call is in progress.
        :raises asyncio.CancelledError: re-raised after recording an
            abnormal close code.
        :raises asyncio.TimeoutError: re-raised after recording an
            abnormal close code.
        """
        while True:
            if self._waiting is not None:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE

            try:
                # `close()` awaits this future to know the read has ended.
                self._waiting = self._loop.create_future()
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    # Any message read counts as proof of life.
                    self._reset_heartbeat()
                finally:
                    waiter = self._waiting
                    self._waiting = None
                    set_result(waiter, True)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)

            if msg.type == WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type == WSMsgType.CLOSING:
                self._closing = True
            elif msg.type == WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type == WSMsgType.PONG and self._autoping:
                continue

            return msg

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        """Receive the next message and return its text payload.

        :raises TypeError: if the message is not a TEXT message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        """Receive the next message and return its binary payload.

        :raises TypeError: if the message is not a BINARY message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        """Receive a text message and decode it with ``loads``."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        """Return ``self``; the object is its own async iterator."""
        return self

    async def __anext__(self) -> WSMessage:
        """Return the next message, stopping on CLOSE/CLOSING/CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg
301
302