Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/log.py
1864 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
import json
19
import pkgutil
20
from collections.abc import AsyncGenerator
21
from contextlib import asynccontextmanager
22
from importlib import import_module
23
from typing import Any, Optional
24
25
from selenium.webdriver.common.by import By
26
27
cdp = None
28
29
30
def import_cdp():
31
global cdp
32
if not cdp:
33
cdp = import_module("selenium.webdriver.common.bidi.cdp")
34
35
36
class Log:
37
"""This class allows access to logging APIs that use the new WebDriver Bidi
38
protocol.
39
40
This class is not to be used directly and should be used from the
41
webdriver base classes.
42
"""
43
44
def __init__(self, driver, bidi_session) -> None:
45
self.driver = driver
46
self.session = bidi_session.session
47
self.cdp = bidi_session.cdp
48
self.devtools = bidi_session.devtools
49
_pkg = ".".join(__name__.split(".")[:-1])
50
# Ensure _mutation_listener_js is not None before decoding
51
_mutation_listener_js_bytes: Optional[bytes] = pkgutil.get_data(_pkg, "mutation-listener.js")
52
if _mutation_listener_js_bytes is None:
53
raise ValueError("Failed to load mutation-listener.js")
54
self._mutation_listener_js = _mutation_listener_js_bytes.decode("utf8").strip()
55
56
@asynccontextmanager
57
async def mutation_events(self) -> AsyncGenerator[dict[str, Any], None]:
58
"""Listen for mutation events and emit them as they are found.
59
60
:Usage:
61
::
62
63
async with driver.log.mutation_events() as event:
64
pages.load("dynamic.html")
65
driver.find_element(By.ID, "reveal").click()
66
WebDriverWait(driver, 5)\
67
.until(EC.visibility_of(driver.find_element(By.ID, "revealed")))
68
69
assert event["attribute_name"] == "style"
70
assert event["current_value"] == ""
71
assert event["old_value"] == "display:none;"
72
"""
73
74
page = self.cdp.get_session_context("page.enable")
75
await page.execute(self.devtools.page.enable())
76
runtime = self.cdp.get_session_context("runtime.enable")
77
await runtime.execute(self.devtools.runtime.enable())
78
await runtime.execute(self.devtools.runtime.add_binding("__webdriver_attribute"))
79
self.driver.pin_script(self._mutation_listener_js)
80
script_key = await page.execute(
81
self.devtools.page.add_script_to_evaluate_on_new_document(self._mutation_listener_js)
82
)
83
self.driver.pin_script(self._mutation_listener_js, script_key)
84
self.driver.execute_script(f"return {self._mutation_listener_js}")
85
86
event: dict[str, Any] = {}
87
async with runtime.wait_for(self.devtools.runtime.BindingCalled) as evnt:
88
yield event
89
90
payload = json.loads(evnt.value.payload)
91
elements: list = self.driver.find_elements(By.CSS_SELECTOR, f'*[data-__webdriver_id="{payload["target"]}"]')
92
if not elements:
93
elements.append(None)
94
event["element"] = elements[0]
95
event["attribute_name"] = payload["name"]
96
event["current_value"] = payload["value"]
97
event["old_value"] = payload["oldValue"]
98
99
@asynccontextmanager
100
async def add_js_error_listener(self) -> AsyncGenerator[dict[str, Any], None]:
101
"""Listen for JS errors and when the contextmanager exits check if
102
there were JS Errors.
103
104
:Usage:
105
::
106
107
async with driver.log.add_js_error_listener() as error:
108
driver.find_element(By.ID, "throwing-mouseover").click()
109
assert bool(error)
110
assert error.exception_details.stack_trace.call_frames[0].function_name == "onmouseover"
111
"""
112
113
session = self.cdp.get_session_context("page.enable")
114
await session.execute(self.devtools.page.enable())
115
session = self.cdp.get_session_context("runtime.enable")
116
await session.execute(self.devtools.runtime.enable())
117
js_exception = self.devtools.runtime.ExceptionThrown(None, None)
118
async with session.wait_for(self.devtools.runtime.ExceptionThrown) as exception:
119
yield js_exception
120
js_exception.timestamp = exception.value.timestamp
121
js_exception.exception_details = exception.value.exception_details
122
123
@asynccontextmanager
124
async def add_listener(self, event_type) -> AsyncGenerator[dict[str, Any], None]:
125
"""Listen for certain events that are passed in.
126
127
:Args:
128
- event_type: The type of event that we want to look at.
129
130
:Usage:
131
::
132
133
async with driver.log.add_listener(Console.log) as messages:
134
driver.execute_script("console.log('I like cheese')")
135
assert messages["message"] == "I love cheese"
136
"""
137
138
from selenium.webdriver.common.bidi.console import Console
139
140
session = self.cdp.get_session_context("page.enable")
141
await session.execute(self.devtools.page.enable())
142
session = self.cdp.get_session_context("runtime.enable")
143
await session.execute(self.devtools.runtime.enable())
144
console: dict[str, Any] = {"message": None, "level": None}
145
async with session.wait_for(self.devtools.runtime.ConsoleAPICalled) as messages:
146
yield console
147
148
if event_type == Console.ALL or event_type.value == messages.value.type_:
149
console["message"] = messages.value.args[0].value
150
console["level"] = messages.value.args[0].type_
151
152