Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_invest-bot-main/blog/blog_worker.py
5925 views
1
import asyncio
2
import logging
3
4
from configuration.settings import BlogSettings
5
from tg_api.telegram_service import TelegramService
6
7
__all__ = ("BlogWorker")
8
9
logger = logging.getLogger(__name__)
10
11
12
class BlogWorker:
13
"""
14
Class is represent worker (coroutine) for asyncio task.
15
Checks available messages in queue and sends they asynchronously.
16
Telegram API works pretty long to use it synchronously.
17
"""
18
def __init__(
19
self,
20
blog_settings: BlogSettings,
21
messages_queue: asyncio.Queue
22
) -> None:
23
self.__messages_queue = messages_queue
24
self.__tg_status = False
25
26
self.__init_tg(blog_settings.bot_token, blog_settings.chat_id)
27
28
def __init_tg(self, token: str, chat_id: str) -> None:
29
"""
30
Init TG API class.
31
TG is optional and can be skipped in case of issue.
32
"""
33
try:
34
self.__telegram_service = TelegramService(
35
token=token,
36
chat_id=chat_id
37
)
38
self.__tg_status = True
39
except Exception as ex:
40
logger.error(f"Error init tg service {repr(ex)}")
41
# Any errors with TG aren't important. Continue trading is important.
42
self.__tg_status = False
43
44
async def worker(self) -> None:
45
while True:
46
try:
47
message = await self.__messages_queue.get()
48
logger.debug(f"Get message form queue (size: {self.__messages_queue.qsize()}): {message}")
49
50
if self.__tg_status:
51
await self.__telegram_service.send_text_message(message)
52
53
except Exception as ex:
54
logger.error(f"TG messages worker error: {repr(ex)}")
55
56