Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
seleniumhq
GitHub Repository: seleniumhq/selenium
Path: blob/trunk/py/private/_script_handlers.py
11809 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
"""High-level script-module helpers for the WebDriver BiDi script module.
19
20
This module is copied verbatim into the generated ``selenium.webdriver.common.bidi``
21
package by Bazel (see ``create-bidi-src`` in ``py/BUILD.bazel``). The generated
22
``script`` module re-exports the public classes and instantiates the registries,
23
which layer the cross-binding BiDi API design's handler surface on top of the
24
CDDL-generated low-level commands:
25
26
- :class:`LogHandlerRegistry` owns a single ``log.entryAdded`` subscription and
27
routes entries to console and JavaScript-error handlers. Handlers registered
28
through the doc-aligned ``add_console_handler`` / ``add_error_handler`` receive
29
:class:`ConsoleMessage` / :class:`ScriptError` payloads carrying source URL,
30
line and column numbers extracted from the BiDi stack trace; handlers
31
registered through the longer-standing ``add_console_message_handler`` /
32
``add_javascript_error_handler`` keep receiving the generated log-entry
33
dataclasses unchanged.
34
- :class:`DomMutationRegistry` owns the DOM-observation preload script and the
35
``script.message`` channel subscription, and dispatches :class:`DomMutation`
36
payloads. Beyond attribute changes it can observe ``childList`` and
37
``characterData`` mutations on request.
38
- :class:`PinnedScript` and :class:`ScriptResult` implement the design doc's
39
pinned-script surface: ``pin()`` returns a :class:`PinnedScript` (a ``str``
40
subclass, so code treating it as a plain script ID keeps working) and
41
``execute(pinned, code)`` returns a non-raising :class:`ScriptResult`.
42
"""
43
44
from __future__ import annotations
45
46
import json
47
import logging
48
import threading
49
import uuid
50
from collections.abc import Callable, Iterable
51
from dataclasses import dataclass, field
52
from typing import Any
53
54
logger = logging.getLogger(__name__)
55
56
# DOM mutation listener used as a BiDi preload script. This extends the
57
# shared javascript/bidi-support/bidi-mutation-listener.js (which only emits
58
# attribute mutations) with opt-in childList and characterData reporting,
59
# selected through the ``options`` argument so each registration only emits
60
# the mutation types it asked for. Kept Python-side until the shared listener
61
# grows the same options across bindings.
62
DOM_MUTATION_LISTENER_JS = """\
63
function observeMutations(channel, options) {
64
const config = options || { attributes: true }
65
const idFor = (element) => {
66
let id = element.dataset.__webdriver_id
67
if (!id) {
68
id = Math.random().toString(36).substring(2) + Date.now().toString(36)
69
element.dataset.__webdriver_id = id
70
}
71
return id
72
}
73
const describeNode = (node) => {
74
const description = { nodeType: node.nodeType, nodeName: node.nodeName }
75
if (node.nodeType === Node.ELEMENT_NODE) {
76
description.id = idFor(node)
77
} else {
78
description.value = node.nodeValue
79
}
80
return description
81
}
82
const observer = new MutationObserver((mutations) => {
83
for (const mutation of mutations) {
84
switch (mutation.type) {
85
case 'attributes': {
86
if (!config.attributes) break
87
// Don't report our own attribute has changed.
88
if (mutation.attributeName === 'data-__webdriver_id') break
89
channel(JSON.stringify({
90
type: 'attributes',
91
target: idFor(mutation.target),
92
name: mutation.attributeName,
93
value: mutation.target.getAttribute(mutation.attributeName),
94
oldValue: mutation.oldValue,
95
}))
96
break
97
}
98
case 'childList': {
99
if (!config.childList) break
100
channel(JSON.stringify({
101
type: 'childList',
102
target: mutation.target.nodeType === Node.ELEMENT_NODE ? idFor(mutation.target) : null,
103
addedNodes: Array.from(mutation.addedNodes, describeNode),
104
removedNodes: Array.from(mutation.removedNodes, describeNode),
105
}))
106
break
107
}
108
case 'characterData': {
109
if (!config.characterData) break
110
const parent = mutation.target.parentElement
111
channel(JSON.stringify({
112
type: 'characterData',
113
target: parent ? idFor(parent) : null,
114
value: mutation.target.data,
115
oldValue: mutation.oldValue,
116
}))
117
break
118
}
119
default:
120
break
121
}
122
}
123
})
124
const observeInit = { subtree: true }
125
if (config.attributes) {
126
observeInit.attributes = true
127
observeInit.attributeOldValue = true
128
}
129
if (config.childList) {
130
observeInit.childList = true
131
}
132
if (config.characterData) {
133
observeInit.characterData = true
134
observeInit.characterDataOldValue = true
135
}
136
observer.observe(document, observeInit)
137
}
138
"""
139
140
141
@dataclass
142
class ScriptError:
143
"""A JavaScript error observed in the browser.
144
145
Attributes:
146
message: The error message.
147
source: Source file/URL where the error occurred (top stack frame).
148
line_number: Line number of the error.
149
column_number: Column number of the error.
150
stack_trace: Formatted stack trace, one ``at function (url:line:col)``
151
line per frame.
152
timestamp: Time the entry was generated, in milliseconds since epoch.
153
"""
154
155
message: str | None = None
156
source: str | None = None
157
line_number: int | None = None
158
column_number: int | None = None
159
stack_trace: str | None = None
160
timestamp: float | None = None
161
162
163
@dataclass
164
class ConsoleMessage:
165
"""A console message observed in the browser.
166
167
Attributes:
168
level: Console level (``debug``, ``info``, ``warn`` or ``error``).
169
text: The console message text.
170
source: Source file/URL the message originated from (top stack frame).
171
line_number: Line number where the message originated.
172
column_number: Column number where the message originated.
173
stack_trace: Formatted stack trace where available.
174
timestamp: Time the entry was generated, in milliseconds since epoch.
175
method: The console method used (e.g. ``log``, ``warn``).
176
args: The raw BiDi RemoteValue arguments passed to the console call.
177
"""
178
179
level: str | None = None
180
text: str | None = None
181
source: str | None = None
182
line_number: int | None = None
183
column_number: int | None = None
184
stack_trace: str | None = None
185
timestamp: float | None = None
186
method: str | None = None
187
args: list[Any] | None = None
188
189
190
@dataclass
191
class ScriptResult:
192
"""Result of executing a pinned script, without raising on failure.
193
194
Attributes:
195
value: The BiDi RemoteValue result of the execution, or ``None``
196
when the script raised.
197
error: A :class:`ScriptError` describing the failure, or ``None``
198
on success.
199
realm: The realm the script was executed in.
200
"""
201
202
value: Any | None = None
203
error: ScriptError | None = None
204
realm: str | None = None
205
206
207
class PinnedScript(str):
208
"""Identifier of a pinned script, as returned by ``Script.pin()``.
209
210
Subclasses ``str`` so existing code that treats the return value of
211
``pin()`` as a plain script ID string keeps working, while exposing the
212
cross-binding API design's ``id``, ``source`` and ``realm`` properties.
213
"""
214
215
def __new__(cls, script_id: str, source: str | None = None, realm: str | None = None):
216
instance = super().__new__(cls, script_id)
217
instance._source = source
218
instance._realm = realm
219
return instance
220
221
@property
222
def id(self) -> str:
223
"""The unique identifier of the pinned script."""
224
return str(self)
225
226
@property
227
def source(self) -> str | None:
228
"""The JavaScript source the script was pinned with."""
229
return self._source
230
231
@property
232
def realm(self) -> str | None:
233
"""The realm the script is associated with, where known."""
234
return self._realm
235
236
def __repr__(self) -> str:
237
return f"PinnedScript(id={str.__repr__(self)}, realm={self._realm!r})"
238
239
240
@dataclass
241
class DomMutation:
242
"""Represents a DOM mutation event from add_dom_mutation_handler.
243
244
Attributes:
245
element_id: The ``data-__webdriver_id`` attribute value set on the
246
mutated element by the MutationObserver. Use this to locate the
247
element from the main thread if needed. For ``characterData``
248
mutations this identifies the parent element of the text node.
249
attribute_name: The name of the changed attribute (``attributes``
250
mutations only).
251
current_value: The value after the mutation (attribute value or
252
character data; ``None`` if the attribute was removed).
253
old_value: The value before the mutation.
254
type: The mutation type: ``attributes``, ``childList`` or
255
``characterData``.
256
target: Same identifier as ``element_id``; named per the
257
cross-binding BiDi API design.
258
added_nodes: Node descriptors added by a ``childList`` mutation.
259
Element descriptors carry ``nodeType``/``nodeName``/``id``
260
(a ``data-__webdriver_id`` value); other nodes carry
261
``nodeType``/``nodeName``/``value``.
262
removed_nodes: Node descriptors removed by a ``childList`` mutation.
263
"""
264
265
element_id: str | None = None
266
attribute_name: str | None = None
267
current_value: str | None = None
268
old_value: str | None = None
269
type: str | None = None
270
target: str | None = None
271
added_nodes: list[Any] = field(default_factory=list)
272
removed_nodes: list[Any] = field(default_factory=list)
273
274
275
def _stack_frames(stack_trace: Any) -> list[dict]:
276
if isinstance(stack_trace, dict):
277
frames = stack_trace.get("callFrames")
278
if isinstance(frames, list):
279
return [frame for frame in frames if isinstance(frame, dict)]
280
return []
281
282
283
def _format_stack_trace(stack_trace: Any) -> str | None:
284
frames = _stack_frames(stack_trace)
285
if not frames:
286
return None
287
lines = []
288
for frame in frames:
289
name = frame.get("functionName") or "<anonymous>"
290
lines.append(f" at {name} ({frame.get('url')}:{frame.get('lineNumber')}:{frame.get('columnNumber')})")
291
return "\n".join(lines)
292
293
294
def console_message_from_log_entry(params: dict) -> ConsoleMessage:
295
"""Build a :class:`ConsoleMessage` from raw ``log.entryAdded`` params."""
296
frames = _stack_frames(params.get("stackTrace"))
297
top = frames[0] if frames else {}
298
return ConsoleMessage(
299
level=params.get("level"),
300
text=params.get("text"),
301
source=top.get("url"),
302
line_number=top.get("lineNumber"),
303
column_number=top.get("columnNumber"),
304
stack_trace=_format_stack_trace(params.get("stackTrace")),
305
timestamp=params.get("timestamp"),
306
method=params.get("method"),
307
args=params.get("args"),
308
)
309
310
311
def script_error_from_log_entry(params: dict) -> ScriptError:
312
"""Build a :class:`ScriptError` from raw ``log.entryAdded`` params."""
313
frames = _stack_frames(params.get("stackTrace"))
314
top = frames[0] if frames else {}
315
return ScriptError(
316
message=params.get("text"),
317
source=top.get("url"),
318
line_number=top.get("lineNumber"),
319
column_number=top.get("columnNumber"),
320
stack_trace=_format_stack_trace(params.get("stackTrace")),
321
timestamp=params.get("timestamp"),
322
)
323
324
325
def script_error_from_exception_details(details: dict) -> ScriptError:
326
"""Build a :class:`ScriptError` from BiDi ``script.ExceptionDetails``."""
327
frames = _stack_frames(details.get("stackTrace"))
328
top = frames[0] if frames else {}
329
return ScriptError(
330
message=details.get("text"),
331
source=top.get("url"),
332
line_number=details.get("lineNumber"),
333
column_number=details.get("columnNumber"),
334
stack_trace=_format_stack_trace(details.get("stackTrace")),
335
)
336
337
338
def dom_mutation_from_payload(payload: dict) -> DomMutation:
339
"""Build a :class:`DomMutation` from a mutation-listener channel payload."""
340
target = payload.get("target")
341
target_id = None if target is None else str(target)
342
return DomMutation(
343
element_id=target_id,
344
attribute_name=payload.get("name"),
345
current_value=payload.get("value"),
346
old_value=payload.get("oldValue"),
347
type=payload.get("type", "attributes"),
348
target=target_id,
349
added_nodes=list(payload.get("addedNodes") or []),
350
removed_nodes=list(payload.get("removedNodes") or []),
351
)
352
353
354
class _EventRef:
355
"""Minimal event wrapper accepted by WebSocketConnection callbacks."""
356
357
def __init__(self, event_class: str) -> None:
358
self.event_class = event_class
359
360
def from_json(self, params: Any) -> Any:
361
return params
362
363
364
def _subscribe_to_event(conn: Any, event: str) -> str | None:
365
from selenium.webdriver.common.bidi.session import Session
366
367
result = Session(conn).subscribe([event])
368
return result.get("subscription") if isinstance(result, dict) else None
369
370
371
def _unsubscribe_from_event(conn: Any, event: str, subscription_id: str | None) -> None:
372
from selenium.webdriver.common.bidi.session import Session
373
374
session = Session(conn)
375
if subscription_id:
376
session.unsubscribe(subscriptions=[subscription_id])
377
else:
378
session.unsubscribe(events=[event])
379
380
381
def _legacy_log_entry(params: dict) -> Any:
382
"""Deserialize raw log params into the generated log-entry dataclasses."""
383
from selenium.webdriver.common.bidi import log as log_mod
384
385
cls_name = {"console": "ConsoleLogEntry", "javascript": "JavascriptLogEntry"}.get(params.get("type"))
386
if cls_name:
387
cls = getattr(log_mod, cls_name, None)
388
if cls is not None and hasattr(cls, "from_json"):
389
try:
390
return cls.from_json(params)
391
except Exception:
392
pass
393
return params
394
395
396
def execute_pinned(script: Any, pinned: PinnedScript, code: str, context_id: str | None = None) -> ScriptResult:
397
"""Execute ``code`` with a pinned script's source in scope.
398
399
The pinned source and the code are wrapped into a single function and
400
evaluated via ``script.callFunction`` in the given (or current) browsing
401
context, so functions declared by the pinned source are callable from
402
``code``. Unlike ``Script.execute``, failures do not raise: they are
403
reported through :attr:`ScriptResult.error`.
404
"""
405
source = pinned.source if isinstance(pinned, PinnedScript) else None
406
declaration = "function() {\n" + (source or "") + "\n" + (code or "") + "\n}"
407
if context_id is None and getattr(script, "_driver", None) is not None:
408
try:
409
context_id = script._driver.current_window_handle
410
except Exception:
411
pass
412
target = {"context": context_id} if context_id else {}
413
raw = script.call_function(
414
function_declaration=declaration,
415
await_promise=True,
416
target=target,
417
)
418
if isinstance(raw, dict):
419
realm = raw.get("realm")
420
if raw.get("type") == "exception":
421
details = raw.get("exceptionDetails")
422
details = details if isinstance(details, dict) else {}
423
return ScriptResult(value=None, error=script_error_from_exception_details(details), realm=realm)
424
if raw.get("type") == "success":
425
return ScriptResult(value=raw.get("result"), error=None, realm=realm)
426
return ScriptResult(value=raw, error=None, realm=None)
427
428
429
class LogHandlerRegistry:
430
"""Routes ``log.entryAdded`` events to console and error handlers.
431
432
All console and JavaScript-error handlers share one BiDi session
433
subscription, created when the first handler is added and removed when
434
the last one is removed. Each handler is tracked under a category
435
(``console`` or ``error``) so ``clear_console_handlers`` /
436
``clear_error_handlers`` can remove every handler of that category,
437
regardless of which ``add_*`` method registered it.
438
"""
439
440
EVENT = "log.entryAdded"
441
CONSOLE = "console"
442
ERROR = "error"
443
_CATEGORY_ENTRY_TYPES = {CONSOLE: "console", ERROR: "javascript"}
444
445
def __init__(self, script: Any) -> None:
446
self._script = script
447
self._lock = threading.Lock()
448
self._subscription_id: str | None = None
449
self._categories: dict[int, str] = {}
450
451
def add_handler(self, callback: Callable, category: str, legacy: bool = False) -> int:
452
"""Register a handler and subscribe to ``log.entryAdded`` if needed.
453
454
Args:
455
callback: User callback invoked with the shaped payload.
456
category: ``console`` or ``error``.
457
legacy: When ``True`` the callback receives the generated
458
``ConsoleLogEntry`` / ``JavascriptLogEntry`` dataclasses;
459
otherwise it receives :class:`ConsoleMessage` /
460
:class:`ScriptError`.
461
"""
462
entry_type = self._CATEGORY_ENTRY_TYPES[category]
463
464
def _dispatch(params: Any) -> None:
465
if not isinstance(params, dict) or params.get("type") != entry_type:
466
return
467
if legacy:
468
payload = _legacy_log_entry(params)
469
elif category == self.CONSOLE:
470
payload = console_message_from_log_entry(params)
471
else:
472
payload = script_error_from_log_entry(params)
473
callback(payload)
474
475
conn = self._script._conn
476
with self._lock:
477
callback_id = conn.add_callback(_EventRef(self.EVENT), _dispatch)
478
if not self._categories:
479
try:
480
self._subscription_id = _subscribe_to_event(conn, self.EVENT)
481
except Exception:
482
conn.remove_callback(_EventRef(self.EVENT), callback_id)
483
raise
484
self._categories[callback_id] = category
485
return callback_id
486
487
def remove_handler(self, callback_id: int) -> None:
488
"""Remove a handler; drops the session subscription with the last one."""
489
conn = self._script._conn
490
conn.remove_callback(_EventRef(self.EVENT), callback_id)
491
with self._lock:
492
removed = self._categories.pop(callback_id, None)
493
if removed is not None and not self._categories:
494
_unsubscribe_from_event(conn, self.EVENT, self._subscription_id)
495
self._subscription_id = None
496
497
def clear_handlers(self, category: str) -> None:
498
"""Remove every handler registered under ``category``."""
499
with self._lock:
500
ids = [callback_id for callback_id, cat in self._categories.items() if cat == category]
501
for callback_id in ids:
502
self.remove_handler(callback_id)
503
504
505
class DomMutationRegistry:
506
"""Owns the DOM-observation preload script and channel subscription.
507
508
The first handler installs a preload script observing the requested
509
mutation types and subscribes to ``script.message``; later handlers that
510
request additional mutation types install one further observer covering
511
only the missing types, so no mutation is reported twice. Each handler
512
only receives the mutation types it asked for. When the last handler is
513
removed the subscription and every observer preload script are removed.
514
"""
515
516
EVENT = "script.message"
517
MUTATION_TYPES = ("attributes", "childList", "characterData")
518
DEFAULT_MUTATION_TYPES = ("attributes",)
519
520
def __init__(self, script: Any) -> None:
521
self._script = script
522
self._lock = threading.Lock()
523
self._channel: str | None = None
524
self._subscription_id: str | None = None
525
self._handlers: dict[int, frozenset[str]] = {}
526
self._preload_script_ids: list[str] = []
527
self._active_types: set[str] = set()
528
529
def _normalize_types(self, mutation_types: str | Iterable[str] | None) -> frozenset[str]:
530
if mutation_types is None:
531
return frozenset(self.DEFAULT_MUTATION_TYPES)
532
if isinstance(mutation_types, str):
533
mutation_types = (mutation_types,)
534
types = frozenset(mutation_types)
535
unknown = types - set(self.MUTATION_TYPES)
536
if unknown:
537
raise ValueError(
538
f"Unsupported DOM mutation type(s) {sorted(unknown)}; expected a subset of {self.MUTATION_TYPES}"
539
)
540
if not types:
541
raise ValueError("mutation_types must name at least one mutation type")
542
return types
543
544
def _channel_argument(self) -> dict:
545
if self._channel is None:
546
# Stable, namespaced channel to avoid collisions with user scripts.
547
self._channel = f"selenium.domMutation.{uuid.uuid4().hex}"
548
return {"type": "channel", "value": {"channel": self._channel}}
549
550
def _listener_declaration(self, types: set[str]) -> str:
551
# script.addPreloadScript arguments may only be channels, so the
552
# observation options are inlined into the function declaration.
553
options = json.dumps({name: True for name in sorted(types)})
554
return "function(channel) { return (" + DOM_MUTATION_LISTENER_JS + ")(channel, " + options + "); }"
555
556
def _observe_types(self, channel_arg: dict, types: set[str]) -> None:
557
declaration = self._listener_declaration(types)
558
preload_script_id = self._script._add_preload_script(declaration, arguments=[channel_arg])
559
self._preload_script_ids.append(preload_script_id)
560
# Preload scripts only fire on future document creations, so also
561
# invoke the observer immediately on the current page.
562
driver = getattr(self._script, "_driver", None)
563
if driver is not None:
564
context = None
565
try:
566
context = driver.current_window_handle
567
except Exception:
568
pass
569
if context is not None:
570
self._script.call_function(
571
function_declaration=declaration,
572
target={"context": context},
573
await_promise=False,
574
arguments=[channel_arg],
575
)
576
577
def add_handler(self, callback: Callable, mutation_types: str | Iterable[str] | None = None) -> int:
578
"""Register a mutation handler for the given mutation types."""
579
types = self._normalize_types(mutation_types)
580
581
def _dispatch(message: Any) -> None:
582
if not isinstance(message, dict) or message.get("channel") != self._channel:
583
return
584
data = message.get("data")
585
value = data.get("value") if isinstance(data, dict) else None
586
if value is None:
587
return
588
try:
589
payload = json.loads(value)
590
except (ValueError, TypeError):
591
return
592
if not isinstance(payload, dict):
593
return
594
mutation = dom_mutation_from_payload(payload)
595
if mutation.type == "attributes" and not mutation.element_id and mutation.element_id != "0":
596
return
597
if mutation.type in types:
598
callback(mutation)
599
600
conn = self._script._conn
601
with self._lock:
602
channel_arg = self._channel_argument()
603
missing = set(types) - self._active_types
604
if missing:
605
self._observe_types(channel_arg, missing)
606
self._active_types |= missing
607
if not self._handlers:
608
self._subscription_id = _subscribe_to_event(conn, self.EVENT)
609
# Register the callback AFTER setup to avoid leaking it if setup fails.
610
callback_id = conn.add_callback(_EventRef(self.EVENT), _dispatch)
611
self._handlers[callback_id] = types
612
return callback_id
613
614
def remove_handler(self, callback_id: int) -> None:
615
"""Remove a handler; tears down observers with the last one."""
616
conn = self._script._conn
617
conn.remove_callback(_EventRef(self.EVENT), callback_id)
618
with self._lock:
619
removed = self._handlers.pop(callback_id, None)
620
if removed is not None and not self._handlers:
621
self._teardown(conn)
622
623
def clear_handlers(self) -> None:
624
"""Remove every DOM mutation handler."""
625
with self._lock:
626
ids = list(self._handlers)
627
for callback_id in ids:
628
self.remove_handler(callback_id)
629
630
def _teardown(self, conn: Any) -> None:
631
try:
632
_unsubscribe_from_event(conn, self.EVENT, self._subscription_id)
633
finally:
634
self._subscription_id = None
635
preload_script_ids, self._preload_script_ids = self._preload_script_ids, []
636
self._active_types = set()
637
for preload_script_id in preload_script_ids:
638
try:
639
self._script._remove_preload_script(preload_script_id)
640
except Exception:
641
logger.warning("Failed to remove DOM mutation preload script %s", preload_script_id)
642
643