Path: blob/trunk/py/selenium/webdriver/common/bidi/cdp.py
4041 views
# The MIT License(MIT)
#
# Copyright(c) 2018 Hyperion Gray
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp

import contextvars
import importlib
import itertools
import json
import logging
import pathlib
from collections import defaultdict
from collections.abc import AsyncGenerator, AsyncIterator, Generator
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from typing import Any, TypeVar

import trio
from trio_websocket import ConnectionClosed as WsConnectionClosed
from trio_websocket import connect_websocket_url

logger = logging.getLogger("trio_cdp")
T = TypeVar("T")
MAX_WS_MESSAGE_SIZE = 2**24

# Populated by import_devtools(); the classes below read ``devtools`` lazily.
devtools = None
version = None


def import_devtools(ver):
    """Load the devtools module for ``ver`` into the module cache for use later.

    If no module exists for the requested version, fall back to the highest
    ``v<number>`` package found in the sibling ``devtools`` directory.

    Args:
        ver: the CDP version number to load (interpolated into the module name).

    Returns:
        The imported devtools module, which is also stored in the module-level
        ``devtools`` global.

    Raises:
        ModuleNotFoundError: if the requested version is missing and the
            ``devtools`` directory contains no ``v<number>`` package to fall
            back to.
    """
    global devtools
    global version
    version = ver
    base = "selenium.webdriver.common.devtools.v"
    try:
        devtools = importlib.import_module(f"{base}{ver}")
        return devtools
    except ModuleNotFoundError:
        # Attempt to parse and load the 'most recent' devtools module. This is likely
        # because cdp has been updated but selenium python has not been released yet.
        devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
        # Only directories named ``v<digits>`` are version packages; anything
        # else (e.g. ``__pycache__``) would make ``int()`` raise ValueError.
        available = [
            int(entry.name[1:])
            for entry in devtools_path.iterdir()
            if entry.is_dir() and entry.name.startswith("v") and entry.name[1:].isdecimal()
        ]
        if not available:
            # Nothing to fall back to: surface the original import failure.
            raise
        latest = max(available)
        selenium_logger = logging.getLogger(__name__)
        selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
        devtools = importlib.import_module(f"{base}{latest}")
        return devtools


_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")


def get_connection_context(fn_name):
    """Look up the current connection.

    If there is no current connection, raise a ``RuntimeError`` with a
    helpful message.

    Args:
        fn_name: name of the calling function, used in the error message.
    """
    try:
        return _connection_context.get()
    except LookupError:
        # The LookupError is an implementation detail; hide it from the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a connection context.") from None


def get_session_context(fn_name):
    """Look up the current session.

    If there is no current session, raise a ``RuntimeError`` with a
    helpful message.

    Args:
        fn_name: name of the calling function, used in the error message.
    """
    try:
        return _session_context.get()
    except LookupError:
        # The LookupError is an implementation detail; hide it from the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a session context.") from None


@contextmanager
def connection_context(connection):
    """Context manager installs ``connection`` as the connection context for the current Trio task."""
    token = _connection_context.set(connection)
    try:
        yield
    finally:
        _connection_context.reset(token)


@contextmanager
def session_context(session):
    """Context manager installs ``session`` as the session context for the current Trio task."""
    token = _session_context.set(session)
    try:
        yield
    finally:
        _session_context.reset(token)


def set_global_connection(connection):
    """Install ``connection`` in the root context so that it will become the default connection for all tasks.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _connection_context
    # Replaces the context variable itself so that ``connection`` is its default.
    _connection_context = contextvars.ContextVar("_connection_context", default=connection)


def set_global_session(session):
    """Install ``session`` in the root context so that it will become the default session for all tasks.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _session_context
    # Replaces the context variable itself so that ``session`` is its default.
    _session_context = contextvars.ContextVar("_session_context", default=session)


class BrowserError(Exception):
    """This exception is raised when the browser's response to a command indicates that an error occurred."""

    def __init__(self, obj):
        """Constructor.

        Args:
            obj: the error dictionary; its ``code``, ``message`` and ``data``
                keys are read (missing keys become ``None``).
        """
        # Populate ``args`` so the exception can be copied and pickled.
        super().__init__(obj)
        self.code = obj.get("code")
        self.message = obj.get("message")
        self.detail = obj.get("data")

    def __str__(self):
        return f"BrowserError<code={self.code} message={self.message}> {self.detail}"


class CdpConnectionClosed(WsConnectionClosed):
    """Raised when a public method is called on a closed CDP connection."""

    def __init__(self, reason):
        """Constructor.

        Args:
            reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return representation."""
        return f"{self.__class__.__name__}<{self.reason}>"


class InternalError(Exception):
    """This exception is only raised when there is faulty logic in TrioCDP or the integration with PyCDP."""

    pass


@dataclass
class CmEventProxy:
    """A proxy object returned by :meth:`CdpBase.wait_for()`.

    After the context manager executes, this proxy object will have a
    value set that contains the returned event.
    """

    value: Any = None


class CdpBase:
    """State and behaviour shared by the root connection and by sessions.

    Tracks in-flight commands and their results, and the channels that
    listeners have registered for each event type.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: the WebSocket connection used to send commands.
            session_id: session ID added to outgoing requests, or a falsy
                value to omit it.
            target_id: the target this object is associated with.
        """
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # event type -> set of send channels registered by listen()/wait_for()
        self.channels = defaultdict(set)
        self.id_iter = itertools.count()
        # command id -> (command generator, trio.Event set once a response arrives)
        self.inflight_cmd = {}
        # command id -> parsed result, or the exception to raise in execute()
        self.inflight_result = {}

    async def execute(self, cmd: Generator[dict, T, Any]) -> T:
        """Execute a command on the server and wait for the result.

        Args:
            cmd: any CDP command

        Returns:
            a CDP result

        Raises:
            CdpConnectionClosed: if the WebSocket is closed when sending.
            Exception: whatever exception was stored as the command's result
                (a ``BrowserError`` when the browser reported an error).
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        request = next(cmd)
        request["id"] = cmd_id
        if self.session_id:
            request["sessionId"] = self.session_id
        request_str = json.dumps(request)
        # Register only once the request is fully built, and always before the
        # send, so nothing is left behind if building the request fails.
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        logger.debug("Sending CDP message: %s %s: %s", cmd_id, cmd_event, request_str)
        try:
            await self.ws.send_message(request_str)
        except WsConnectionClosed as wcc:
            # No response can arrive for a command that was never sent.
            self.inflight_cmd.pop(cmd_id, None)
            raise CdpConnectionClosed(wcc.reason) from None
        await cmd_event.wait()
        response = self.inflight_result.pop(cmd_id)
        logger.debug("Received CDP message: %s", response)
        if isinstance(response, Exception):
            logger.debug("Exception raised by %s message: %s", cmd_event, type(response).__name__)
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Listen for events.

        Args:
            *event_types: the event classes to subscribe to.
            buffer_size: capacity of the memory channel that buffers events.

        Returns:
            An async iterator that iterates over events matching the indicated types.
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received.

        Args:
            event_type: the event class to wait for.
            buffer_size: capacity of the memory channel that buffers events.

        Yields:
            A :class:`CmEventProxy` whose ``value`` is set to the received
            event once the block exits.
        """
        sender: trio.MemorySendChannel
        receiver: trio.MemoryReceiveChannel
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        try:
            yield proxy
            async with receiver:
                proxy.value = await receiver.receive()
        finally:
            # Unregister even when the body raised, so later events are not
            # queued into a channel that nobody will ever read.
            self.channels[event_type].discard(sender)
            sender.close()
            receiver.close()

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        Args:
            data: a JSON dictionary
        """
        # Messages carrying an "id" are command responses; the rest are events.
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data: dict):
        """Handle a response to a command.

        This will set an event flag that will return control to the
        task that called the command.

        Args:
            data: response as a JSON dictionary
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object.
            try:
                _ = cmd.send(data["result"])
                raise InternalError("The command's generator function did not exit when expected!")
            except StopIteration as stop:
                # The generator's return value is the parsed result.
                self.inflight_result[cmd_id] = stop.value
        event.set()

    def _handle_event(self, data: dict):
        """Handle an event.

        Args:
            data: event as a JSON dictionary

        Raises:
            RuntimeError: if ``import_devtools()`` has not been called yet.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        senders = self.channels[type(event)]
        to_remove = set()
        for sender in senders:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                # Collect now and remove after the loop: the set must not be
                # mutated while it is being iterated.
                to_remove.add(sender)
        if to_remove:
            senders.difference_update(to_remove)


class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
            session_id: devtools.target.SessionID
            target_id: devtools.target.TargetID
        """
        super().__init__(ws, session_id, target_id)

        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """Context manager that executes ``dom.enable()`` when it enters and then calls ``dom.disable()``.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited. The exit bookkeeping also runs
        when the block raises.
        """
        async with self._dom_enable_lock:
            if self._dom_enable_count == 0:
                await self.execute(devtools.dom.enable())
            # Count the caller only after enable() succeeded, so a failed
            # enable does not leave the counter stuck above zero.
            self._dom_enable_count += 1

        try:
            yield
        finally:
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """Context manager executes ``page.enable()`` when it enters and then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited. The exit bookkeeping also runs
        when the block raises.
        """
        async with self._page_enable_lock:
            if self._page_enable_count == 0:
                await self.execute(devtools.page.enable())
            # Count the caller only after enable() succeeded, so a failed
            # enable does not leave the counter stuck above zero.
            self._page_enable_count += 1

        try:
            yield
        finally:
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())


class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call the
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
        """
        super().__init__(ws, session_id=None, target_id=None)
        # session id -> CdpSession, filled in by connect_session()
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        This will cause the reader task to gracefully exit when it tries
        to read the next message from the WebSocket. All of the public
        APIs (``execute()``, ``listen()``, etc.) will raise
        ``CdpConnectionClosed`` after the CDP connection is closed. It
        is safe to call this multiple times.
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> AsyncIterator[CdpSession]:
        """Context manager opens a session and enables the "simple" style of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified target.

        Raises:
            RuntimeError: if ``import_devtools()`` has not been called yet.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    def _close_channels(self):
        """Close every listener channel of this connection and of its sessions."""
        for owner in (self, *self.sessions.values()):
            for senders in owner.channels.values():
                for sender in senders:
                    sender.close()

    async def _reader_task(self):
        """Runs in the background and handles incoming messages.

        Dispatches responses to commands and events to listeners. When the
        task ends, all listener channels are closed.

        Raises:
            RuntimeError: if ``import_devtools()`` has not been called yet.
            BrowserError: if a message is not valid JSON or names an unknown
                session.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        try:
            while True:
                try:
                    message = await self.ws.get_message()
                except WsConnectionClosed:
                    # If the WebSocket is closed, we don't want to throw an
                    # exception from the reader task. Instead we will throw
                    # exceptions from the public API methods, and we can quietly
                    # exit the reader task here.
                    break
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
                logger.debug("Received message %r", data)
                if "sessionId" in data:
                    session_id = devtools.target.SessionID(data["sessionId"])
                    try:
                        session = self.sessions[session_id]
                    except KeyError:
                        raise BrowserError(
                            {
                                "code": -32700,
                                "message": "Browser sent a message for an invalid session",
                                "data": f"{session_id!r}",
                            }
                        )
                    session._handle_data(data)
                else:
                    self._handle_data(data)
        finally:
            # No more events will ever be delivered, so release every listener
            # (root connection included) instead of leaving them waiting.
            self._close_channels()


@asynccontextmanager
async def open_cdp(url) -> AsyncIterator[CdpConnection]:
    """Async context manager opens a connection to the browser then closes the connection when the block exits.

    The context manager also sets the connection as the default
    connection for the current task, so that commands like ``await
    target.get_targets()`` will run on this connection automatically. If
    you want to use multiple connections concurrently, it is recommended
    to open each one in a separate task.
    """
    async with trio.open_nursery() as nursery:
        conn = await connect_cdp(nursery, url)
        try:
            with connection_context(conn):
                yield conn
        finally:
            await conn.aclose()


async def connect_cdp(nursery, url) -> CdpConnection:
    """Connect to the browser specified by ``url`` and spawn a background task in the specified nursery.

    The ``open_cdp()`` context manager is preferred in most situations.
    You should only use this function if you need to specify a custom
    nursery. This connection is not automatically closed! You can either
    use the connection object as a context manager (``async with
    conn:``) or else call ``await conn.aclose()`` on it when you are
    done with it. Unlike ``open_cdp()``, this function does not install
    the returned connection as the default connection for the current task.
    """
    ws = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
    cdp_conn = CdpConnection(ws)
    nursery.start_soon(cdp_conn._reader_task)
    return cdp_conn