Path: blob/trunk/py/selenium/webdriver/common/bidi/script.py
3991 views
# Licensed to the Software Freedom Conservancy (SFC) under one1# or more contributor license agreements. See the NOTICE file2# distributed with this work for additional information3# regarding copyright ownership. The SFC licenses this file4# to you under the Apache License, Version 2.0 (the5# "License"); you may not use this file except in compliance6# with the License. You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing,11# software distributed under the License is distributed on an12# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY13# KIND, either express or implied. See the License for the14# specific language governing permissions and limitations15# under the License.1617import datetime18import math19from dataclasses import dataclass20from typing import Any2122from selenium.common.exceptions import WebDriverException23from selenium.webdriver.common.bidi.common import command_builder24from selenium.webdriver.common.bidi.log import LogEntryAdded25from selenium.webdriver.common.bidi.session import Session262728class ResultOwnership:29"""Represents the possible result ownership types."""3031NONE = "none"32ROOT = "root"333435class RealmType:36"""Represents the possible realm types."""3738WINDOW = "window"39DEDICATED_WORKER = "dedicated-worker"40SHARED_WORKER = "shared-worker"41SERVICE_WORKER = "service-worker"42WORKER = "worker"43PAINT_WORKLET = "paint-worklet"44AUDIO_WORKLET = "audio-worklet"45WORKLET = "worklet"464748@dataclass49class RealmInfo:50"""Represents information about a realm."""5152realm: str53origin: str54type: str55context: str | None = None56sandbox: str | None = None5758@classmethod59def from_json(cls, json: dict[str, Any]) -> "RealmInfo":60"""Creates a RealmInfo instance from a dictionary.6162Args:63json: A dictionary containing the realm information.6465Returns:66RealmInfo: A new instance of RealmInfo.67"""68if "realm" not in json:69raise ValueError("Missing required field 'realm' in RealmInfo")70if "origin" not in json:71raise ValueError("Missing required field 'origin' in RealmInfo")72if "type" not in json:73raise ValueError("Missing required field 'type' in RealmInfo")7475return cls(76realm=json["realm"],77origin=json["origin"],78type=json["type"],79context=json.get("context"),80sandbox=json.get("sandbox"),81)828384@dataclass85class Source:86"""Represents the source of a script message."""8788realm: str89context: str | None = None9091@classmethod92def from_json(cls, json: dict[str, Any]) -> "Source":93"""Creates a Source instance from a dictionary.9495Args:96json: A dictionary containing the source information.9798Returns:99Source: A new instance of Source.100"""101if "realm" not in json:102raise ValueError("Missing required field 'realm' in Source")103104return cls(105realm=json["realm"],106context=json.get("context"),107)108109110@dataclass111class EvaluateResult:112"""Represents the result of script evaluation."""113114type: str115realm: str116result: dict | None = None117exception_details: dict | None = None118119@classmethod120def from_json(cls, json: dict[str, Any]) -> "EvaluateResult":121"""Creates an EvaluateResult instance from a dictionary.122123Args:124json: A dictionary containing the evaluation result.125126Returns:127EvaluateResult: A new instance of EvaluateResult.128"""129if "realm" not in json:130raise ValueError("Missing required field 'realm' in EvaluateResult")131if "type" not in json:132raise ValueError("Missing required field 'type' in EvaluateResult")133134return cls(135type=json["type"],136realm=json["realm"],137result=json.get("result"),138exception_details=json.get("exceptionDetails"),139)140141142class ScriptMessage:143"""Represents a script message event."""144145event_class = "script.message"146147def __init__(self, channel: str, data: dict, source: Source):148self.channel = channel149self.data = data150self.source = source151152@classmethod153def from_json(cls, json: dict[str, Any]) -> "ScriptMessage":154"""Creates a ScriptMessage instance from a dictionary.155156Args:157json: A dictionary containing the script message.158159Returns:160ScriptMessage: A new instance of ScriptMessage.161"""162if "channel" not in json:163raise ValueError("Missing required field 'channel' in ScriptMessage")164if "data" not in json:165raise ValueError("Missing required field 'data' in ScriptMessage")166if "source" not in json:167raise ValueError("Missing required field 'source' in ScriptMessage")168169return cls(170channel=json["channel"],171data=json["data"],172source=Source.from_json(json["source"]),173)174175176class RealmCreated:177"""Represents a realm created event."""178179event_class = "script.realmCreated"180181def __init__(self, realm_info: RealmInfo):182self.realm_info = realm_info183184@classmethod185def from_json(cls, json: dict[str, Any]) -> "RealmCreated":186"""Creates a RealmCreated instance from a dictionary.187188Args:189json: A dictionary containing the realm created event.190191Returns:192RealmCreated: A new instance of RealmCreated.193"""194return cls(realm_info=RealmInfo.from_json(json))195196197class RealmDestroyed:198"""Represents a realm destroyed event."""199200event_class = "script.realmDestroyed"201202def __init__(self, realm: str):203self.realm = realm204205@classmethod206def from_json(cls, json: dict[str, Any]) -> "RealmDestroyed":207"""Creates a RealmDestroyed instance from a dictionary.208209Args:210json: A dictionary containing the realm destroyed event.211212Returns:213RealmDestroyed: A new instance of RealmDestroyed.214"""215if "realm" not in json:216raise ValueError("Missing required field 'realm' in RealmDestroyed")217218return cls(realm=json["realm"])219220221class Script:222"""BiDi implementation of the script module."""223224EVENTS = {225"message": "script.message",226"realm_created": "script.realmCreated",227"realm_destroyed": "script.realmDestroyed",228}229230def __init__(self, conn, driver=None):231self.conn = conn232self.driver = driver233self.log_entry_subscribed = False234self.subscriptions = {}235self.callbacks = {}236237# High-level APIs for SCRIPT module238239def add_console_message_handler(self, handler):240self._subscribe_to_log_entries()241return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("console", handler))242243def add_javascript_error_handler(self, handler):244self._subscribe_to_log_entries()245return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("javascript", handler))246247def remove_console_message_handler(self, id):248self.conn.remove_callback(LogEntryAdded, id)249self._unsubscribe_from_log_entries()250251remove_javascript_error_handler = remove_console_message_handler252253def pin(self, script: str) -> str:254"""Pins a script to the current browsing context.255256Args:257script: The script to pin.258259Returns:260str: The ID of the pinned script.261"""262return self._add_preload_script(script)263264def unpin(self, script_id: str) -> None:265"""Unpins a script from the current browsing context.266267Args:268script_id: The ID of the pinned script to unpin.269"""270self._remove_preload_script(script_id)271272def execute(self, script: str, *args) -> dict:273"""Executes a script in the current browsing context.274275Args:276script: The script function to execute.277*args: Arguments to pass to the script function.278279Returns:280dict: The result value from the script execution.281282Raises:283WebDriverException: If the script execution fails.284"""285if self.driver is None:286raise WebDriverException("Driver reference is required for script execution")287browsing_context_id = self.driver.current_window_handle288289# Convert arguments to the format expected by BiDi call_function (LocalValue Type)290arguments = []291for arg in args:292arguments.append(self.__convert_to_local_value(arg))293294target = {"context": browsing_context_id}295296result = self._call_function(297function_declaration=script, await_promise=True, target=target, arguments=arguments if arguments else None298)299300if result.type == "success":301return result.result if result.result is not None else {}302else:303error_message = "Error while executing script"304if result.exception_details:305if "text" in result.exception_details:306error_message += f": {result.exception_details['text']}"307elif "message" in result.exception_details:308error_message += f": {result.exception_details['message']}"309310raise WebDriverException(error_message)311312def __convert_to_local_value(self, value) -> dict:313"""Converts a Python value to BiDi LocalValue format."""314if value is None:315return {"type": "null"}316elif isinstance(value, bool):317return {"type": "boolean", "value": value}318elif isinstance(value, (int, float)):319if isinstance(value, float):320if math.isnan(value):321return {"type": "number", "value": "NaN"}322elif math.isinf(value):323if value > 0:324return {"type": "number", "value": "Infinity"}325else:326return {"type": "number", "value": "-Infinity"}327elif value == 0.0 and math.copysign(1.0, value) < 0:328return {"type": "number", "value": "-0"}329330JS_MAX_SAFE_INTEGER = 9007199254740991331if isinstance(value, int) and (value > JS_MAX_SAFE_INTEGER or value < -JS_MAX_SAFE_INTEGER):332return {"type": "bigint", "value": str(value)}333334return {"type": "number", "value": value}335336elif isinstance(value, str):337return {"type": "string", "value": value}338elif isinstance(value, datetime.datetime):339# Convert Python datetime to JavaScript Date (ISO 8601 format)340return {"type": "date", "value": value.isoformat() + "Z" if value.tzinfo is None else value.isoformat()}341elif isinstance(value, datetime.date):342# Convert Python date to JavaScript Date343dt = datetime.datetime.combine(value, datetime.time.min).replace(tzinfo=datetime.timezone.utc)344return {"type": "date", "value": dt.isoformat()}345elif isinstance(value, set):346return {"type": "set", "value": [self.__convert_to_local_value(item) for item in value]}347elif isinstance(value, (list, tuple)):348return {"type": "array", "value": [self.__convert_to_local_value(item) for item in value]}349elif isinstance(value, dict):350return {351"type": "object",352"value": [353[self.__convert_to_local_value(k), self.__convert_to_local_value(v)] for k, v in value.items()354],355}356else:357# For other types, convert to string358return {"type": "string", "value": str(value)}359360# low-level APIs for script module361def _add_preload_script(362self,363function_declaration: str,364arguments: list[dict[str, Any]] | None = None,365contexts: list[str] | None = None,366user_contexts: list[str] | None = None,367sandbox: str | None = None,368) -> str:369"""Adds a preload script.370371Args:372function_declaration: The function declaration to preload.373arguments: The arguments to pass to the function.374contexts: The browsing context IDs to apply the script to.375user_contexts: The user context IDs to apply the script to.376sandbox: The sandbox name to apply the script to.377378Returns:379str: The preload script ID.380381Raises:382ValueError: If both contexts and user_contexts are provided.383"""384if contexts is not None and user_contexts is not None:385raise ValueError("Cannot specify both contexts and user_contexts")386387params: dict[str, Any] = {"functionDeclaration": function_declaration}388389if arguments is not None:390params["arguments"] = arguments391if contexts is not None:392params["contexts"] = contexts393if user_contexts is not None:394params["userContexts"] = user_contexts395if sandbox is not None:396params["sandbox"] = sandbox397398result = self.conn.execute(command_builder("script.addPreloadScript", params))399return result["script"]400401def _remove_preload_script(self, script_id: str) -> None:402"""Removes a preload script.403404Args:405script_id: The preload script ID to remove.406"""407params = {"script": script_id}408self.conn.execute(command_builder("script.removePreloadScript", params))409410def _disown(self, handles: list[str], target: dict) -> None:411"""Disowns the given handles.412413Args:414handles: The handles to disown.415target: The target realm or context.416"""417params = {418"handles": handles,419"target": target,420}421self.conn.execute(command_builder("script.disown", params))422423def _call_function(424self,425function_declaration: str,426await_promise: bool,427target: dict,428arguments: list[dict] | None = None,429result_ownership: str | None = None,430serialization_options: dict | None = None,431this: dict | None = None,432user_activation: bool = False,433) -> EvaluateResult:434"""Calls a provided function with given arguments in a given realm.435436Args:437function_declaration: The function declaration to call.438await_promise: Whether to await promise resolution.439target: The target realm or context.440arguments: The arguments to pass to the function.441result_ownership: The result ownership type.442serialization_options: The serialization options.443this: The 'this' value for the function call.444user_activation: Whether to trigger user activation.445446Returns:447EvaluateResult: The result of the function call.448"""449params = {450"functionDeclaration": function_declaration,451"awaitPromise": await_promise,452"target": target,453"userActivation": user_activation,454}455456if arguments is not None:457params["arguments"] = arguments458if result_ownership is not None:459params["resultOwnership"] = result_ownership460if serialization_options is not None:461params["serializationOptions"] = serialization_options462if this is not None:463params["this"] = this464465result = self.conn.execute(command_builder("script.callFunction", params))466return EvaluateResult.from_json(result)467468def _evaluate(469self,470expression: str,471target: dict,472await_promise: bool,473result_ownership: str | None = None,474serialization_options: dict | None = None,475user_activation: bool = False,476) -> EvaluateResult:477"""Evaluates a provided script in a given realm.478479Args:480expression: The script expression to evaluate.481target: The target realm or context.482await_promise: Whether to await promise resolution.483result_ownership: The result ownership type.484serialization_options: The serialization options.485user_activation: Whether to trigger user activation.486487Returns:488EvaluateResult: The result of the script evaluation.489"""490params = {491"expression": expression,492"target": target,493"awaitPromise": await_promise,494"userActivation": user_activation,495}496497if result_ownership is not None:498params["resultOwnership"] = result_ownership499if serialization_options is not None:500params["serializationOptions"] = serialization_options501502result = self.conn.execute(command_builder("script.evaluate", params))503return EvaluateResult.from_json(result)504505def _get_realms(506self,507context: str | None = None,508type: str | None = None,509) -> list[RealmInfo]:510"""Returns a list of all realms, optionally filtered.511512Args:513context: The browsing context ID to filter by.514type: The realm type to filter by.515516Returns:517List[RealmInfo]: A list of realm information.518"""519params = {}520521if context is not None:522params["context"] = context523if type is not None:524params["type"] = type525526result = self.conn.execute(command_builder("script.getRealms", params))527return [RealmInfo.from_json(realm) for realm in result["realms"]]528529def _subscribe_to_log_entries(self):530if not self.log_entry_subscribed:531session = Session(self.conn)532self.conn.execute(session.subscribe(LogEntryAdded.event_class))533self.log_entry_subscribed = True534535def _unsubscribe_from_log_entries(self):536if self.log_entry_subscribed and LogEntryAdded.event_class not in self.conn.callbacks:537session = Session(self.conn)538self.conn.execute(session.unsubscribe(LogEntryAdded.event_class))539self.log_entry_subscribed = False540541def _handle_log_entry(self, type, handler):542def _handle_log_entry(log_entry):543if log_entry.type_ == type:544handler(log_entry)545546return _handle_log_entry547548549