Path: blob/master/ invest-robot-contest_invest-bot-main/blog/blog_worker.py
5925 views
import asyncio
import logging

from configuration.settings import BlogSettings
from tg_api.telegram_service import TelegramService

# NOTE: the trailing comma matters. ("BlogWorker") is just a string, and
# `from <module> import *` would then try to import "B", "l", "o", ... and fail.
__all__ = ("BlogWorker",)

logger = logging.getLogger(__name__)


class BlogWorker:
    """
    Worker (coroutine) intended to be run as an asyncio task.

    Takes messages from an asyncio queue one at a time and sends them
    through the Telegram service asynchronously. The Telegram API is too
    slow to be called synchronously.

    If the Telegram service could not be created, messages are still taken
    from the queue (and logged at DEBUG level) but are not sent anywhere.
    """

    def __init__(
            self,
            blog_settings: BlogSettings,
            messages_queue: asyncio.Queue
    ) -> None:
        """
        :param blog_settings: settings object; `bot_token` and `chat_id`
            are read from it and passed to TelegramService.
        :param messages_queue: queue the worker reads outgoing messages from.
        """
        self.__messages_queue = messages_queue
        self.__tg_status = False
        # Defined up-front so the attribute always exists, even when the
        # Telegram service fails to initialise.
        self.__telegram_service = None

        self.__init_tg(blog_settings.bot_token, blog_settings.chat_id)

    def __init_tg(self, token: str, chat_id: str) -> None:
        """
        Init TG API class.

        TG is optional and can be skipped in case of issue: any exception
        raised while creating the service is logged and the worker is left
        with sending disabled.
        """
        try:
            self.__telegram_service = TelegramService(
                token=token,
                chat_id=chat_id
            )
            self.__tg_status = True
        except Exception as ex:
            logger.error("Error init tg service %r", ex)
            # Any errors with TG aren't important. Continue trading is important.
            self.__tg_status = False

    async def worker(self) -> None:
        """
        Endless loop: wait for the next queued message and send it to Telegram.

        Every exception raised while getting or sending a message is logged
        and the loop continues with the next message, so a single failed
        send never stops the worker. On Python 3.8+ asyncio.CancelledError
        is not an Exception subclass, so cancelling the task still stops
        the loop.
        """
        while True:
            try:
                message = await self.__messages_queue.get()
                # Lazy %-formatting: the message is rendered only when DEBUG
                # logging is actually enabled.
                logger.debug(
                    "Get message form queue (size: %s): %s",
                    self.__messages_queue.qsize(),
                    message
                )

                if self.__tg_status:
                    await self.__telegram_service.send_text_message(message)

            except Exception as ex:
                logger.error("TG messages worker error: %r", ex)