Path: blob/trunk/py/selenium/webdriver/common/bidi/cdp.py
1864 views
# The MIT License(MIT)
#
# Copyright(c) 2018 Hyperion Gray
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp

# flake8: noqa

import contextvars
import importlib
import itertools
import json
import logging
import pathlib
from collections import defaultdict
from contextlib import asynccontextmanager
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from collections.abc import AsyncGenerator
from collections.abc import AsyncIterator
from collections.abc import Generator
from typing import Type
from typing import TypeVar

import trio
from trio_websocket import ConnectionClosed as WsConnectionClosed
from trio_websocket import connect_websocket_url

logger = logging.getLogger("trio_cdp")
T = TypeVar("T")
MAX_WS_MESSAGE_SIZE = 2**24

# Populated by import_devtools(): the loaded devtools package and the version
# that was requested.
devtools = None
version = None


def import_devtools(ver):
    """Load the devtools package for ``ver`` into the module-level cache.

    If no package exists for ``ver``, fall back to the highest-numbered
    ``v<N>`` directory found next to this package.

    :param ver: requested devtools version number (without the ``v`` prefix)
    :returns: the imported devtools module
    :raises ModuleNotFoundError: if neither the requested version nor any
        fallback version directory is available
    """
    global devtools
    global version
    version = ver
    base = "selenium.webdriver.common.devtools.v"
    try:
        devtools = importlib.import_module(f"{base}{ver}")
        return devtools
    except ModuleNotFoundError:
        # Attempt to parse and load the 'most recent' devtools module. This is likely
        # because cdp has been updated but selenium python has not been released yet.
        devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
        # Only ``v<digits>`` directories are version packages; anything else
        # (e.g. ``__pycache__``) must not be fed to int().
        available = [
            int(entry.name[1:])
            for entry in devtools_path.iterdir()
            if entry.is_dir() and entry.name.startswith("v") and entry.name[1:].isdecimal()
        ]
        if not available:
            # Nothing to fall back to: surface the original import failure.
            raise
        latest = max(available)
        selenium_logger = logging.getLogger(__name__)
        selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
        devtools = importlib.import_module(f"{base}{latest}")
        return devtools


_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")


def get_connection_context(fn_name):
    """Look up the current connection.

    :param fn_name: name of the calling function, used in the error message
    :returns: the connection installed in the current context
    :raises RuntimeError: if there is no current connection
    """
    try:
        return _connection_context.get()
    except LookupError:
        # The LookupError is an implementation detail; hide it from the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a connection context.") from None


def get_session_context(fn_name):
    """Look up the current session.

    :param fn_name: name of the calling function, used in the error message
    :returns: the session installed in the current context
    :raises RuntimeError: if there is no current session
    """
    try:
        return _session_context.get()
    except LookupError:
        # The LookupError is an implementation detail; hide it from the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a session context.") from None


@contextmanager
def connection_context(connection):
    """This context manager installs ``connection`` as the connection context
    for the current Trio task and restores the previous value on exit."""
    token = _connection_context.set(connection)
    try:
        yield
    finally:
        _connection_context.reset(token)


@contextmanager
def session_context(session):
    """This context manager installs ``session`` as the session context for
    the current Trio task and restores the previous value on exit."""
    token = _session_context.set(session)
    try:
        yield
    finally:
        _session_context.reset(token)


def set_global_connection(connection):
    """Install ``connection`` in the root context so that it will become the
    default connection for all tasks.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _connection_context
    # Rebinds the module-level variable to a new ContextVar whose default is
    # ``connection``.
    _connection_context = contextvars.ContextVar("_connection_context", default=connection)


def set_global_session(session):
    """Install ``session`` in the root context so that it will become the
    default session for all tasks.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _session_context
    # Rebinds the module-level variable to a new ContextVar whose default is
    # ``session``.
    _session_context = contextvars.ContextVar("_session_context", default=session)


class BrowserError(Exception):
    """This exception is raised when the browser's response to a command
    indicates that an error occurred."""

    def __init__(self, obj):
        """Constructor.

        :param dict obj: error object; the ``code``, ``message`` and ``data``
            keys are read, each defaulting to ``None`` when absent
        """
        self.code = obj.get("code")
        self.message = obj.get("message")
        self.detail = obj.get("data")

    def __str__(self):
        return f"BrowserError<code={self.code} message={self.message}> {self.detail}"


class CdpConnectionClosed(WsConnectionClosed):
    """Raised when a public method is called on a closed CDP connection."""

    def __init__(self, reason):
        """Constructor.

        :param reason:
        :type reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return representation."""
        return f"{self.__class__.__name__}<{self.reason}>"


class InternalError(Exception):
    """This exception is only raised when there is faulty logic in TrioCDP or
    the integration with PyCDP."""


@dataclass
class CmEventProxy:
    """A proxy object returned by :meth:`CdpBase.wait_for()`.

    After the context manager executes, this proxy object will have a
    value set that contains the returned event.
    """

    value: Any = None


class CdpBase:
    """State and message handling shared by the root connection and sessions."""

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        :param ws: WebSocket connection used to send commands
        :param session_id: CDP session ID, or ``None`` for the root session
        :param target_id: CDP target ID, or ``None`` for the root session
        """
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # event type -> set of memory-channel senders listening for it
        self.channels = defaultdict(set)
        self.id_iter = itertools.count()
        # command id -> (command generator, trio.Event set when the response arrives)
        self.inflight_cmd = {}
        # command id -> parsed result or exception, consumed by execute()
        self.inflight_result = {}

    async def execute(self, cmd: Generator[dict, T, Any]) -> T:
        """Execute a command on the server and wait for the result.

        :param cmd: any CDP command
        :returns: a CDP result
        :raises CdpConnectionClosed: if the WebSocket is closed while sending
        :raises BrowserError: if the browser answers with an error
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        request = next(cmd)
        request["id"] = cmd_id
        if self.session_id:
            request["sessionId"] = self.session_id
        request_str = json.dumps(request)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Sending CDP message: {cmd_id} {cmd_event}: {request_str}")
        # Register only once the request has been built, so a failure while
        # building it cannot leave a stale in-flight entry. There is no await
        # between here and the send, so the reader cannot see the response
        # before the entry exists.
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        try:
            await self.ws.send_message(request_str)
        except WsConnectionClosed as wcc:
            # The command never went out; drop its bookkeeping entry.
            self.inflight_cmd.pop(cmd_id, None)
            raise CdpConnectionClosed(wcc.reason) from None
        await cmd_event.wait()
        response = self.inflight_result.pop(cmd_id)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Received CDP message: {response}")
        if isinstance(response, Exception):
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"Exception raised by {cmd_event} message: {type(response).__name__}")
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Return an async iterator that iterates over events matching the
        indicated types.

        :param event_types: event classes to subscribe to
        :param buffer_size: capacity of the underlying memory channel
        :returns: the receive end of the memory channel
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received.

        :param event_type: event class to wait for
        :param buffer_size: capacity of the underlying memory channel
        :returns: a :class:`CmEventProxy` whose ``value`` is set on exit
        """
        sender: trio.MemorySendChannel
        receiver: trio.MemoryReceiveChannel
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        # Keep the receiver open across the caller's block and close it on
        # every exit path, including when the block raises. Once closed,
        # _handle_event() drops the sender on the next matching event.
        async with receiver:
            yield proxy
            proxy.value = await receiver.receive()

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        :param dict data: a JSON dictionary
        """
        # Command responses carry an "id"; everything else is an event.
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data):
        """Handle a response to a command. This will set an event flag that
        will return control to the task that called the command.

        :param dict data: response as a JSON dictionary
        :raises InternalError: if the command generator yields again instead
            of finishing after receiving the result
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object. The parsed object is the generator's return
            # value, delivered via StopIteration.
            try:
                cmd.send(data["result"])
            except StopIteration as stop:
                self.inflight_result[cmd_id] = stop.value
            else:
                raise InternalError("The command's generator function did not exit when expected!")
        event.set()

    def _handle_event(self, data):
        """Handle an event.

        :param dict data: event as a JSON dictionary
        """
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        to_remove = set()
        for sender in self.channels[type(event)]:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                # Listener is not keeping up; the event is dropped for it.
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                # The receiving end is closed; forget this listener.
                to_remove.add(sender)
        if to_remove:
            self.channels[type(event)] -= to_remove


class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        :param devtools.target.SessionID session_id:
        :param devtools.target.TargetID target_id:
        """
        super().__init__(ws, session_id, target_id)

        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """A context manager that executes ``dom.enable()`` when it enters and
        then calls ``dom.disable()`` when it exits.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited.
        """
        async with self._dom_enable_lock:
            self._dom_enable_count += 1
            if self._dom_enable_count == 1:
                await self.execute(devtools.dom.enable())

        try:
            yield
        finally:
            # Run the bookkeeping even when the block raises, so the counter
            # stays balanced and the last caller still disables the domain.
            # NOTE: if the enclosing scope was cancelled, acquiring the lock
            # here is itself subject to that cancellation.
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """A context manager that executes ``page.enable()`` when it enters and
        then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited.
        """
        async with self._page_enable_lock:
            self._page_enable_count += 1
            if self._page_enable_count == 1:
                await self.execute(devtools.page.enable())

        try:
            yield
        finally:
            # Run the bookkeeping even when the block raises, so the counter
            # stays balanced and the last caller still disables the domain.
            # NOTE: if the enclosing scope was cancelled, acquiring the lock
            # here is itself subject to that cancellation.
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())


class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call the
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        """
        super().__init__(ws, session_id=None, target_id=None)
        # session id -> CdpSession, used to route incoming messages
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        This will cause the reader task to gracefully exit when it tries
        to read the next message from the WebSocket. All of the public
        APIs (``execute()``, ``listen()``, etc.) will raise
        ``CdpConnectionClosed`` after the CDP connection is closed. It
        is safe to call this multiple times.
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> AsyncIterator[CdpSession]:
        """This context manager opens a session and enables the "simple" style
        of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified
        target."""
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    async def _reader_task(self):
        """Runs in the background and handles incoming messages: dispatching
        responses to commands and events to listeners.

        :raises BrowserError: if a message is not valid JSON or names an
            unknown session
        """
        while True:
            try:
                message = await self.ws.get_message()
            except WsConnectionClosed:
                # If the WebSocket is closed, we don't want to throw an
                # exception from the reader task. Instead we will throw
                # exceptions from the public API methods, and we can quietly
                # exit the reader task here.
                break
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
            logger.debug("Received message %r", data)
            if "sessionId" in data:
                session_id = devtools.target.SessionID(data["sessionId"])
                try:
                    session = self.sessions[session_id]
                except KeyError:
                    raise BrowserError(
                        {
                            "code": -32700,
                            "message": "Browser sent a message for an invalid session",
                            "data": f"{session_id!r}",
                        }
                    )
                session._handle_data(data)
            else:
                self._handle_data(data)

        # The connection is gone: close every session listener's send channel
        # so that receivers stop waiting.
        # NOTE(review): listeners registered on the root connection itself
        # (self.channels) are not closed here - confirm whether that is intended.
        for session in self.sessions.values():
            for senders in session.channels.values():
                for sender in senders:
                    sender.close()


@asynccontextmanager
async def open_cdp(url) -> AsyncIterator[CdpConnection]:
    """This async context manager opens a connection to the browser specified
    by ``url`` before entering the block, then closes the connection when the
    block exits.

    The context manager also sets the connection as the default
    connection for the current task, so that commands like ``await
    target.get_targets()`` will run on this connection automatically. If
    you want to use multiple connections concurrently, it is recommended
    to open each one in a separate task.
    """

    async with trio.open_nursery() as nursery:
        conn = await connect_cdp(nursery, url)
        try:
            with connection_context(conn):
                yield conn
        finally:
            await conn.aclose()


async def connect_cdp(nursery, url) -> CdpConnection:
    """Connect to the browser specified by ``url`` and spawn a background task
    in the specified nursery.

    The ``open_cdp()`` context manager is preferred in most situations.
    You should only use this function if you need to specify a custom
    nursery. This connection is not automatically closed! You can either
    use the connection object as a context manager (``async with
    conn:``) or else call ``await conn.aclose()`` on it when you are
    done with it. Unlike ``open_cdp()``, this function does not install
    the connection as the default connection for the current task.

    :param nursery: Trio nursery that will own the reader task
    :param url: WebSocket URL of the browser's CDP endpoint
    :returns: the connected :class:`CdpConnection`
    """
    ws = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
    cdp_conn = CdpConnection(ws)
    nursery.start_soon(cdp_conn._reader_task)
    return cdp_conn