Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/twitchio/websocket.py
7774 views
# -*- coding: utf-8 -*-12"""3The MIT License (MIT)45Copyright (c) 2017-2021 TwitchIO67Permission is hereby granted, free of charge, to any person obtaining a8copy of this software and associated documentation files (the "Software"),9to deal in the Software without restriction, including without limitation10the rights to use, copy, modify, merge, publish, distribute, sublicense,11and/or sell copies of the Software, and to permit persons to whom the12Software is furnished to do so, subject to the following conditions:1314The above copyright notice and this permission notice shall be included in15all copies or substantial portions of the Software.1617THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS18OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,19FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE20AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER21LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING22FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER23DEALINGS IN THE SOFTWARE.24"""2526import asyncio27import logging28import re29import sys30import time31import traceback32from functools import partial33from typing import Union, Optional, List, TYPE_CHECKING3435import aiohttp3637from .backoff import ExponentialBackoff38from .channel import Channel39from .errors import AuthenticationError40from .message import Message41from .parse import parser42from .chatter import Chatter, PartialChatter, WhisperChatter4344if TYPE_CHECKING:45from .client import Client464748log = logging.getLogger(__name__)49HOST = "wss://irc-ws.chat.twitch.tv:443"505152class WSConnection:53def __init__(54self,55*,56loop: asyncio.AbstractEventLoop,57heartbeat: Optional[float],58client: "Client",59token: str = None,60modes: tuple = None,61initial_channels: List[str] = None,62):63self._loop = loop64self._backoff = ExponentialBackoff()65self._keeper: Optional[asyncio.Task] = None66self._websocket = None67self._heartbeat = heartbeat68self._ws_ready_event: asyncio.Event = asyncio.Event()69self.is_ready: asyncio.Event = asyncio.Event()70self._join_lock: asyncio.Lock = asyncio.Lock()71self._join_handle = 072self._join_tick = 2073self._join_pending = {}74self._join_load = {}75self._init = False7677self._cache = {}78self._actions = {79"PING": self._ping,80"PART": self._part,81"PRIVMSG": self._privmsg,82"PRIVMSG(ECHO)": self._privmsg_echo,83"USERSTATE": self._userstate,84"USERNOTICE": self._usernotice,85"JOIN": self._join,86"MODE": self._mode,87"RECONNECT": self._reconnect,88"WHISPER": self._privmsg,89}9091self.nick = None92self.user_id = None93self._token = token94self.modes = modes or ("commands", "tags", "membership")95self._initial_channels = initial_channels or []9697if callable(self._initial_channels):98_temp_initial_channels = self._initial_channels()99if isinstance(_temp_initial_channels, (list, tuple)):100self._initial_channels = _temp_initial_channels101else:102self._initial_channels = [_temp_initial_channels]103104self._last_ping = 0105self._reconnect_requested = False106107self._client = client108109@property110def is_alive(self) -> bool:111return self._websocket is not None and not self._websocket.closed112113async def wait_until_ready(self):114await self.is_ready.wait()115116async def _connect(self):117"""Attempt to connect to Twitch's Websocket."""118self.is_ready.clear()119120if self._keeper:121self._keeper.cancel() # Stop our current keep alive.122123if self.is_alive:124await self._websocket.close() # If for some reason we are in a weird state, close it before retrying.125126if not self._client._http.nick:127data = await self._client._http.validate(token=self._token)128self.nick = data["login"]129self.user_id = int(data["user_id"])130131session = self._client._http.session132133try:134self._websocket = await session.ws_connect(url=HOST, heartbeat=self._heartbeat)135except Exception as e:136retry = self._backoff.delay()137log.error(f"Websocket connection failure: {e}:: Attempting reconnect in {retry} seconds.")138139await asyncio.sleep(retry)140return asyncio.create_task(self._connect())141142if time.time() > self._last_ping + 240 or self._reconnect_requested:143# Re-authenticate as we have surpassed a PING request or Twitch issued a RECONNECT.144await self.authenticate(self._initial_channels)145self._reconnect_requested = False146147self._keeper = asyncio.create_task(self._keep_alive()) # Create our keep alive.148self._ws_ready_event.set()149150async def _keep_alive(self):151await self._ws_ready_event.wait()152self._ws_ready_event.clear()153154if not self._last_ping:155self._last_ping = time.time()156157while not self._websocket.closed:158msg = await self._websocket.receive() # Receive data...159160if msg.type is aiohttp.WSMsgType.CLOSED:161log.error(f"Websocket connection was closed: {msg.extra}")162break163164data = msg.data165if data:166log.debug(f" < {data}")167self.dispatch("raw_data", data) # Dispatch our event_raw_data event...168169events = data.split("\r\n")170for event in events:171if not event:172continue173174task = asyncio.create_task(self._process_data(event))175task.add_done_callback(partial(self._task_callback, event)) # Process our raw data176177asyncio.create_task(self._connect())178179def _task_callback(self, data, task):180exc = task.exception()181182if isinstance(exc, AuthenticationError): # Check if we failed to log in...183log.error("Authentication error. Please check your credentials and try again.")184self._close()185elif exc:186asyncio.create_task(self.event_error(exc, data))187188async def send(self, message: str):189message = message.strip()190log.debug(f" > {message}")191192if message.startswith("PRIVMSG #"):193data = message.replace("PRIVMSG #", "", 1).split(" ")194channel = data.pop(0)195content = " ".join(data)196197dummy = f"> :{self.nick}!{self.nick}@{self.nick}.tmi.twitch.tv PRIVMSG(ECHO) #{channel} {content}\r\n"198199task = asyncio.create_task(self._process_data(dummy))200task.add_done_callback(partial(self._task_callback, dummy)) # Process our raw data201202await self._websocket.send_str(message + "\r\n")203204async def reply(self, msg_id: str, message: str):205message = message.strip()206log.debug(f" > {message}")207208if message.startswith("PRIVMSG #"):209data = message.replace("PRIVMSG #", "", 1).split(" ")210channel = data.pop(0)211content = " ".join(data)212213dummy = f"> @reply-parent-msg-id={msg_id} :{self.nick}!{self.nick}@{self.nick}.tmi.twitch.tv PRIVMSG(ECHO) #{channel} {content}\r\n"214task = asyncio.create_task(self._process_data(dummy))215task.add_done_callback(partial(self._task_callback, dummy)) # Process our raw data216217await self._websocket.send_str(f"@reply-parent-msg-id={msg_id} {message} \r\n")218219async def authenticate(self, channels: Union[list, tuple]):220"""|coro|221222Automated Authentication process.223224Attempts to authenticate on the Twitch servers with the provided225nickname and IRC Token (pass).226227On successful authentication, an attempt to join the provided channels is made.228229Parameters230------------231channels: Union[list, tuple]232A list or tuple of channels to attempt joining.233"""234if not self.is_alive:235return236237await self.send(f"PASS oauth:{self._token}\r\n")238await self.send(f"NICK {self.nick}\r\n")239240for cap in self.modes:241await self.send(f"CAP REQ :twitch.tv/{cap}") # Ideally no one should overwrite defaults...242243if not channels and not self._initial_channels:244return245246channels = channels or self._initial_channels247await self.join_channels(*channels)248249async def join_channels(self, *channels: str):250"""|coro|251252Attempt to join the provided channels.253254Parameters255------------256*channels : str257An argument list of channels to attempt joining.258"""259async with self._join_lock: # acquire a lock, allowing only one join_channels at once...260for channel in channels:261if self._join_handle < time.time(): # Handle is less than the current time262self._join_tick = 20 # So lets start a new rate limit bucket..263self._join_handle = time.time() + 10 # Set the handle timeout time264265if self._join_tick == 0: # We have exhausted the bucket, wait so we can make a new one...266await asyncio.sleep(self._join_handle - time.time())267continue268269asyncio.create_task(self._join_channel(channel))270self._join_tick -= 1271272async def _join_channel(self, entry):273channel = re.sub("[#]", "", entry).lower()274await self.send(f"JOIN #{channel}\r\n")275276self._join_pending[channel] = fut = self._loop.create_future()277asyncio.create_task(self._join_future_handle(fut, channel))278279async def _join_future_handle(self, fut: asyncio.Future, channel: str):280try:281await asyncio.wait_for(fut, timeout=10)282except asyncio.TimeoutError:283log.error(f'The channel "{channel}" was unable to be joined. Check the channel is valid.')284self._join_pending.pop(channel)285286data = (287f":{self.nick}.tmi.twitch.tv 353 {self.nick} = #TWITCHIOFAILURE :{channel}\r\n"288f":{self.nick}.tmi.twitch.tv 366 {self.nick} #TWITCHIOFAILURE :End of /NAMES list"289)290291await self._process_data(data)292293async def _process_data(self, data: str):294data = data.rstrip()295parsed = parser(data, self.nick)296297if parsed["action"] == "PING":298return await self._ping()299elif parsed["code"] != 0:300return await self._code(parsed, parsed["code"])301elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"):302log.error(303f'Login unsuccessful with token "{self._token}". ' f'Check your scopes for "chat_login" and try again.'304)305return await self._close()306307partial_ = self._actions.get(parsed["action"])308if partial_:309await partial_(parsed)310311async def _await_futures(self):312futures = self._fetch_futures()313314for fut in futures:315try:316fut.exception()317except asyncio.InvalidStateError:318pass319320if fut.done():321futures.remove(fut)322323if futures:324await asyncio.wait(futures)325326async def _code(self, parsed, code: int):327if code == 1:328log.info(f"Successfully logged onto Twitch WS: {self.nick}")329330await self._await_futures()331await self.is_ready.wait()332self.dispatch("ready")333self._init = True334335elif code == 353:336if parsed["channel"] == "TWITCHIOFAILURE":337self._initial_channels.remove(parsed["batches"][0])338339if parsed["channel"] in self._initial_channels and not self._init:340self._join_load[parsed["channel"]] = None341342if len(self._join_load) == len(self._initial_channels):343for channel in self._initial_channels:344self._join_load.pop(channel)345self._cache_add(parsed)346self.is_ready.set()347else:348self._cache_add(parsed)349elif code in {2, 3, 4, 366, 372, 375, 376}:350return351elif self.is_ready.is_set():352return353else:354self.is_ready.set()355# self.dispatch("ready")356357async def _ping(self, _=None):358log.debug("ACTION: Sending PONG reply.")359self._last_ping = time.time()360await self.send("PONG :tmi.twitch.tv\r\n")361362async def _part(self, parsed): # TODO363log.debug(f'ACTION: PART:: {parsed["channel"]}')364365async def _privmsg(self, parsed): # TODO(Update Cache properly)366log.debug(f'ACTION: PRIVMSG:: {parsed["channel"]}')367368if parsed["channel"] is None:369log.debug(f'ACTION: WHISPER:: {parsed["user"]}')370channel = None371user = WhisperChatter(websocket=self, name=parsed["user"])372else:373channel = Channel(name=parsed["channel"], websocket=self)374self._cache_add(parsed)375user = Chatter(376tags=parsed["badges"], name=parsed["user"], channel=channel, bot=self._client, websocket=self377)378379message = Message(380raw_data=parsed["data"],381content=parsed["message"],382author=user,383channel=channel,384tags=parsed["badges"],385echo="echo" in parsed["action"],386)387388self.dispatch("message", message)389390async def _privmsg_echo(self, parsed):391log.debug(f'ACTION: PRIVMSG(ECHO):: {parsed["channel"]}')392393channel = Channel(name=parsed["channel"], websocket=self)394message = Message(395raw_data=parsed["data"], content=parsed["message"], author=None, channel=channel, tags={}, echo=True396)397398self.dispatch("message", message)399400async def _userstate(self, parsed):401log.debug(f'ACTION: USERSTATE:: {parsed["channel"]}')402self._cache_add(parsed)403404channel = Channel(name=parsed["channel"], websocket=self)405name = parsed["user"] or parsed["nick"]406user = Chatter(tags=parsed["badges"], name=name, channel=channel, bot=self._client, websocket=self)407408self.dispatch("userstate", user)409410async def _usernotice(self, parsed):411log.debug(f'ACTION: USERNOTICE:: {parsed["channel"]}')412413channel = Channel(name=parsed["channel"], websocket=self)414rawData = parsed["groups"][0]415tags = dict(x.split("=") for x in rawData.split(";"))416417self.dispatch("raw_usernotice", channel, tags)418419async def _join(self, parsed):420log.debug(f'ACTION: JOIN:: {parsed["channel"]}')421channel = parsed["channel"]422423if self._join_pending:424try:425self._join_pending[channel].set_result(None)426except KeyError:427pass428else:429self._join_pending.pop(channel)430431if parsed["user"] != self._client.nick:432self._cache_add(parsed)433434channel = Channel(name=channel, websocket=self)435user = Chatter(name=parsed["user"], bot=self._client, websocket=self, channel=channel, tags=parsed["badges"])436437self.dispatch("join", channel, user)438439def _cache_add(self, parsed: dict):440channel = parsed["channel"].lstrip("#")441442if channel not in self._cache:443self._cache[channel] = set()444445channel_ = Channel(name=channel, websocket=self)446447if parsed["batches"]:448for u in parsed["batches"]:449user = PartialChatter(name=u, bot=self._client, websocket=self, channel=channel_)450self._cache[channel].add(user)451else:452name = parsed["user"] or parsed["nick"]453user = Chatter(bot=self._client, name=name, websocket=self, channel=channel_, tags=parsed["badges"])454self._cache[channel].discard(user)455self._cache[channel].add(user)456457async def _mode(self, parsed): # TODO458pass459460async def _reconnect(self, parsed):461log.debug("ACTION: RECONNECT:: Twitch has gracefully closed the connection and will reconnect.")462self._reconnect_requested = True463464def dispatch(self, event: str, *args, **kwargs):465log.debug(f"Dispatching event: {event}")466467self._client.run_event(event, *args, **kwargs)468469async def event_error(self, error: Exception, data: str = None):470traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)471472def _fetch_futures(self):473return [474fut475for chan, fut in self._join_pending.items()476if chan.lower() in [re.sub("[#]", "", c).lower() for c in self._initial_channels]477]478479async def _close(self):480self._keeper.cancel()481self.is_ready.clear()482483futures = self._fetch_futures()484485for fut in futures:486fut.cancel()487488if self._websocket:489await self._websocket.close()490if self._client._http.session:491await self._client._http.session.close()492self._loop.stop()493494495