Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/log.py
3991 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
import json
19
import pkgutil
20
from collections.abc import AsyncGenerator
21
from contextlib import asynccontextmanager
22
from importlib import import_module
23
from typing import Any
24
25
from selenium.webdriver.common.by import By
26
27
cdp = None
28
29
30
def import_cdp():
31
global cdp
32
if not cdp:
33
cdp = import_module("selenium.webdriver.common.bidi.cdp")
34
35
36
class Log:
37
"""Class for accessing logging APIs using the WebDriver Bidi protocol.
38
39
This class is not to be used directly and should be used from the
40
webdriver base classes.
41
"""
42
43
def __init__(self, driver, bidi_session) -> None:
44
self.driver = driver
45
self.session = bidi_session.session
46
self.cdp = bidi_session.cdp
47
self.devtools = bidi_session.devtools
48
_pkg = ".".join(__name__.split(".")[:-1])
49
# Ensure _mutation_listener_js is not None before decoding
50
_mutation_listener_js_bytes: bytes | None = pkgutil.get_data(_pkg, "mutation-listener.js")
51
if _mutation_listener_js_bytes is None:
52
raise ValueError("Failed to load mutation-listener.js")
53
self._mutation_listener_js = _mutation_listener_js_bytes.decode("utf8").strip()
54
55
@asynccontextmanager
56
async def mutation_events(self) -> AsyncGenerator[dict[str, Any], None]:
57
"""Listen for mutation events and emit them as they are found.
58
59
Example:
60
async with driver.log.mutation_events() as event:
61
pages.load("dynamic.html")
62
driver.find_element(By.ID, "reveal").click()
63
WebDriverWait(driver, 5)\
64
.until(EC.visibility_of(driver.find_element(By.ID, "revealed")))
65
66
assert event["attribute_name"] == "style"
67
assert event["current_value"] == ""
68
assert event["old_value"] == "display:none;"
69
"""
70
page = self.cdp.get_session_context("page.enable")
71
await page.execute(self.devtools.page.enable())
72
runtime = self.cdp.get_session_context("runtime.enable")
73
await runtime.execute(self.devtools.runtime.enable())
74
await runtime.execute(self.devtools.runtime.add_binding("__webdriver_attribute"))
75
self.driver.pin_script(self._mutation_listener_js)
76
script_key = await page.execute(
77
self.devtools.page.add_script_to_evaluate_on_new_document(self._mutation_listener_js)
78
)
79
self.driver.pin_script(self._mutation_listener_js, script_key)
80
self.driver.execute_script(f"return {self._mutation_listener_js}")
81
82
event: dict[str, Any] = {}
83
async with runtime.wait_for(self.devtools.runtime.BindingCalled) as evnt:
84
yield event
85
86
payload = json.loads(evnt.value.payload)
87
elements: list = self.driver.find_elements(By.CSS_SELECTOR, f'*[data-__webdriver_id="{payload["target"]}"]')
88
if not elements:
89
elements.append(None)
90
event["element"] = elements[0]
91
event["attribute_name"] = payload["name"]
92
event["current_value"] = payload["value"]
93
event["old_value"] = payload["oldValue"]
94
95
@asynccontextmanager
96
async def add_js_error_listener(self) -> AsyncGenerator[dict[str, Any], None]:
97
"""Listen for JS errors and check if they occurred when the context manager exits.
98
99
Example:
100
async with driver.log.add_js_error_listener() as error:
101
driver.find_element(By.ID, "throwing-mouseover").click()
102
assert bool(error)
103
assert error.exception_details.stack_trace.call_frames[0].function_name == "onmouseover"
104
"""
105
session = self.cdp.get_session_context("page.enable")
106
await session.execute(self.devtools.page.enable())
107
session = self.cdp.get_session_context("runtime.enable")
108
await session.execute(self.devtools.runtime.enable())
109
js_exception = self.devtools.runtime.ExceptionThrown(None, None)
110
async with session.wait_for(self.devtools.runtime.ExceptionThrown) as exception:
111
yield js_exception
112
js_exception.timestamp = exception.value.timestamp
113
js_exception.exception_details = exception.value.exception_details
114
115
@asynccontextmanager
116
async def add_listener(self, event_type) -> AsyncGenerator[dict[str, Any], None]:
117
"""Listen for certain events that are passed in.
118
119
Args:
120
event_type: The type of event that we want to look at.
121
122
Example:
123
async with driver.log.add_listener(Console.log) as messages:
124
driver.execute_script("console.log('I like cheese')")
125
assert messages["message"] == "I love cheese"
126
"""
127
from selenium.webdriver.common.bidi.console import Console
128
129
session = self.cdp.get_session_context("page.enable")
130
await session.execute(self.devtools.page.enable())
131
session = self.cdp.get_session_context("runtime.enable")
132
await session.execute(self.devtools.runtime.enable())
133
console: dict[str, Any] = {"message": None, "level": None}
134
async with session.wait_for(self.devtools.runtime.ConsoleAPICalled) as messages:
135
yield console
136
137
if event_type == Console.ALL or event_type.value == messages.value.type_:
138
console["message"] = messages.value.args[0].value
139
console["level"] = messages.value.args[0].type_
140
141