Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/bidi/script.py
1864 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
import datetime
19
import math
20
from dataclasses import dataclass
21
from typing import Any, Optional
22
23
from selenium.common.exceptions import WebDriverException
24
from selenium.webdriver.common.bidi.common import command_builder
25
26
from .log import LogEntryAdded
27
from .session import Session
28
29
30
class ResultOwnership:
31
"""Represents the possible result ownership types."""
32
33
NONE = "none"
34
ROOT = "root"
35
36
37
class RealmType:
38
"""Represents the possible realm types."""
39
40
WINDOW = "window"
41
DEDICATED_WORKER = "dedicated-worker"
42
SHARED_WORKER = "shared-worker"
43
SERVICE_WORKER = "service-worker"
44
WORKER = "worker"
45
PAINT_WORKLET = "paint-worklet"
46
AUDIO_WORKLET = "audio-worklet"
47
WORKLET = "worklet"
48
49
50
@dataclass
51
class RealmInfo:
52
"""Represents information about a realm."""
53
54
realm: str
55
origin: str
56
type: str
57
context: Optional[str] = None
58
sandbox: Optional[str] = None
59
60
@classmethod
61
def from_json(cls, json: dict[str, Any]) -> "RealmInfo":
62
"""Creates a RealmInfo instance from a dictionary.
63
64
Parameters:
65
-----------
66
json: A dictionary containing the realm information.
67
68
Returns:
69
-------
70
RealmInfo: A new instance of RealmInfo.
71
"""
72
if "realm" not in json:
73
raise ValueError("Missing required field 'realm' in RealmInfo")
74
if "origin" not in json:
75
raise ValueError("Missing required field 'origin' in RealmInfo")
76
if "type" not in json:
77
raise ValueError("Missing required field 'type' in RealmInfo")
78
79
return cls(
80
realm=json["realm"],
81
origin=json["origin"],
82
type=json["type"],
83
context=json.get("context"),
84
sandbox=json.get("sandbox"),
85
)
86
87
88
@dataclass
89
class Source:
90
"""Represents the source of a script message."""
91
92
realm: str
93
context: Optional[str] = None
94
95
@classmethod
96
def from_json(cls, json: dict[str, Any]) -> "Source":
97
"""Creates a Source instance from a dictionary.
98
99
Parameters:
100
-----------
101
json: A dictionary containing the source information.
102
103
Returns:
104
-------
105
Source: A new instance of Source.
106
"""
107
if "realm" not in json:
108
raise ValueError("Missing required field 'realm' in Source")
109
110
return cls(
111
realm=json["realm"],
112
context=json.get("context"),
113
)
114
115
116
@dataclass
117
class EvaluateResult:
118
"""Represents the result of script evaluation."""
119
120
type: str
121
realm: str
122
result: Optional[dict] = None
123
exception_details: Optional[dict] = None
124
125
@classmethod
126
def from_json(cls, json: dict[str, Any]) -> "EvaluateResult":
127
"""Creates an EvaluateResult instance from a dictionary.
128
129
Parameters:
130
-----------
131
json: A dictionary containing the evaluation result.
132
133
Returns:
134
-------
135
EvaluateResult: A new instance of EvaluateResult.
136
"""
137
if "realm" not in json:
138
raise ValueError("Missing required field 'realm' in EvaluateResult")
139
if "type" not in json:
140
raise ValueError("Missing required field 'type' in EvaluateResult")
141
142
return cls(
143
type=json["type"],
144
realm=json["realm"],
145
result=json.get("result"),
146
exception_details=json.get("exceptionDetails"),
147
)
148
149
150
class ScriptMessage:
151
"""Represents a script message event."""
152
153
event_class = "script.message"
154
155
def __init__(self, channel: str, data: dict, source: Source):
156
self.channel = channel
157
self.data = data
158
self.source = source
159
160
@classmethod
161
def from_json(cls, json: dict[str, Any]) -> "ScriptMessage":
162
"""Creates a ScriptMessage instance from a dictionary.
163
164
Parameters:
165
-----------
166
json: A dictionary containing the script message.
167
168
Returns:
169
-------
170
ScriptMessage: A new instance of ScriptMessage.
171
"""
172
if "channel" not in json:
173
raise ValueError("Missing required field 'channel' in ScriptMessage")
174
if "data" not in json:
175
raise ValueError("Missing required field 'data' in ScriptMessage")
176
if "source" not in json:
177
raise ValueError("Missing required field 'source' in ScriptMessage")
178
179
return cls(
180
channel=json["channel"],
181
data=json["data"],
182
source=Source.from_json(json["source"]),
183
)
184
185
186
class RealmCreated:
187
"""Represents a realm created event."""
188
189
event_class = "script.realmCreated"
190
191
def __init__(self, realm_info: RealmInfo):
192
self.realm_info = realm_info
193
194
@classmethod
195
def from_json(cls, json: dict[str, Any]) -> "RealmCreated":
196
"""Creates a RealmCreated instance from a dictionary.
197
198
Parameters:
199
-----------
200
json: A dictionary containing the realm created event.
201
202
Returns:
203
-------
204
RealmCreated: A new instance of RealmCreated.
205
"""
206
return cls(realm_info=RealmInfo.from_json(json))
207
208
209
class RealmDestroyed:
210
"""Represents a realm destroyed event."""
211
212
event_class = "script.realmDestroyed"
213
214
def __init__(self, realm: str):
215
self.realm = realm
216
217
@classmethod
218
def from_json(cls, json: dict[str, Any]) -> "RealmDestroyed":
219
"""Creates a RealmDestroyed instance from a dictionary.
220
221
Parameters:
222
-----------
223
json: A dictionary containing the realm destroyed event.
224
225
Returns:
226
-------
227
RealmDestroyed: A new instance of RealmDestroyed.
228
"""
229
if "realm" not in json:
230
raise ValueError("Missing required field 'realm' in RealmDestroyed")
231
232
return cls(realm=json["realm"])
233
234
235
class Script:
236
"""BiDi implementation of the script module."""
237
238
EVENTS = {
239
"message": "script.message",
240
"realm_created": "script.realmCreated",
241
"realm_destroyed": "script.realmDestroyed",
242
}
243
244
def __init__(self, conn, driver=None):
245
self.conn = conn
246
self.driver = driver
247
self.log_entry_subscribed = False
248
self.subscriptions = {}
249
self.callbacks = {}
250
251
# High-level APIs for SCRIPT module
252
253
def add_console_message_handler(self, handler):
254
self._subscribe_to_log_entries()
255
return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("console", handler))
256
257
def add_javascript_error_handler(self, handler):
258
self._subscribe_to_log_entries()
259
return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("javascript", handler))
260
261
def remove_console_message_handler(self, id):
262
self.conn.remove_callback(LogEntryAdded, id)
263
self._unsubscribe_from_log_entries()
264
265
remove_javascript_error_handler = remove_console_message_handler
266
267
def pin(self, script: str) -> str:
268
"""Pins a script to the current browsing context.
269
270
Parameters:
271
-----------
272
script: The script to pin.
273
274
Returns:
275
-------
276
str: The ID of the pinned script.
277
"""
278
return self._add_preload_script(script)
279
280
def unpin(self, script_id: str) -> None:
281
"""Unpins a script from the current browsing context.
282
283
Parameters:
284
-----------
285
script_id: The ID of the pinned script to unpin.
286
"""
287
self._remove_preload_script(script_id)
288
289
def execute(self, script: str, *args) -> dict:
290
"""Executes a script in the current browsing context.
291
292
Parameters:
293
-----------
294
script: The script function to execute.
295
*args: Arguments to pass to the script function.
296
297
Returns:
298
-------
299
dict: The result value from the script execution.
300
301
Raises:
302
------
303
WebDriverException: If the script execution fails.
304
"""
305
306
if self.driver is None:
307
raise WebDriverException("Driver reference is required for script execution")
308
browsing_context_id = self.driver.current_window_handle
309
310
# Convert arguments to the format expected by BiDi call_function (LocalValue Type)
311
arguments = []
312
for arg in args:
313
arguments.append(self.__convert_to_local_value(arg))
314
315
target = {"context": browsing_context_id}
316
317
result = self._call_function(
318
function_declaration=script, await_promise=True, target=target, arguments=arguments if arguments else None
319
)
320
321
if result.type == "success":
322
return result.result if result.result is not None else {}
323
else:
324
error_message = "Error while executing script"
325
if result.exception_details:
326
if "text" in result.exception_details:
327
error_message += f": {result.exception_details['text']}"
328
elif "message" in result.exception_details:
329
error_message += f": {result.exception_details['message']}"
330
331
raise WebDriverException(error_message)
332
333
def __convert_to_local_value(self, value) -> dict:
334
"""
335
Converts a Python value to BiDi LocalValue format.
336
"""
337
if value is None:
338
return {"type": "null"}
339
elif isinstance(value, bool):
340
return {"type": "boolean", "value": value}
341
elif isinstance(value, (int, float)):
342
if isinstance(value, float):
343
if math.isnan(value):
344
return {"type": "number", "value": "NaN"}
345
elif math.isinf(value):
346
if value > 0:
347
return {"type": "number", "value": "Infinity"}
348
else:
349
return {"type": "number", "value": "-Infinity"}
350
elif value == 0.0 and math.copysign(1.0, value) < 0:
351
return {"type": "number", "value": "-0"}
352
353
JS_MAX_SAFE_INTEGER = 9007199254740991
354
if isinstance(value, int) and (value > JS_MAX_SAFE_INTEGER or value < -JS_MAX_SAFE_INTEGER):
355
return {"type": "bigint", "value": str(value)}
356
357
return {"type": "number", "value": value}
358
359
elif isinstance(value, str):
360
return {"type": "string", "value": value}
361
elif isinstance(value, datetime.datetime):
362
# Convert Python datetime to JavaScript Date (ISO 8601 format)
363
return {"type": "date", "value": value.isoformat() + "Z" if value.tzinfo is None else value.isoformat()}
364
elif isinstance(value, datetime.date):
365
# Convert Python date to JavaScript Date
366
dt = datetime.datetime.combine(value, datetime.time.min).replace(tzinfo=datetime.timezone.utc)
367
return {"type": "date", "value": dt.isoformat()}
368
elif isinstance(value, set):
369
return {"type": "set", "value": [self.__convert_to_local_value(item) for item in value]}
370
elif isinstance(value, (list, tuple)):
371
return {"type": "array", "value": [self.__convert_to_local_value(item) for item in value]}
372
elif isinstance(value, dict):
373
return {
374
"type": "object",
375
"value": [
376
[self.__convert_to_local_value(k), self.__convert_to_local_value(v)] for k, v in value.items()
377
],
378
}
379
else:
380
# For other types, convert to string
381
return {"type": "string", "value": str(value)}
382
383
# low-level APIs for script module
384
def _add_preload_script(
385
self,
386
function_declaration: str,
387
arguments: Optional[list[dict[str, Any]]] = None,
388
contexts: Optional[list[str]] = None,
389
user_contexts: Optional[list[str]] = None,
390
sandbox: Optional[str] = None,
391
) -> str:
392
"""Adds a preload script.
393
394
Parameters:
395
-----------
396
function_declaration: The function declaration to preload.
397
arguments: The arguments to pass to the function.
398
contexts: The browsing context IDs to apply the script to.
399
user_contexts: The user context IDs to apply the script to.
400
sandbox: The sandbox name to apply the script to.
401
402
Returns:
403
-------
404
str: The preload script ID.
405
406
Raises:
407
------
408
ValueError: If both contexts and user_contexts are provided.
409
"""
410
if contexts is not None and user_contexts is not None:
411
raise ValueError("Cannot specify both contexts and user_contexts")
412
413
params: dict[str, Any] = {"functionDeclaration": function_declaration}
414
415
if arguments is not None:
416
params["arguments"] = arguments
417
if contexts is not None:
418
params["contexts"] = contexts
419
if user_contexts is not None:
420
params["userContexts"] = user_contexts
421
if sandbox is not None:
422
params["sandbox"] = sandbox
423
424
result = self.conn.execute(command_builder("script.addPreloadScript", params))
425
return result["script"]
426
427
def _remove_preload_script(self, script_id: str) -> None:
428
"""Removes a preload script.
429
430
Parameters:
431
-----------
432
script_id: The preload script ID to remove.
433
"""
434
params = {"script": script_id}
435
self.conn.execute(command_builder("script.removePreloadScript", params))
436
437
def _disown(self, handles: list[str], target: dict) -> None:
438
"""Disowns the given handles.
439
440
Parameters:
441
-----------
442
handles: The handles to disown.
443
target: The target realm or context.
444
"""
445
params = {
446
"handles": handles,
447
"target": target,
448
}
449
self.conn.execute(command_builder("script.disown", params))
450
451
def _call_function(
452
self,
453
function_declaration: str,
454
await_promise: bool,
455
target: dict,
456
arguments: Optional[list[dict]] = None,
457
result_ownership: Optional[str] = None,
458
serialization_options: Optional[dict] = None,
459
this: Optional[dict] = None,
460
user_activation: bool = False,
461
) -> EvaluateResult:
462
"""Calls a provided function with given arguments in a given realm.
463
464
Parameters:
465
-----------
466
function_declaration: The function declaration to call.
467
await_promise: Whether to await promise resolution.
468
target: The target realm or context.
469
arguments: The arguments to pass to the function.
470
result_ownership: The result ownership type.
471
serialization_options: The serialization options.
472
this: The 'this' value for the function call.
473
user_activation: Whether to trigger user activation.
474
475
Returns:
476
-------
477
EvaluateResult: The result of the function call.
478
"""
479
params = {
480
"functionDeclaration": function_declaration,
481
"awaitPromise": await_promise,
482
"target": target,
483
"userActivation": user_activation,
484
}
485
486
if arguments is not None:
487
params["arguments"] = arguments
488
if result_ownership is not None:
489
params["resultOwnership"] = result_ownership
490
if serialization_options is not None:
491
params["serializationOptions"] = serialization_options
492
if this is not None:
493
params["this"] = this
494
495
result = self.conn.execute(command_builder("script.callFunction", params))
496
return EvaluateResult.from_json(result)
497
498
def _evaluate(
499
self,
500
expression: str,
501
target: dict,
502
await_promise: bool,
503
result_ownership: Optional[str] = None,
504
serialization_options: Optional[dict] = None,
505
user_activation: bool = False,
506
) -> EvaluateResult:
507
"""Evaluates a provided script in a given realm.
508
509
Parameters:
510
-----------
511
expression: The script expression to evaluate.
512
target: The target realm or context.
513
await_promise: Whether to await promise resolution.
514
result_ownership: The result ownership type.
515
serialization_options: The serialization options.
516
user_activation: Whether to trigger user activation.
517
518
Returns:
519
-------
520
EvaluateResult: The result of the script evaluation.
521
"""
522
params = {
523
"expression": expression,
524
"target": target,
525
"awaitPromise": await_promise,
526
"userActivation": user_activation,
527
}
528
529
if result_ownership is not None:
530
params["resultOwnership"] = result_ownership
531
if serialization_options is not None:
532
params["serializationOptions"] = serialization_options
533
534
result = self.conn.execute(command_builder("script.evaluate", params))
535
return EvaluateResult.from_json(result)
536
537
def _get_realms(
538
self,
539
context: Optional[str] = None,
540
type: Optional[str] = None,
541
) -> list[RealmInfo]:
542
"""Returns a list of all realms, optionally filtered.
543
544
Parameters:
545
-----------
546
context: The browsing context ID to filter by.
547
type: The realm type to filter by.
548
549
Returns:
550
-------
551
List[RealmInfo]: A list of realm information.
552
"""
553
params = {}
554
555
if context is not None:
556
params["context"] = context
557
if type is not None:
558
params["type"] = type
559
560
result = self.conn.execute(command_builder("script.getRealms", params))
561
return [RealmInfo.from_json(realm) for realm in result["realms"]]
562
563
def _subscribe_to_log_entries(self):
564
if not self.log_entry_subscribed:
565
session = Session(self.conn)
566
self.conn.execute(session.subscribe(LogEntryAdded.event_class))
567
self.log_entry_subscribed = True
568
569
def _unsubscribe_from_log_entries(self):
570
if self.log_entry_subscribed and LogEntryAdded.event_class not in self.conn.callbacks:
571
session = Session(self.conn)
572
self.conn.execute(session.unsubscribe(LogEntryAdded.event_class))
573
self.log_entry_subscribed = False
574
575
def _handle_log_entry(self, type, handler):
576
def _handle_log_entry(log_entry):
577
if log_entry.type_ == type:
578
handler(log_entry)
579
580
return _handle_log_entry
581
582