Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/aiohttp/test_utils.py
7757 views
"""Utilities shared by tests."""12import asyncio3import contextlib4import gc5import inspect6import ipaddress7import os8import socket9import sys10import warnings11from abc import ABC, abstractmethod12from types import TracebackType13from typing import (14TYPE_CHECKING,15Any,16Callable,17Iterator,18List,19Optional,20Type,21Union,22cast,23)24from unittest import mock2526from aiosignal import Signal27from multidict import CIMultiDict, CIMultiDictProxy28from yarl import URL2930import aiohttp31from aiohttp.client import _RequestContextManager, _WSRequestContextManager3233from . import ClientSession, hdrs34from .abc import AbstractCookieJar35from .client_reqrep import ClientResponse36from .client_ws import ClientWebSocketResponse37from .helpers import PY_38, sentinel38from .http import HttpVersion, RawRequestMessage39from .web import (40Application,41AppRunner,42BaseRunner,43Request,44Server,45ServerRunner,46SockSite,47UrlMappingMatchInfo,48)49from .web_protocol import _RequestHandler5051if TYPE_CHECKING: # pragma: no cover52from ssl import SSLContext53else:54SSLContext = None5556if PY_38:57from unittest import IsolatedAsyncioTestCase as TestCase58else:59from asynctest import TestCase # type: ignore[no-redef]6061REUSE_ADDRESS = os.name == "posix" and sys.platform != "cygwin"626364def get_unused_port_socket(65host: str, family: socket.AddressFamily = socket.AF_INET66) -> socket.socket:67return get_port_socket(host, 0, family)686970def get_port_socket(71host: str, port: int, family: socket.AddressFamily72) -> socket.socket:73s = socket.socket(family, socket.SOCK_STREAM)74if REUSE_ADDRESS:75# Windows has different semantics for SO_REUSEADDR,76# so don't set it. Ref:77# https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse78s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)79s.bind((host, port))80return s818283def unused_port() -> int:84"""Return a port that is unused on the current host."""85with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:86s.bind(("127.0.0.1", 0))87return cast(int, s.getsockname()[1])888990class BaseTestServer(ABC):91__test__ = False9293def __init__(94self,95*,96scheme: Union[str, object] = sentinel,97loop: Optional[asyncio.AbstractEventLoop] = None,98host: str = "127.0.0.1",99port: Optional[int] = None,100skip_url_asserts: bool = False,101socket_factory: Callable[102[str, int, socket.AddressFamily], socket.socket103] = get_port_socket,104**kwargs: Any,105) -> None:106self._loop = loop107self.runner = None # type: Optional[BaseRunner]108self._root = None # type: Optional[URL]109self.host = host110self.port = port111self._closed = False112self.scheme = scheme113self.skip_url_asserts = skip_url_asserts114self.socket_factory = socket_factory115116async def start_server(117self, loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any118) -> None:119if self.runner:120return121self._loop = loop122self._ssl = kwargs.pop("ssl", None)123self.runner = await self._make_runner(**kwargs)124await self.runner.setup()125if not self.port:126self.port = 0127try:128version = ipaddress.ip_address(self.host).version129except ValueError:130version = 4131family = socket.AF_INET6 if version == 6 else socket.AF_INET132_sock = self.socket_factory(self.host, self.port, family)133self.host, self.port = _sock.getsockname()[:2]134site = SockSite(self.runner, sock=_sock, ssl_context=self._ssl)135await site.start()136server = site._server137assert server is not None138sockets = server.sockets139assert sockets is not None140self.port = sockets[0].getsockname()[1]141if self.scheme is sentinel:142if self._ssl:143scheme = "https"144else:145scheme = "http"146self.scheme = scheme147self._root = URL(f"{self.scheme}://{self.host}:{self.port}")148149@abstractmethod # pragma: no cover150async def _make_runner(self, **kwargs: Any) -> BaseRunner:151pass152153def make_url(self, path: str) -> URL:154assert self._root is not None155url = URL(path)156if not self.skip_url_asserts:157assert not url.is_absolute()158return self._root.join(url)159else:160return URL(str(self._root) + path)161162@property163def started(self) -> bool:164return self.runner is not None165166@property167def closed(self) -> bool:168return self._closed169170@property171def handler(self) -> Server:172# for backward compatibility173# web.Server instance174runner = self.runner175assert runner is not None176assert runner.server is not None177return runner.server178179async def close(self) -> None:180"""Close all fixtures created by the test client.181182After that point, the TestClient is no longer usable.183184This is an idempotent function: running close multiple times185will not have any additional effects.186187close is also run when the object is garbage collected, and on188exit when used as a context manager.189190"""191if self.started and not self.closed:192assert self.runner is not None193await self.runner.cleanup()194self._root = None195self.port = None196self._closed = True197198def __enter__(self) -> None:199raise TypeError("Use async with instead")200201def __exit__(202self,203exc_type: Optional[Type[BaseException]],204exc_value: Optional[BaseException],205traceback: Optional[TracebackType],206) -> None:207# __exit__ should exist in pair with __enter__ but never executed208pass # pragma: no cover209210async def __aenter__(self) -> "BaseTestServer":211await self.start_server(loop=self._loop)212return self213214async def __aexit__(215self,216exc_type: Optional[Type[BaseException]],217exc_value: Optional[BaseException],218traceback: Optional[TracebackType],219) -> None:220await self.close()221222223class TestServer(BaseTestServer):224def __init__(225self,226app: Application,227*,228scheme: Union[str, object] = sentinel,229host: str = "127.0.0.1",230port: Optional[int] = None,231**kwargs: Any,232):233self.app = app234super().__init__(scheme=scheme, host=host, port=port, **kwargs)235236async def _make_runner(self, **kwargs: Any) -> BaseRunner:237return AppRunner(self.app, **kwargs)238239240class RawTestServer(BaseTestServer):241def __init__(242self,243handler: _RequestHandler,244*,245scheme: Union[str, object] = sentinel,246host: str = "127.0.0.1",247port: Optional[int] = None,248**kwargs: Any,249) -> None:250self._handler = handler251super().__init__(scheme=scheme, host=host, port=port, **kwargs)252253async def _make_runner(self, debug: bool = True, **kwargs: Any) -> ServerRunner:254srv = Server(self._handler, loop=self._loop, debug=debug, **kwargs)255return ServerRunner(srv, debug=debug, **kwargs)256257258class TestClient:259"""260A test client implementation.261262To write functional tests for aiohttp based servers.263264"""265266__test__ = False267268def __init__(269self,270server: BaseTestServer,271*,272cookie_jar: Optional[AbstractCookieJar] = None,273loop: Optional[asyncio.AbstractEventLoop] = None,274**kwargs: Any,275) -> None:276if not isinstance(server, BaseTestServer):277raise TypeError(278"server must be TestServer " "instance, found type: %r" % type(server)279)280self._server = server281self._loop = loop282if cookie_jar is None:283cookie_jar = aiohttp.CookieJar(unsafe=True, loop=loop)284self._session = ClientSession(loop=loop, cookie_jar=cookie_jar, **kwargs)285self._closed = False286self._responses = [] # type: List[ClientResponse]287self._websockets = [] # type: List[ClientWebSocketResponse]288289async def start_server(self) -> None:290await self._server.start_server(loop=self._loop)291292@property293def host(self) -> str:294return self._server.host295296@property297def port(self) -> Optional[int]:298return self._server.port299300@property301def server(self) -> BaseTestServer:302return self._server303304@property305def app(self) -> Optional[Application]:306return cast(Optional[Application], getattr(self._server, "app", None))307308@property309def session(self) -> ClientSession:310"""An internal aiohttp.ClientSession.311312Unlike the methods on the TestClient, client session requests313do not automatically include the host in the url queried, and314will require an absolute path to the resource.315316"""317return self._session318319def make_url(self, path: str) -> URL:320return self._server.make_url(path)321322async def _request(self, method: str, path: str, **kwargs: Any) -> ClientResponse:323resp = await self._session.request(method, self.make_url(path), **kwargs)324# save it to close later325self._responses.append(resp)326return resp327328def request(self, method: str, path: str, **kwargs: Any) -> _RequestContextManager:329"""Routes a request to tested http server.330331The interface is identical to aiohttp.ClientSession.request,332except the loop kwarg is overridden by the instance used by the333test server.334335"""336return _RequestContextManager(self._request(method, path, **kwargs))337338def get(self, path: str, **kwargs: Any) -> _RequestContextManager:339"""Perform an HTTP GET request."""340return _RequestContextManager(self._request(hdrs.METH_GET, path, **kwargs))341342def post(self, path: str, **kwargs: Any) -> _RequestContextManager:343"""Perform an HTTP POST request."""344return _RequestContextManager(self._request(hdrs.METH_POST, path, **kwargs))345346def options(self, path: str, **kwargs: Any) -> _RequestContextManager:347"""Perform an HTTP OPTIONS request."""348return _RequestContextManager(self._request(hdrs.METH_OPTIONS, path, **kwargs))349350def head(self, path: str, **kwargs: Any) -> _RequestContextManager:351"""Perform an HTTP HEAD request."""352return _RequestContextManager(self._request(hdrs.METH_HEAD, path, **kwargs))353354def put(self, path: str, **kwargs: Any) -> _RequestContextManager:355"""Perform an HTTP PUT request."""356return _RequestContextManager(self._request(hdrs.METH_PUT, path, **kwargs))357358def patch(self, path: str, **kwargs: Any) -> _RequestContextManager:359"""Perform an HTTP PATCH request."""360return _RequestContextManager(self._request(hdrs.METH_PATCH, path, **kwargs))361362def delete(self, path: str, **kwargs: Any) -> _RequestContextManager:363"""Perform an HTTP PATCH request."""364return _RequestContextManager(self._request(hdrs.METH_DELETE, path, **kwargs))365366def ws_connect(self, path: str, **kwargs: Any) -> _WSRequestContextManager:367"""Initiate websocket connection.368369The api corresponds to aiohttp.ClientSession.ws_connect.370371"""372return _WSRequestContextManager(self._ws_connect(path, **kwargs))373374async def _ws_connect(self, path: str, **kwargs: Any) -> ClientWebSocketResponse:375ws = await self._session.ws_connect(self.make_url(path), **kwargs)376self._websockets.append(ws)377return ws378379async def close(self) -> None:380"""Close all fixtures created by the test client.381382After that point, the TestClient is no longer usable.383384This is an idempotent function: running close multiple times385will not have any additional effects.386387close is also run on exit when used as a(n) (asynchronous)388context manager.389390"""391if not self._closed:392for resp in self._responses:393resp.close()394for ws in self._websockets:395await ws.close()396await self._session.close()397await self._server.close()398self._closed = True399400def __enter__(self) -> None:401raise TypeError("Use async with instead")402403def __exit__(404self,405exc_type: Optional[Type[BaseException]],406exc: Optional[BaseException],407tb: Optional[TracebackType],408) -> None:409# __exit__ should exist in pair with __enter__ but never executed410pass # pragma: no cover411412async def __aenter__(self) -> "TestClient":413await self.start_server()414return self415416async def __aexit__(417self,418exc_type: Optional[Type[BaseException]],419exc: Optional[BaseException],420tb: Optional[TracebackType],421) -> None:422await self.close()423424425class AioHTTPTestCase(TestCase):426"""A base class to allow for unittest web applications using aiohttp.427428Provides the following:429430* self.client (aiohttp.test_utils.TestClient): an aiohttp test client.431* self.loop (asyncio.BaseEventLoop): the event loop in which the432application and server are running.433* self.app (aiohttp.web.Application): the application returned by434self.get_application()435436Note that the TestClient's methods are asynchronous: you have to437execute function on the test client using asynchronous methods.438"""439440async def get_application(self) -> Application:441"""Get application.442443This method should be overridden444to return the aiohttp.web.Application445object to test.446"""447return self.get_app()448449def get_app(self) -> Application:450"""Obsolete method used to constructing web application.451452Use .get_application() coroutine instead.453"""454raise RuntimeError("Did you forget to define get_application()?")455456def setUp(self) -> None:457try:458self.loop = asyncio.get_running_loop()459except (AttributeError, RuntimeError): # AttributeError->py36460self.loop = asyncio.get_event_loop_policy().get_event_loop()461462self.loop.run_until_complete(self.setUpAsync())463464async def setUpAsync(self) -> None:465self.app = await self.get_application()466self.server = await self.get_server(self.app)467self.client = await self.get_client(self.server)468469await self.client.start_server()470471def tearDown(self) -> None:472self.loop.run_until_complete(self.tearDownAsync())473474async def tearDownAsync(self) -> None:475await self.client.close()476477async def get_server(self, app: Application) -> TestServer:478"""Return a TestServer instance."""479return TestServer(app, loop=self.loop)480481async def get_client(self, server: TestServer) -> TestClient:482"""Return a TestClient instance."""483return TestClient(server, loop=self.loop)484485486def unittest_run_loop(func: Any, *args: Any, **kwargs: Any) -> Any:487"""488A decorator dedicated to use with asynchronous AioHTTPTestCase test methods.489490In 3.8+, this does nothing.491"""492warnings.warn(493"Decorator `@unittest_run_loop` is no longer needed in aiohttp 3.8+",494DeprecationWarning,495stacklevel=2,496)497return func498499500_LOOP_FACTORY = Callable[[], asyncio.AbstractEventLoop]501502503@contextlib.contextmanager504def loop_context(505loop_factory: _LOOP_FACTORY = asyncio.new_event_loop, fast: bool = False506) -> Iterator[asyncio.AbstractEventLoop]:507"""A contextmanager that creates an event_loop, for test purposes.508509Handles the creation and cleanup of a test loop.510"""511loop = setup_test_loop(loop_factory)512yield loop513teardown_test_loop(loop, fast=fast)514515516def setup_test_loop(517loop_factory: _LOOP_FACTORY = asyncio.new_event_loop,518) -> asyncio.AbstractEventLoop:519"""Create and return an asyncio.BaseEventLoop instance.520521The caller should also call teardown_test_loop,522once they are done with the loop.523"""524loop = loop_factory()525try:526module = loop.__class__.__module__527skip_watcher = "uvloop" in module528except AttributeError: # pragma: no cover529# Just in case530skip_watcher = True531asyncio.set_event_loop(loop)532if sys.platform != "win32" and not skip_watcher:533policy = asyncio.get_event_loop_policy()534watcher: asyncio.AbstractChildWatcher535try: # Python >= 3.8536# Refs:537# * https://github.com/pytest-dev/pytest-xdist/issues/620538# * https://stackoverflow.com/a/58614689/595220539# * https://bugs.python.org/issue35621540# * https://github.com/python/cpython/pull/14344541watcher = asyncio.ThreadedChildWatcher()542except AttributeError: # Python < 3.8543watcher = asyncio.SafeChildWatcher()544watcher.attach_loop(loop)545with contextlib.suppress(NotImplementedError):546policy.set_child_watcher(watcher)547return loop548549550def teardown_test_loop(loop: asyncio.AbstractEventLoop, fast: bool = False) -> None:551"""Teardown and cleanup an event_loop created by setup_test_loop."""552closed = loop.is_closed()553if not closed:554loop.call_soon(loop.stop)555loop.run_forever()556loop.close()557558if not fast:559gc.collect()560561asyncio.set_event_loop(None)562563564def _create_app_mock() -> mock.MagicMock:565def get_dict(app: Any, key: str) -> Any:566return app.__app_dict[key]567568def set_dict(app: Any, key: str, value: Any) -> None:569app.__app_dict[key] = value570571app = mock.MagicMock()572app.__app_dict = {}573app.__getitem__ = get_dict574app.__setitem__ = set_dict575576app._debug = False577app.on_response_prepare = Signal(app)578app.on_response_prepare.freeze()579return app580581582def _create_transport(sslcontext: Optional[SSLContext] = None) -> mock.Mock:583transport = mock.Mock()584585def get_extra_info(key: str) -> Optional[SSLContext]:586if key == "sslcontext":587return sslcontext588else:589return None590591transport.get_extra_info.side_effect = get_extra_info592return transport593594595def make_mocked_request(596method: str,597path: str,598headers: Any = None,599*,600match_info: Any = sentinel,601version: HttpVersion = HttpVersion(1, 1),602closing: bool = False,603app: Any = None,604writer: Any = sentinel,605protocol: Any = sentinel,606transport: Any = sentinel,607payload: Any = sentinel,608sslcontext: Optional[SSLContext] = None,609client_max_size: int = 1024 ** 2,610loop: Any = ...,611) -> Request:612"""Creates mocked web.Request testing purposes.613614Useful in unit tests, when spinning full web server is overkill or615specific conditions and errors are hard to trigger.616"""617task = mock.Mock()618if loop is ...:619loop = mock.Mock()620loop.create_future.return_value = ()621622if version < HttpVersion(1, 1):623closing = True624625if headers:626headers = CIMultiDictProxy(CIMultiDict(headers))627raw_hdrs = tuple(628(k.encode("utf-8"), v.encode("utf-8")) for k, v in headers.items()629)630else:631headers = CIMultiDictProxy(CIMultiDict())632raw_hdrs = ()633634chunked = "chunked" in headers.get(hdrs.TRANSFER_ENCODING, "").lower()635636message = RawRequestMessage(637method,638path,639version,640headers,641raw_hdrs,642closing,643None,644False,645chunked,646URL(path),647)648if app is None:649app = _create_app_mock()650651if transport is sentinel:652transport = _create_transport(sslcontext)653654if protocol is sentinel:655protocol = mock.Mock()656protocol.transport = transport657658if writer is sentinel:659writer = mock.Mock()660writer.write_headers = make_mocked_coro(None)661writer.write = make_mocked_coro(None)662writer.write_eof = make_mocked_coro(None)663writer.drain = make_mocked_coro(None)664writer.transport = transport665666protocol.transport = transport667protocol.writer = writer668669if payload is sentinel:670payload = mock.Mock()671672req = Request(673message, payload, protocol, writer, task, loop, client_max_size=client_max_size674)675676match_info = UrlMappingMatchInfo(677{} if match_info is sentinel else match_info, mock.Mock()678)679match_info.add_app(app)680req._match_info = match_info681682return req683684685def make_mocked_coro(686return_value: Any = sentinel, raise_exception: Any = sentinel687) -> Any:688"""Creates a coroutine mock."""689690async def mock_coro(*args: Any, **kwargs: Any) -> Any:691if raise_exception is not sentinel:692raise raise_exception693if not inspect.isawaitable(return_value):694return return_value695await return_value696697return mock.Mock(wraps=mock_coro)698699700