Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/notifiers/discord.py
1494 views
1
import asyncio
2
import datetime
3
import logging
4
from queue import Empty
5
6
import discord
7
from discord.ext import commands, tasks
8
9
from tgtg_scanner.errors import DiscordConfigurationError, MaskConfigurationError
10
from tgtg_scanner.models import Config, Favorites, Item, Reservations
11
from tgtg_scanner.models.reservations import Reservation
12
from tgtg_scanner.notifiers.base import Notifier
13
14
log = logging.getLogger("tgtg")
15
16
discord.VoiceClient.warn_nacl = False
17
18
19
class Discord(Notifier):
20
"""Notifier for Discord."""
21
22
def __init__(self, config: Config, reservations: Reservations, favorites: Favorites):
23
super().__init__(config, reservations, favorites)
24
self.enabled = config.discord.enabled
25
self.prefix = config.discord.prefix
26
self.token = config.discord.token
27
self.channel = config.discord.channel
28
self.body = config.discord.body
29
self.disable_commands = config.discord.disable_commands
30
self.cron = config.discord.cron
31
self.mute: datetime.datetime | None = None
32
self.bot_id = None
33
self.channel_id = None
34
self.server_id = None
35
36
if self.enabled:
37
if self.token is None or self.channel == 0:
38
raise DiscordConfigurationError()
39
try:
40
Item.check_mask(self.body)
41
except MaskConfigurationError as exc:
42
raise DiscordConfigurationError(exc.message) from exc
43
self.bot = commands.Bot(command_prefix=self.prefix, intents=discord.Intents.all())
44
try:
45
# Setting event loop explicitly for python 3.9 compatibility
46
loop = asyncio.new_event_loop()
47
asyncio.set_event_loop(loop)
48
asyncio.run(self.bot.login(self.token))
49
asyncio.run(self.bot.close())
50
except MaskConfigurationError as exc:
51
raise DiscordConfigurationError(exc.message) from exc
52
53
async def _send(self, item: Item | Reservation) -> None: # type: ignore[override]
54
"""Sends item information using Discord bot."""
55
if self.mute and self.mute > datetime.datetime.now():
56
return
57
if self.mute:
58
log.info("Reactivated Discord Notifications")
59
self.mute = None
60
if isinstance(item, Item):
61
message = item.unmask(self.body)
62
self.bot.dispatch("send_notification", message)
63
64
@tasks.loop(seconds=1)
65
async def _listen_for_items(self):
66
"""Method for polling notifications every second."""
67
try:
68
item = self.queue.get(block=False)
69
if item is None:
70
self.bot.dispatch("close")
71
return
72
log.debug("Sending %s Notification", self.name)
73
await self._send(item)
74
except Empty:
75
pass
76
except Exception as exc:
77
log.error("Failed sending %s: %s", self.name, exc)
78
79
def _run(self):
80
self.config.set_locale()
81
# Setting event loop explicitly for python 3.9 compatibility
82
loop = asyncio.new_event_loop()
83
asyncio.set_event_loop(loop)
84
self.bot = commands.Bot(command_prefix=self.prefix, intents=discord.Intents.all())
85
# Events include methods for post-init, shutting down, and notification sending
86
self._setup_events()
87
if not self.disable_commands:
88
# Commands are handled separately, in case commands are not enabled
89
self._setup_commands()
90
asyncio.run(self._start_bot())
91
92
async def _start_bot(self):
93
async with self.bot:
94
await self.bot.start(self.token)
95
96
def _setup_events(self):
97
@self.bot.event
98
async def on_ready():
99
"""Callback after successful login (only explicitly used in test_notifiers.py)."""
100
self.bot_id = self.bot.user.id
101
self.channel_id = self.channel
102
self.server_id = self.bot.guilds[0].id if len(self.bot.guilds) > 0 else 0
103
self._listen_for_items.start()
104
105
@self.bot.event
106
async def on_send_notification(message):
107
"""Callback for item notification."""
108
channel = self.bot.get_channel(self.channel) or await self.bot.fetch_channel(self.channel)
109
if channel:
110
await channel.send(message)
111
112
@self.bot.event
113
async def on_close():
114
"""Logout from Discord (only explicitly used in test_notifiers.py)."""
115
await self.bot.close()
116
117
def _setup_commands(self):
118
@self.bot.command(name="mute")
119
async def _mute(ctx, *args):
120
"""Deactivates Discord Notifications for x days."""
121
days = int(args[0]) if len(args) > 0 and args[0].isnumeric() else 1
122
self.mute = datetime.datetime.now() + datetime.timedelta(days=days)
123
log.info("Deactivated Discord Notifications for %s day(s)", days)
124
log.info("Reactivation at %s", self.mute)
125
await ctx.send(
126
f"Deactivated Discord notifications for {days} days.\nReactivating at {self.mute} or use `{self.prefix}unmute`.",
127
)
128
129
@self.bot.command(name="unmute")
130
async def _unmute(ctx):
131
"""Reactivate Discord notifications."""
132
self.mute = None
133
log.info("Reactivated Discord notifications")
134
await ctx.send("Reactivated Discord notifications")
135
136
@self.bot.command(name="listfavorites")
137
async def _list_favorites(ctx):
138
"""List favorites using display name."""
139
favorites = self.favorites.get_favorites()
140
if not favorites:
141
await ctx.send("You currently don't have any favorites.")
142
else:
143
await ctx.send("\n".join([f"• {item.item_id} - {item.display_name}" for item in favorites]))
144
145
@self.bot.command(name="listfavoriteids")
146
async def _list_favorite_ids(ctx):
147
"""List favorites using id."""
148
favorites = self.favorites.get_favorites()
149
if not favorites:
150
await ctx.send("You currently don't have any favorites.")
151
else:
152
await ctx.send(" ".join([item.item_id for item in favorites]))
153
154
@self.bot.command(name="addfavorites")
155
async def _add_favorites(ctx, *args):
156
"""Add favorite(s)."""
157
item_ids = list(
158
filter(
159
lambda x: x.isdigit() and int(x) != 0,
160
map(
161
str.strip,
162
[split_args for arg in args for split_args in arg.split(",")],
163
),
164
),
165
)
166
if not item_ids:
167
await ctx.channel.send(
168
"Please supply item ids in one of the following ways: "
169
f"'{self.prefix}addfavorites 12345 23456 34567' or "
170
f"'{self.prefix}addfavorites 12345,23456,34567'",
171
)
172
return
173
174
self.favorites.add_favorites(item_ids)
175
await ctx.send(f"Added the following item ids to favorites: {' '.join(item_ids)}")
176
log.debug('Added the following item ids to favorites: "%s"', item_ids)
177
178
@self.bot.command(name="removefavorites")
179
async def _remove_favorites(ctx, *args):
180
"""Remove favorite(s)."""
181
item_ids = list(
182
filter(
183
lambda x: x.isdigit() and int(x) != 0,
184
map(
185
str.strip,
186
[split_args for arg in args for split_args in arg.split(",")],
187
),
188
),
189
)
190
if not item_ids:
191
await ctx.channel.send(
192
"Please supply item ids in one of the following ways: "
193
f"'{self.prefix}removefavorites 12345 23456 34567' or "
194
f"'{self.prefix}removefavorites 12345,23456,34567'",
195
)
196
return
197
198
self.favorites.remove_favorite(item_ids)
199
await ctx.send(f"Removed the following item ids from favorites: {' '.join(item_ids)}")
200
log.debug('Removed the following item ids from favorites: "%s"', item_ids)
201
202
@self.bot.command(name="gettoken")
203
async def _get_token(ctx):
204
"""Display token used to login (without needing to manually check in config.ini)."""
205
await ctx.send(f"Token in use: {self.token}")
206
207
@self.bot.command(name="getinfo")
208
async def _get_info(ctx):
209
"""Display basic info about connection."""
210
bot_id = ctx.me.id
211
bot_name = ctx.me.display_name
212
bot_mention = ctx.me.mention
213
joined_at = ctx.me.joined_at
214
channel_id = ctx.channel.id
215
channel_name = ctx.channel.name
216
guild_id = ctx.guild.id
217
guild_name = ctx.guild.name
218
219
response = (
220
f"Hi! I'm {bot_mention}, the TGTG Bot on this server. I joined at {joined_at}\n"
221
f"```Bot (ID): {bot_name} ({bot_id})\n"
222
f"Channel (ID): {channel_name} ({channel_id})\n"
223
f"Server (ID): {guild_name} ({guild_id})```"
224
)
225
226
await ctx.send(response)
227
228
def __repr__(self) -> str:
229
return f"Discord: Channel ID {self.channel}"
230
231