Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/twitchio/websocket.py
7774 views
1
# -*- coding: utf-8 -*-
2
3
"""
4
The MIT License (MIT)
5
6
Copyright (c) 2017-2021 TwitchIO
7
8
Permission is hereby granted, free of charge, to any person obtaining a
9
copy of this software and associated documentation files (the "Software"),
10
to deal in the Software without restriction, including without limitation
11
the rights to use, copy, modify, merge, publish, distribute, sublicense,
12
and/or sell copies of the Software, and to permit persons to whom the
13
Software is furnished to do so, subject to the following conditions:
14
15
The above copyright notice and this permission notice shall be included in
16
all copies or substantial portions of the Software.
17
18
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
19
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24
DEALINGS IN THE SOFTWARE.
25
"""
26
27
import asyncio
28
import logging
29
import re
30
import sys
31
import time
32
import traceback
33
from functools import partial
34
from typing import Union, Optional, List, TYPE_CHECKING
35
36
import aiohttp
37
38
from .backoff import ExponentialBackoff
39
from .channel import Channel
40
from .errors import AuthenticationError
41
from .message import Message
42
from .parse import parser
43
from .chatter import Chatter, PartialChatter, WhisperChatter
44
45
if TYPE_CHECKING:
46
from .client import Client
47
48
49
log = logging.getLogger(__name__)
50
HOST = "wss://irc-ws.chat.twitch.tv:443"
51
52
53
class WSConnection:
54
def __init__(
55
self,
56
*,
57
loop: asyncio.AbstractEventLoop,
58
heartbeat: Optional[float],
59
client: "Client",
60
token: str = None,
61
modes: tuple = None,
62
initial_channels: List[str] = None,
63
):
64
self._loop = loop
65
self._backoff = ExponentialBackoff()
66
self._keeper: Optional[asyncio.Task] = None
67
self._websocket = None
68
self._heartbeat = heartbeat
69
self._ws_ready_event: asyncio.Event = asyncio.Event()
70
self.is_ready: asyncio.Event = asyncio.Event()
71
self._join_lock: asyncio.Lock = asyncio.Lock()
72
self._join_handle = 0
73
self._join_tick = 20
74
self._join_pending = {}
75
self._join_load = {}
76
self._init = False
77
78
self._cache = {}
79
self._actions = {
80
"PING": self._ping,
81
"PART": self._part,
82
"PRIVMSG": self._privmsg,
83
"PRIVMSG(ECHO)": self._privmsg_echo,
84
"USERSTATE": self._userstate,
85
"USERNOTICE": self._usernotice,
86
"JOIN": self._join,
87
"MODE": self._mode,
88
"RECONNECT": self._reconnect,
89
"WHISPER": self._privmsg,
90
}
91
92
self.nick = None
93
self.user_id = None
94
self._token = token
95
self.modes = modes or ("commands", "tags", "membership")
96
self._initial_channels = initial_channels or []
97
98
if callable(self._initial_channels):
99
_temp_initial_channels = self._initial_channels()
100
if isinstance(_temp_initial_channels, (list, tuple)):
101
self._initial_channels = _temp_initial_channels
102
else:
103
self._initial_channels = [_temp_initial_channels]
104
105
self._last_ping = 0
106
self._reconnect_requested = False
107
108
self._client = client
109
110
@property
111
def is_alive(self) -> bool:
112
return self._websocket is not None and not self._websocket.closed
113
114
async def wait_until_ready(self):
115
await self.is_ready.wait()
116
117
async def _connect(self):
118
"""Attempt to connect to Twitch's Websocket."""
119
self.is_ready.clear()
120
121
if self._keeper:
122
self._keeper.cancel() # Stop our current keep alive.
123
124
if self.is_alive:
125
await self._websocket.close() # If for some reason we are in a weird state, close it before retrying.
126
127
if not self._client._http.nick:
128
data = await self._client._http.validate(token=self._token)
129
self.nick = data["login"]
130
self.user_id = int(data["user_id"])
131
132
session = self._client._http.session
133
134
try:
135
self._websocket = await session.ws_connect(url=HOST, heartbeat=self._heartbeat)
136
except Exception as e:
137
retry = self._backoff.delay()
138
log.error(f"Websocket connection failure: {e}:: Attempting reconnect in {retry} seconds.")
139
140
await asyncio.sleep(retry)
141
return asyncio.create_task(self._connect())
142
143
if time.time() > self._last_ping + 240 or self._reconnect_requested:
144
# Re-authenticate as we have surpassed a PING request or Twitch issued a RECONNECT.
145
await self.authenticate(self._initial_channels)
146
self._reconnect_requested = False
147
148
self._keeper = asyncio.create_task(self._keep_alive()) # Create our keep alive.
149
self._ws_ready_event.set()
150
151
async def _keep_alive(self):
152
await self._ws_ready_event.wait()
153
self._ws_ready_event.clear()
154
155
if not self._last_ping:
156
self._last_ping = time.time()
157
158
while not self._websocket.closed:
159
msg = await self._websocket.receive() # Receive data...
160
161
if msg.type is aiohttp.WSMsgType.CLOSED:
162
log.error(f"Websocket connection was closed: {msg.extra}")
163
break
164
165
data = msg.data
166
if data:
167
log.debug(f" < {data}")
168
self.dispatch("raw_data", data) # Dispatch our event_raw_data event...
169
170
events = data.split("\r\n")
171
for event in events:
172
if not event:
173
continue
174
175
task = asyncio.create_task(self._process_data(event))
176
task.add_done_callback(partial(self._task_callback, event)) # Process our raw data
177
178
asyncio.create_task(self._connect())
179
180
def _task_callback(self, data, task):
181
exc = task.exception()
182
183
if isinstance(exc, AuthenticationError): # Check if we failed to log in...
184
log.error("Authentication error. Please check your credentials and try again.")
185
self._close()
186
elif exc:
187
asyncio.create_task(self.event_error(exc, data))
188
189
async def send(self, message: str):
190
message = message.strip()
191
log.debug(f" > {message}")
192
193
if message.startswith("PRIVMSG #"):
194
data = message.replace("PRIVMSG #", "", 1).split(" ")
195
channel = data.pop(0)
196
content = " ".join(data)
197
198
dummy = f"> :{self.nick}!{self.nick}@{self.nick}.tmi.twitch.tv PRIVMSG(ECHO) #{channel} {content}\r\n"
199
200
task = asyncio.create_task(self._process_data(dummy))
201
task.add_done_callback(partial(self._task_callback, dummy)) # Process our raw data
202
203
await self._websocket.send_str(message + "\r\n")
204
205
async def reply(self, msg_id: str, message: str):
206
message = message.strip()
207
log.debug(f" > {message}")
208
209
if message.startswith("PRIVMSG #"):
210
data = message.replace("PRIVMSG #", "", 1).split(" ")
211
channel = data.pop(0)
212
content = " ".join(data)
213
214
dummy = f"> @reply-parent-msg-id={msg_id} :{self.nick}!{self.nick}@{self.nick}.tmi.twitch.tv PRIVMSG(ECHO) #{channel} {content}\r\n"
215
task = asyncio.create_task(self._process_data(dummy))
216
task.add_done_callback(partial(self._task_callback, dummy)) # Process our raw data
217
218
await self._websocket.send_str(f"@reply-parent-msg-id={msg_id} {message} \r\n")
219
220
async def authenticate(self, channels: Union[list, tuple]):
221
"""|coro|
222
223
Automated Authentication process.
224
225
Attempts to authenticate on the Twitch servers with the provided
226
nickname and IRC Token (pass).
227
228
On successful authentication, an attempt to join the provided channels is made.
229
230
Parameters
231
------------
232
channels: Union[list, tuple]
233
A list or tuple of channels to attempt joining.
234
"""
235
if not self.is_alive:
236
return
237
238
await self.send(f"PASS oauth:{self._token}\r\n")
239
await self.send(f"NICK {self.nick}\r\n")
240
241
for cap in self.modes:
242
await self.send(f"CAP REQ :twitch.tv/{cap}") # Ideally no one should overwrite defaults...
243
244
if not channels and not self._initial_channels:
245
return
246
247
channels = channels or self._initial_channels
248
await self.join_channels(*channels)
249
250
async def join_channels(self, *channels: str):
251
"""|coro|
252
253
Attempt to join the provided channels.
254
255
Parameters
256
------------
257
*channels : str
258
An argument list of channels to attempt joining.
259
"""
260
async with self._join_lock: # acquire a lock, allowing only one join_channels at once...
261
for channel in channels:
262
if self._join_handle < time.time(): # Handle is less than the current time
263
self._join_tick = 20 # So lets start a new rate limit bucket..
264
self._join_handle = time.time() + 10 # Set the handle timeout time
265
266
if self._join_tick == 0: # We have exhausted the bucket, wait so we can make a new one...
267
await asyncio.sleep(self._join_handle - time.time())
268
continue
269
270
asyncio.create_task(self._join_channel(channel))
271
self._join_tick -= 1
272
273
async def _join_channel(self, entry):
274
channel = re.sub("[#]", "", entry).lower()
275
await self.send(f"JOIN #{channel}\r\n")
276
277
self._join_pending[channel] = fut = self._loop.create_future()
278
asyncio.create_task(self._join_future_handle(fut, channel))
279
280
async def _join_future_handle(self, fut: asyncio.Future, channel: str):
281
try:
282
await asyncio.wait_for(fut, timeout=10)
283
except asyncio.TimeoutError:
284
log.error(f'The channel "{channel}" was unable to be joined. Check the channel is valid.')
285
self._join_pending.pop(channel)
286
287
data = (
288
f":{self.nick}.tmi.twitch.tv 353 {self.nick} = #TWITCHIOFAILURE :{channel}\r\n"
289
f":{self.nick}.tmi.twitch.tv 366 {self.nick} #TWITCHIOFAILURE :End of /NAMES list"
290
)
291
292
await self._process_data(data)
293
294
async def _process_data(self, data: str):
295
data = data.rstrip()
296
parsed = parser(data, self.nick)
297
298
if parsed["action"] == "PING":
299
return await self._ping()
300
elif parsed["code"] != 0:
301
return await self._code(parsed, parsed["code"])
302
elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"):
303
log.error(
304
f'Login unsuccessful with token "{self._token}". ' f'Check your scopes for "chat_login" and try again.'
305
)
306
return await self._close()
307
308
partial_ = self._actions.get(parsed["action"])
309
if partial_:
310
await partial_(parsed)
311
312
async def _await_futures(self):
313
futures = self._fetch_futures()
314
315
for fut in futures:
316
try:
317
fut.exception()
318
except asyncio.InvalidStateError:
319
pass
320
321
if fut.done():
322
futures.remove(fut)
323
324
if futures:
325
await asyncio.wait(futures)
326
327
async def _code(self, parsed, code: int):
328
if code == 1:
329
log.info(f"Successfully logged onto Twitch WS: {self.nick}")
330
331
await self._await_futures()
332
await self.is_ready.wait()
333
self.dispatch("ready")
334
self._init = True
335
336
elif code == 353:
337
if parsed["channel"] == "TWITCHIOFAILURE":
338
self._initial_channels.remove(parsed["batches"][0])
339
340
if parsed["channel"] in self._initial_channels and not self._init:
341
self._join_load[parsed["channel"]] = None
342
343
if len(self._join_load) == len(self._initial_channels):
344
for channel in self._initial_channels:
345
self._join_load.pop(channel)
346
self._cache_add(parsed)
347
self.is_ready.set()
348
else:
349
self._cache_add(parsed)
350
elif code in {2, 3, 4, 366, 372, 375, 376}:
351
return
352
elif self.is_ready.is_set():
353
return
354
else:
355
self.is_ready.set()
356
# self.dispatch("ready")
357
358
async def _ping(self, _=None):
359
log.debug("ACTION: Sending PONG reply.")
360
self._last_ping = time.time()
361
await self.send("PONG :tmi.twitch.tv\r\n")
362
363
async def _part(self, parsed): # TODO
364
log.debug(f'ACTION: PART:: {parsed["channel"]}')
365
366
async def _privmsg(self, parsed): # TODO(Update Cache properly)
367
log.debug(f'ACTION: PRIVMSG:: {parsed["channel"]}')
368
369
if parsed["channel"] is None:
370
log.debug(f'ACTION: WHISPER:: {parsed["user"]}')
371
channel = None
372
user = WhisperChatter(websocket=self, name=parsed["user"])
373
else:
374
channel = Channel(name=parsed["channel"], websocket=self)
375
self._cache_add(parsed)
376
user = Chatter(
377
tags=parsed["badges"], name=parsed["user"], channel=channel, bot=self._client, websocket=self
378
)
379
380
message = Message(
381
raw_data=parsed["data"],
382
content=parsed["message"],
383
author=user,
384
channel=channel,
385
tags=parsed["badges"],
386
echo="echo" in parsed["action"],
387
)
388
389
self.dispatch("message", message)
390
391
async def _privmsg_echo(self, parsed):
392
log.debug(f'ACTION: PRIVMSG(ECHO):: {parsed["channel"]}')
393
394
channel = Channel(name=parsed["channel"], websocket=self)
395
message = Message(
396
raw_data=parsed["data"], content=parsed["message"], author=None, channel=channel, tags={}, echo=True
397
)
398
399
self.dispatch("message", message)
400
401
async def _userstate(self, parsed):
402
log.debug(f'ACTION: USERSTATE:: {parsed["channel"]}')
403
self._cache_add(parsed)
404
405
channel = Channel(name=parsed["channel"], websocket=self)
406
name = parsed["user"] or parsed["nick"]
407
user = Chatter(tags=parsed["badges"], name=name, channel=channel, bot=self._client, websocket=self)
408
409
self.dispatch("userstate", user)
410
411
async def _usernotice(self, parsed):
412
log.debug(f'ACTION: USERNOTICE:: {parsed["channel"]}')
413
414
channel = Channel(name=parsed["channel"], websocket=self)
415
rawData = parsed["groups"][0]
416
tags = dict(x.split("=") for x in rawData.split(";"))
417
418
self.dispatch("raw_usernotice", channel, tags)
419
420
async def _join(self, parsed):
421
log.debug(f'ACTION: JOIN:: {parsed["channel"]}')
422
channel = parsed["channel"]
423
424
if self._join_pending:
425
try:
426
self._join_pending[channel].set_result(None)
427
except KeyError:
428
pass
429
else:
430
self._join_pending.pop(channel)
431
432
if parsed["user"] != self._client.nick:
433
self._cache_add(parsed)
434
435
channel = Channel(name=channel, websocket=self)
436
user = Chatter(name=parsed["user"], bot=self._client, websocket=self, channel=channel, tags=parsed["badges"])
437
438
self.dispatch("join", channel, user)
439
440
def _cache_add(self, parsed: dict):
441
channel = parsed["channel"].lstrip("#")
442
443
if channel not in self._cache:
444
self._cache[channel] = set()
445
446
channel_ = Channel(name=channel, websocket=self)
447
448
if parsed["batches"]:
449
for u in parsed["batches"]:
450
user = PartialChatter(name=u, bot=self._client, websocket=self, channel=channel_)
451
self._cache[channel].add(user)
452
else:
453
name = parsed["user"] or parsed["nick"]
454
user = Chatter(bot=self._client, name=name, websocket=self, channel=channel_, tags=parsed["badges"])
455
self._cache[channel].discard(user)
456
self._cache[channel].add(user)
457
458
async def _mode(self, parsed): # TODO
459
pass
460
461
async def _reconnect(self, parsed):
462
log.debug("ACTION: RECONNECT:: Twitch has gracefully closed the connection and will reconnect.")
463
self._reconnect_requested = True
464
465
def dispatch(self, event: str, *args, **kwargs):
466
log.debug(f"Dispatching event: {event}")
467
468
self._client.run_event(event, *args, **kwargs)
469
470
async def event_error(self, error: Exception, data: str = None):
471
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
472
473
def _fetch_futures(self):
474
return [
475
fut
476
for chan, fut in self._join_pending.items()
477
if chan.lower() in [re.sub("[#]", "", c).lower() for c in self._initial_channels]
478
]
479
480
async def _close(self):
481
self._keeper.cancel()
482
self.is_ready.clear()
483
484
futures = self._fetch_futures()
485
486
for fut in futures:
487
fut.cancel()
488
489
if self._websocket:
490
await self._websocket.close()
491
if self._client._http.session:
492
await self._client._http.session.close()
493
self._loop.stop()
494
495