Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/locks.py
7762 views
1
import asyncio
2
import collections
3
from typing import Any, Deque, Optional
4
5
6
class EventResultOrError:
7
"""Event asyncio lock helper class.
8
9
Wraps the Event asyncio lock allowing either to awake the
10
locked Tasks without any error or raising an exception.
11
12
thanks to @vorpalsmith for the simple design.
13
"""
14
15
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
16
self._loop = loop
17
self._exc = None # type: Optional[BaseException]
18
self._event = asyncio.Event()
19
self._waiters = collections.deque() # type: Deque[asyncio.Future[Any]]
20
21
def set(self, exc: Optional[BaseException] = None) -> None:
22
self._exc = exc
23
self._event.set()
24
25
async def wait(self) -> Any:
26
waiter = self._loop.create_task(self._event.wait())
27
self._waiters.append(waiter)
28
try:
29
val = await waiter
30
finally:
31
self._waiters.remove(waiter)
32
33
if self._exc is not None:
34
raise self._exc
35
36
return val
37
38
def cancel(self) -> None:
39
"""Cancel all waiters"""
40
for waiter in self._waiters:
41
waiter.cancel()
42
43