# Path: blob/trunk/py/selenium/webdriver/common/bidi/script.py
# 1864 views
# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import math
from dataclasses import dataclass
from typing import Any, Optional

from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.bidi.common import command_builder

from .log import LogEntryAdded
from .session import Session

# Largest magnitude (2**53 - 1) that __convert_to_local_value still sends as a
# plain "number"; integers beyond it are sent as "bigint" strings instead.
_JS_MAX_SAFE_INTEGER = 9007199254740991


def _require_fields(json: dict[str, Any], fields: tuple[str, ...], owner: str) -> None:
    """Raise ValueError for the first entry of ``fields`` missing from ``json``.

    Parameters:
    -----------
        json: The dictionary being validated.
        fields: The required keys, checked in the given order.
        owner: The class name used in the error message.

    Raises:
    ------
        ValueError: If a required key is absent.
    """
    for field in fields:
        if field not in json:
            raise ValueError(f"Missing required field '{field}' in {owner}")


class ResultOwnership:
    """Represents the possible result ownership types."""

    NONE = "none"
    ROOT = "root"


class RealmType:
    """Represents the possible realm types."""

    WINDOW = "window"
    DEDICATED_WORKER = "dedicated-worker"
    SHARED_WORKER = "shared-worker"
    SERVICE_WORKER = "service-worker"
    WORKER = "worker"
    PAINT_WORKLET = "paint-worklet"
    AUDIO_WORKLET = "audio-worklet"
    WORKLET = "worklet"


@dataclass
class RealmInfo:
    """Represents information about a realm."""

    realm: str
    origin: str
    type: str
    context: Optional[str] = None
    sandbox: Optional[str] = None

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "RealmInfo":
        """Creates a RealmInfo instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the realm information.

        Returns:
        -------
            RealmInfo: A new instance of RealmInfo.

        Raises:
        ------
            ValueError: If 'realm', 'origin' or 'type' is missing.
        """
        _require_fields(json, ("realm", "origin", "type"), "RealmInfo")

        return cls(
            realm=json["realm"],
            origin=json["origin"],
            type=json["type"],
            # Optional keys: absent ones become None.
            context=json.get("context"),
            sandbox=json.get("sandbox"),
        )


@dataclass
class Source:
    """Represents the source of a script message."""

    realm: str
    context: Optional[str] = None

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "Source":
        """Creates a Source instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the source information.

        Returns:
        -------
            Source: A new instance of Source.

        Raises:
        ------
            ValueError: If 'realm' is missing.
        """
        _require_fields(json, ("realm",), "Source")

        return cls(
            realm=json["realm"],
            context=json.get("context"),
        )


@dataclass
class EvaluateResult:
    """Represents the result of script evaluation."""

    type: str
    realm: str
    result: Optional[dict] = None
    exception_details: Optional[dict] = None

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "EvaluateResult":
        """Creates an EvaluateResult instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the evaluation result.

        Returns:
        -------
            EvaluateResult: A new instance of EvaluateResult.

        Raises:
        ------
            ValueError: If 'realm' or 'type' is missing.
        """
        _require_fields(json, ("realm", "type"), "EvaluateResult")

        return cls(
            type=json["type"],
            realm=json["realm"],
            result=json.get("result"),
            # The wire key is camelCase; the attribute is snake_case.
            exception_details=json.get("exceptionDetails"),
        )


class ScriptMessage:
    """Represents a script message event."""

    event_class = "script.message"

    def __init__(self, channel: str, data: dict, source: Source):
        self.channel = channel
        self.data = data
        self.source = source

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "ScriptMessage":
        """Creates a ScriptMessage instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the script message.

        Returns:
        -------
            ScriptMessage: A new instance of ScriptMessage.

        Raises:
        ------
            ValueError: If 'channel', 'data' or 'source' is missing, or if
                the nested source lacks its 'realm'.
        """
        _require_fields(json, ("channel", "data", "source"), "ScriptMessage")

        return cls(
            channel=json["channel"],
            data=json["data"],
            source=Source.from_json(json["source"]),
        )


class RealmCreated:
    """Represents a realm created event."""

    event_class = "script.realmCreated"

    def __init__(self, realm_info: RealmInfo):
        self.realm_info = realm_info

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "RealmCreated":
        """Creates a RealmCreated instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the realm created event.

        Returns:
        -------
            RealmCreated: A new instance of RealmCreated.

        Raises:
        ------
            ValueError: If RealmInfo.from_json rejects the payload.
        """
        # The event payload itself is parsed as the realm info.
        return cls(realm_info=RealmInfo.from_json(json))


class RealmDestroyed:
    """Represents a realm destroyed event."""

    event_class = "script.realmDestroyed"

    def __init__(self, realm: str):
        self.realm = realm

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> "RealmDestroyed":
        """Creates a RealmDestroyed instance from a dictionary.

        Parameters:
        -----------
            json: A dictionary containing the realm destroyed event.

        Returns:
        -------
            RealmDestroyed: A new instance of RealmDestroyed.

        Raises:
        ------
            ValueError: If 'realm' is missing.
        """
        _require_fields(json, ("realm",), "RealmDestroyed")

        return cls(realm=json["realm"])


class Script:
    """BiDi implementation of the script module."""

    EVENTS = {
        "message": "script.message",
        "realm_created": "script.realmCreated",
        "realm_destroyed": "script.realmDestroyed",
    }

    def __init__(self, conn, driver=None):
        """Stores the connection and optional driver used by this module.

        Parameters:
        -----------
            conn: The connection used to execute commands and register callbacks.
            driver: The driver whose current window handle `execute` targets;
                `execute` raises if it is None.
        """
        self.conn = conn
        self.driver = driver
        # Tracks whether this instance already subscribed to log entry events.
        self.log_entry_subscribed = False
        self.subscriptions = {}
        self.callbacks = {}

    # High-level APIs for SCRIPT module

    def add_console_message_handler(self, handler):
        """Registers a handler for log entries whose type is "console".

        Subscribes to log entry events first if this instance has not yet.

        Parameters:
        -----------
            handler: Callable invoked with each matching log entry.

        Returns:
        -------
            The value returned by `conn.add_callback` (presumably the callback
            ID expected by `remove_console_message_handler`).
        """
        self._subscribe_to_log_entries()
        return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("console", handler))

    def add_javascript_error_handler(self, handler):
        """Registers a handler for log entries whose type is "javascript".

        Subscribes to log entry events first if this instance has not yet.

        Parameters:
        -----------
            handler: Callable invoked with each matching log entry.

        Returns:
        -------
            The value returned by `conn.add_callback` (presumably the callback
            ID expected by `remove_javascript_error_handler`).
        """
        self._subscribe_to_log_entries()
        return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("javascript", handler))

    def remove_console_message_handler(self, id):
        """Removes a previously registered log entry handler.

        After removal, the log entry subscription is dropped if the connection
        no longer lists any callback for the log entry event.

        Parameters:
        -----------
            id: The identifier returned when the handler was added.
        """
        self.conn.remove_callback(LogEntryAdded, id)
        self._unsubscribe_from_log_entries()

    # Both handler kinds are callbacks on the same event, so removal is shared.
    remove_javascript_error_handler = remove_console_message_handler

    def pin(self, script: str) -> str:
        """Pins a script to the current browsing context.

        Implemented by registering the script as a preload script.

        Parameters:
        -----------
            script: The script to pin.

        Returns:
        -------
            str: The ID of the pinned script.
        """
        return self._add_preload_script(script)

    def unpin(self, script_id: str) -> None:
        """Unpins a script from the current browsing context.

        Parameters:
        -----------
            script_id: The ID of the pinned script to unpin.
        """
        self._remove_preload_script(script_id)

    def execute(self, script: str, *args) -> dict:
        """Executes a script in the current browsing context.

        Parameters:
        -----------
            script: The script function to execute.
            *args: Arguments to pass to the script function.

        Returns:
        -------
            dict: The result value from the script execution, or an empty
                dict when the response carries no result.

        Raises:
        ------
            WebDriverException: If no driver was supplied or the script
                execution fails.
        """
        if self.driver is None:
            raise WebDriverException("Driver reference is required for script execution")
        browsing_context_id = self.driver.current_window_handle

        # Convert arguments to the format expected by BiDi call_function (LocalValue Type)
        arguments = [self.__convert_to_local_value(arg) for arg in args]

        result = self._call_function(
            function_declaration=script,
            await_promise=True,
            target={"context": browsing_context_id},
            # Omit the key entirely when there are no arguments.
            arguments=arguments if arguments else None,
        )

        if result.type == "success":
            return result.result if result.result is not None else {}

        error_message = "Error while executing script"
        details = result.exception_details
        if details:
            # Prefer 'text'; fall back to 'message' when only that is present.
            if "text" in details:
                error_message += f": {details['text']}"
            elif "message" in details:
                error_message += f": {details['message']}"

        raise WebDriverException(error_message)

    def __convert_to_local_value(self, value) -> dict:
        """Converts a Python value to BiDi LocalValue format.

        Parameters:
        -----------
            value: The Python value to convert. Containers are converted
                recursively.

        Returns:
        -------
            dict: A mapping with a "type" key and, except for None, a "value" key.
                Unsupported types are sent as the string `str(value)`.
        """
        if value is None:
            return {"type": "null"}

        # bool must be tested before int: bool is a subclass of int.
        if isinstance(value, bool):
            return {"type": "boolean", "value": value}

        if isinstance(value, (int, float)):
            if isinstance(value, float):
                # Special float values are sent as strings, not as numbers.
                if math.isnan(value):
                    return {"type": "number", "value": "NaN"}
                if math.isinf(value):
                    return {"type": "number", "value": "Infinity" if value > 0 else "-Infinity"}
                # copysign distinguishes -0.0 from 0.0, which compare equal.
                if value == 0.0 and math.copysign(1.0, value) < 0:
                    return {"type": "number", "value": "-0"}

            if isinstance(value, int) and (value > _JS_MAX_SAFE_INTEGER or value < -_JS_MAX_SAFE_INTEGER):
                return {"type": "bigint", "value": str(value)}

            return {"type": "number", "value": value}

        if isinstance(value, str):
            return {"type": "string", "value": value}

        # datetime must be tested before date: datetime is a subclass of date.
        if isinstance(value, datetime.datetime):
            # Convert Python datetime to JavaScript Date (ISO 8601 format)
            iso_string = value.isoformat()
            if value.tzinfo is None:
                # Naive datetimes get a "Z" suffix, i.e. are sent as UTC.
                iso_string += "Z"
            return {"type": "date", "value": iso_string}

        if isinstance(value, datetime.date):
            # Convert Python date to JavaScript Date (midnight UTC of that day)
            midnight = datetime.datetime.combine(value, datetime.time.min).replace(tzinfo=datetime.timezone.utc)
            return {"type": "date", "value": midnight.isoformat()}

        if isinstance(value, set):
            return {"type": "set", "value": [self.__convert_to_local_value(item) for item in value]}

        if isinstance(value, (list, tuple)):
            return {"type": "array", "value": [self.__convert_to_local_value(item) for item in value]}

        if isinstance(value, dict):
            # Sent as a list of [key, value] pairs, both converted.
            return {
                "type": "object",
                "value": [
                    [self.__convert_to_local_value(k), self.__convert_to_local_value(v)] for k, v in value.items()
                ],
            }

        # For other types, convert to string
        return {"type": "string", "value": str(value)}

    # low-level APIs for script module
    def _add_preload_script(
        self,
        function_declaration: str,
        arguments: Optional[list[dict[str, Any]]] = None,
        contexts: Optional[list[str]] = None,
        user_contexts: Optional[list[str]] = None,
        sandbox: Optional[str] = None,
    ) -> str:
        """Adds a preload script.

        Parameters:
        -----------
            function_declaration: The function declaration to preload.
            arguments: The arguments to pass to the function.
            contexts: The browsing context IDs to apply the script to.
            user_contexts: The user context IDs to apply the script to.
            sandbox: The sandbox name to apply the script to.

        Returns:
        -------
            str: The preload script ID.

        Raises:
        ------
            ValueError: If both contexts and user_contexts are provided.
        """
        if contexts is not None and user_contexts is not None:
            raise ValueError("Cannot specify both contexts and user_contexts")

        params: dict[str, Any] = {"functionDeclaration": function_declaration}

        # Optional parameters are sent only when explicitly provided.
        if arguments is not None:
            params["arguments"] = arguments
        if contexts is not None:
            params["contexts"] = contexts
        if user_contexts is not None:
            params["userContexts"] = user_contexts
        if sandbox is not None:
            params["sandbox"] = sandbox

        result = self.conn.execute(command_builder("script.addPreloadScript", params))
        return result["script"]

    def _remove_preload_script(self, script_id: str) -> None:
        """Removes a preload script.

        Parameters:
        -----------
            script_id: The preload script ID to remove.
        """
        params = {"script": script_id}
        self.conn.execute(command_builder("script.removePreloadScript", params))

    def _disown(self, handles: list[str], target: dict) -> None:
        """Disowns the given handles.

        Parameters:
        -----------
            handles: The handles to disown.
            target: The target realm or context.
        """
        params = {
            "handles": handles,
            "target": target,
        }
        self.conn.execute(command_builder("script.disown", params))

    def _call_function(
        self,
        function_declaration: str,
        await_promise: bool,
        target: dict,
        arguments: Optional[list[dict]] = None,
        result_ownership: Optional[str] = None,
        serialization_options: Optional[dict] = None,
        this: Optional[dict] = None,
        user_activation: bool = False,
    ) -> EvaluateResult:
        """Calls a provided function with given arguments in a given realm.

        Parameters:
        -----------
            function_declaration: The function declaration to call.
            await_promise: Whether to await promise resolution.
            target: The target realm or context.
            arguments: The arguments to pass to the function.
            result_ownership: The result ownership type.
            serialization_options: The serialization options.
            this: The 'this' value for the function call.
            user_activation: Whether to trigger user activation.

        Returns:
        -------
            EvaluateResult: The result of the function call.

        Raises:
        ------
            ValueError: If the response lacks 'realm' or 'type'.
        """
        params: dict[str, Any] = {
            "functionDeclaration": function_declaration,
            "awaitPromise": await_promise,
            "target": target,
            "userActivation": user_activation,
        }

        # Optional parameters are sent only when explicitly provided.
        if arguments is not None:
            params["arguments"] = arguments
        if result_ownership is not None:
            params["resultOwnership"] = result_ownership
        if serialization_options is not None:
            params["serializationOptions"] = serialization_options
        if this is not None:
            params["this"] = this

        result = self.conn.execute(command_builder("script.callFunction", params))
        return EvaluateResult.from_json(result)

    def _evaluate(
        self,
        expression: str,
        target: dict,
        await_promise: bool,
        result_ownership: Optional[str] = None,
        serialization_options: Optional[dict] = None,
        user_activation: bool = False,
    ) -> EvaluateResult:
        """Evaluates a provided script in a given realm.

        Parameters:
        -----------
            expression: The script expression to evaluate.
            target: The target realm or context.
            await_promise: Whether to await promise resolution.
            result_ownership: The result ownership type.
            serialization_options: The serialization options.
            user_activation: Whether to trigger user activation.

        Returns:
        -------
            EvaluateResult: The result of the script evaluation.

        Raises:
        ------
            ValueError: If the response lacks 'realm' or 'type'.
        """
        params: dict[str, Any] = {
            "expression": expression,
            "target": target,
            "awaitPromise": await_promise,
            "userActivation": user_activation,
        }

        # Optional parameters are sent only when explicitly provided.
        if result_ownership is not None:
            params["resultOwnership"] = result_ownership
        if serialization_options is not None:
            params["serializationOptions"] = serialization_options

        result = self.conn.execute(command_builder("script.evaluate", params))
        return EvaluateResult.from_json(result)

    def _get_realms(
        self,
        context: Optional[str] = None,
        type: Optional[str] = None,
    ) -> list[RealmInfo]:
        """Returns a list of all realms, optionally filtered.

        Parameters:
        -----------
            context: The browsing context ID to filter by.
            type: The realm type to filter by.

        Returns:
        -------
            List[RealmInfo]: A list of realm information.

        Raises:
        ------
            ValueError: If a returned realm lacks a required field.
        """
        params: dict[str, Any] = {}

        if context is not None:
            params["context"] = context
        if type is not None:
            params["type"] = type

        result = self.conn.execute(command_builder("script.getRealms", params))
        return [RealmInfo.from_json(realm) for realm in result["realms"]]

    def _subscribe_to_log_entries(self):
        """Subscribes to log entry events unless this instance already has."""
        if not self.log_entry_subscribed:
            session = Session(self.conn)
            self.conn.execute(session.subscribe(LogEntryAdded.event_class))
            self.log_entry_subscribed = True

    def _unsubscribe_from_log_entries(self):
        """Unsubscribes from log entry events once no callbacks remain.

        Does nothing unless this instance is subscribed and the connection's
        callbacks no longer contain the log entry event.
        """
        if self.log_entry_subscribed and LogEntryAdded.event_class not in self.conn.callbacks:
            session = Session(self.conn)
            self.conn.execute(session.unsubscribe(LogEntryAdded.event_class))
            self.log_entry_subscribed = False

    def _handle_log_entry(self, type, handler):
        """Wraps a handler so it only receives log entries of one type.

        Parameters:
        -----------
            type: The log entry type to match against `log_entry.type_`.
            handler: Callable invoked with each matching log entry.

        Returns:
        -------
            A callable taking a log entry; entries of other types are ignored.
        """

        def _handle_log_entry(log_entry):
            if log_entry.type_ == type:
                handler(log_entry)

        return _handle_log_entry