Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/locks.py
7762 views
import asyncio
import collections
from typing import Any, Deque, Optional


class EventResultOrError:
    """Event asyncio lock helper class.

    Wraps the Event asyncio lock allowing either to awake the
    locked Tasks without any error or raising an exception.

    Each call to ``wait()`` runs the underlying ``Event.wait()`` in its
    own task and records that task, so that ``cancel()`` can cancel every
    outstanding waiter.

    thanks to @vorpalsmith for the simple design.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Create an unset event.

        :param loop: event loop used to create the per-waiter tasks.
        """
        self._loop = loop
        # Exception to raise in waiters once the event is set (None = success).
        self._exc = None  # type: Optional[BaseException]
        self._event = asyncio.Event()
        # Tasks currently blocked in wait(); consumed by cancel().
        self._waiters = collections.deque()  # type: Deque[asyncio.Future[Any]]

    def set(self, exc: Optional[BaseException] = None) -> None:
        """Wake up all waiters.

        :param exc: if given, every waiter raises this exception instead
            of returning normally.
        """
        # Store the outcome before setting the event so that woken
        # waiters observe it when they resume.
        self._exc = exc
        self._event.set()

    async def wait(self) -> Any:
        """Block until ``set()`` is called.

        :return: the result of the underlying ``asyncio.Event.wait()``.
        :raises BaseException: the exception passed to ``set()``, if any.
        """
        # Wrap the event wait in a task so cancel() has a handle on it.
        waiter = self._loop.create_task(self._event.wait())
        self._waiters.append(waiter)
        try:
            val = await waiter
        finally:
            # Always unregister, including when the waiter was cancelled.
            self._waiters.remove(waiter)

        if self._exc is not None:
            raise self._exc

        return val

    def cancel(self) -> None:
        """Cancel all waiters"""
        for waiter in self._waiters:
            waiter.cancel()