Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/bidi/script.py
3992 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
import datetime
19
import math
20
from dataclasses import dataclass
21
from typing import Any
22
23
from selenium.common.exceptions import WebDriverException
24
from selenium.webdriver.common.bidi.common import command_builder
25
from selenium.webdriver.common.bidi.log import LogEntryAdded
26
from selenium.webdriver.common.bidi.session import Session
27
28
29
class ResultOwnership:
30
"""Represents the possible result ownership types."""
31
32
NONE = "none"
33
ROOT = "root"
34
35
36
class RealmType:
37
"""Represents the possible realm types."""
38
39
WINDOW = "window"
40
DEDICATED_WORKER = "dedicated-worker"
41
SHARED_WORKER = "shared-worker"
42
SERVICE_WORKER = "service-worker"
43
WORKER = "worker"
44
PAINT_WORKLET = "paint-worklet"
45
AUDIO_WORKLET = "audio-worklet"
46
WORKLET = "worklet"
47
48
49
@dataclass
50
class RealmInfo:
51
"""Represents information about a realm."""
52
53
realm: str
54
origin: str
55
type: str
56
context: str | None = None
57
sandbox: str | None = None
58
59
@classmethod
60
def from_json(cls, json: dict[str, Any]) -> "RealmInfo":
61
"""Creates a RealmInfo instance from a dictionary.
62
63
Args:
64
json: A dictionary containing the realm information.
65
66
Returns:
67
RealmInfo: A new instance of RealmInfo.
68
"""
69
if "realm" not in json:
70
raise ValueError("Missing required field 'realm' in RealmInfo")
71
if "origin" not in json:
72
raise ValueError("Missing required field 'origin' in RealmInfo")
73
if "type" not in json:
74
raise ValueError("Missing required field 'type' in RealmInfo")
75
76
return cls(
77
realm=json["realm"],
78
origin=json["origin"],
79
type=json["type"],
80
context=json.get("context"),
81
sandbox=json.get("sandbox"),
82
)
83
84
85
@dataclass
86
class Source:
87
"""Represents the source of a script message."""
88
89
realm: str
90
context: str | None = None
91
92
@classmethod
93
def from_json(cls, json: dict[str, Any]) -> "Source":
94
"""Creates a Source instance from a dictionary.
95
96
Args:
97
json: A dictionary containing the source information.
98
99
Returns:
100
Source: A new instance of Source.
101
"""
102
if "realm" not in json:
103
raise ValueError("Missing required field 'realm' in Source")
104
105
return cls(
106
realm=json["realm"],
107
context=json.get("context"),
108
)
109
110
111
@dataclass
112
class EvaluateResult:
113
"""Represents the result of script evaluation."""
114
115
type: str
116
realm: str
117
result: dict | None = None
118
exception_details: dict | None = None
119
120
@classmethod
121
def from_json(cls, json: dict[str, Any]) -> "EvaluateResult":
122
"""Creates an EvaluateResult instance from a dictionary.
123
124
Args:
125
json: A dictionary containing the evaluation result.
126
127
Returns:
128
EvaluateResult: A new instance of EvaluateResult.
129
"""
130
if "realm" not in json:
131
raise ValueError("Missing required field 'realm' in EvaluateResult")
132
if "type" not in json:
133
raise ValueError("Missing required field 'type' in EvaluateResult")
134
135
return cls(
136
type=json["type"],
137
realm=json["realm"],
138
result=json.get("result"),
139
exception_details=json.get("exceptionDetails"),
140
)
141
142
143
class ScriptMessage:
144
"""Represents a script message event."""
145
146
event_class = "script.message"
147
148
def __init__(self, channel: str, data: dict, source: Source):
149
self.channel = channel
150
self.data = data
151
self.source = source
152
153
@classmethod
154
def from_json(cls, json: dict[str, Any]) -> "ScriptMessage":
155
"""Creates a ScriptMessage instance from a dictionary.
156
157
Args:
158
json: A dictionary containing the script message.
159
160
Returns:
161
ScriptMessage: A new instance of ScriptMessage.
162
"""
163
if "channel" not in json:
164
raise ValueError("Missing required field 'channel' in ScriptMessage")
165
if "data" not in json:
166
raise ValueError("Missing required field 'data' in ScriptMessage")
167
if "source" not in json:
168
raise ValueError("Missing required field 'source' in ScriptMessage")
169
170
return cls(
171
channel=json["channel"],
172
data=json["data"],
173
source=Source.from_json(json["source"]),
174
)
175
176
177
class RealmCreated:
178
"""Represents a realm created event."""
179
180
event_class = "script.realmCreated"
181
182
def __init__(self, realm_info: RealmInfo):
183
self.realm_info = realm_info
184
185
@classmethod
186
def from_json(cls, json: dict[str, Any]) -> "RealmCreated":
187
"""Creates a RealmCreated instance from a dictionary.
188
189
Args:
190
json: A dictionary containing the realm created event.
191
192
Returns:
193
RealmCreated: A new instance of RealmCreated.
194
"""
195
return cls(realm_info=RealmInfo.from_json(json))
196
197
198
class RealmDestroyed:
199
"""Represents a realm destroyed event."""
200
201
event_class = "script.realmDestroyed"
202
203
def __init__(self, realm: str):
204
self.realm = realm
205
206
@classmethod
207
def from_json(cls, json: dict[str, Any]) -> "RealmDestroyed":
208
"""Creates a RealmDestroyed instance from a dictionary.
209
210
Args:
211
json: A dictionary containing the realm destroyed event.
212
213
Returns:
214
RealmDestroyed: A new instance of RealmDestroyed.
215
"""
216
if "realm" not in json:
217
raise ValueError("Missing required field 'realm' in RealmDestroyed")
218
219
return cls(realm=json["realm"])
220
221
222
class Script:
223
"""BiDi implementation of the script module."""
224
225
EVENTS = {
226
"message": "script.message",
227
"realm_created": "script.realmCreated",
228
"realm_destroyed": "script.realmDestroyed",
229
}
230
231
def __init__(self, conn, driver=None):
232
self.conn = conn
233
self.driver = driver
234
self.log_entry_subscribed = False
235
self.subscriptions = {}
236
self.callbacks = {}
237
238
# High-level APIs for SCRIPT module
239
240
def add_console_message_handler(self, handler):
241
self._subscribe_to_log_entries()
242
return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("console", handler))
243
244
def add_javascript_error_handler(self, handler):
245
self._subscribe_to_log_entries()
246
return self.conn.add_callback(LogEntryAdded, self._handle_log_entry("javascript", handler))
247
248
def remove_console_message_handler(self, id):
249
self.conn.remove_callback(LogEntryAdded, id)
250
self._unsubscribe_from_log_entries()
251
252
remove_javascript_error_handler = remove_console_message_handler
253
254
def pin(self, script: str) -> str:
255
"""Pins a script to the current browsing context.
256
257
Args:
258
script: The script to pin.
259
260
Returns:
261
str: The ID of the pinned script.
262
"""
263
return self._add_preload_script(script)
264
265
def unpin(self, script_id: str) -> None:
266
"""Unpins a script from the current browsing context.
267
268
Args:
269
script_id: The ID of the pinned script to unpin.
270
"""
271
self._remove_preload_script(script_id)
272
273
def execute(self, script: str, *args) -> dict:
274
"""Executes a script in the current browsing context.
275
276
Args:
277
script: The script function to execute.
278
*args: Arguments to pass to the script function.
279
280
Returns:
281
dict: The result value from the script execution.
282
283
Raises:
284
WebDriverException: If the script execution fails.
285
"""
286
if self.driver is None:
287
raise WebDriverException("Driver reference is required for script execution")
288
browsing_context_id = self.driver.current_window_handle
289
290
# Convert arguments to the format expected by BiDi call_function (LocalValue Type)
291
arguments = []
292
for arg in args:
293
arguments.append(self.__convert_to_local_value(arg))
294
295
target = {"context": browsing_context_id}
296
297
result = self._call_function(
298
function_declaration=script, await_promise=True, target=target, arguments=arguments if arguments else None
299
)
300
301
if result.type == "success":
302
return result.result if result.result is not None else {}
303
else:
304
error_message = "Error while executing script"
305
if result.exception_details:
306
if "text" in result.exception_details:
307
error_message += f": {result.exception_details['text']}"
308
elif "message" in result.exception_details:
309
error_message += f": {result.exception_details['message']}"
310
311
raise WebDriverException(error_message)
312
313
def __convert_to_local_value(self, value) -> dict:
314
"""Converts a Python value to BiDi LocalValue format."""
315
if value is None:
316
return {"type": "null"}
317
elif isinstance(value, bool):
318
return {"type": "boolean", "value": value}
319
elif isinstance(value, (int, float)):
320
if isinstance(value, float):
321
if math.isnan(value):
322
return {"type": "number", "value": "NaN"}
323
elif math.isinf(value):
324
if value > 0:
325
return {"type": "number", "value": "Infinity"}
326
else:
327
return {"type": "number", "value": "-Infinity"}
328
elif value == 0.0 and math.copysign(1.0, value) < 0:
329
return {"type": "number", "value": "-0"}
330
331
JS_MAX_SAFE_INTEGER = 9007199254740991
332
if isinstance(value, int) and (value > JS_MAX_SAFE_INTEGER or value < -JS_MAX_SAFE_INTEGER):
333
return {"type": "bigint", "value": str(value)}
334
335
return {"type": "number", "value": value}
336
337
elif isinstance(value, str):
338
return {"type": "string", "value": value}
339
elif isinstance(value, datetime.datetime):
340
# Convert Python datetime to JavaScript Date (ISO 8601 format)
341
return {"type": "date", "value": value.isoformat() + "Z" if value.tzinfo is None else value.isoformat()}
342
elif isinstance(value, datetime.date):
343
# Convert Python date to JavaScript Date
344
dt = datetime.datetime.combine(value, datetime.time.min).replace(tzinfo=datetime.timezone.utc)
345
return {"type": "date", "value": dt.isoformat()}
346
elif isinstance(value, set):
347
return {"type": "set", "value": [self.__convert_to_local_value(item) for item in value]}
348
elif isinstance(value, (list, tuple)):
349
return {"type": "array", "value": [self.__convert_to_local_value(item) for item in value]}
350
elif isinstance(value, dict):
351
return {
352
"type": "object",
353
"value": [
354
[self.__convert_to_local_value(k), self.__convert_to_local_value(v)] for k, v in value.items()
355
],
356
}
357
else:
358
# For other types, convert to string
359
return {"type": "string", "value": str(value)}
360
361
# low-level APIs for script module
362
def _add_preload_script(
363
self,
364
function_declaration: str,
365
arguments: list[dict[str, Any]] | None = None,
366
contexts: list[str] | None = None,
367
user_contexts: list[str] | None = None,
368
sandbox: str | None = None,
369
) -> str:
370
"""Adds a preload script.
371
372
Args:
373
function_declaration: The function declaration to preload.
374
arguments: The arguments to pass to the function.
375
contexts: The browsing context IDs to apply the script to.
376
user_contexts: The user context IDs to apply the script to.
377
sandbox: The sandbox name to apply the script to.
378
379
Returns:
380
str: The preload script ID.
381
382
Raises:
383
ValueError: If both contexts and user_contexts are provided.
384
"""
385
if contexts is not None and user_contexts is not None:
386
raise ValueError("Cannot specify both contexts and user_contexts")
387
388
params: dict[str, Any] = {"functionDeclaration": function_declaration}
389
390
if arguments is not None:
391
params["arguments"] = arguments
392
if contexts is not None:
393
params["contexts"] = contexts
394
if user_contexts is not None:
395
params["userContexts"] = user_contexts
396
if sandbox is not None:
397
params["sandbox"] = sandbox
398
399
result = self.conn.execute(command_builder("script.addPreloadScript", params))
400
return result["script"]
401
402
def _remove_preload_script(self, script_id: str) -> None:
403
"""Removes a preload script.
404
405
Args:
406
script_id: The preload script ID to remove.
407
"""
408
params = {"script": script_id}
409
self.conn.execute(command_builder("script.removePreloadScript", params))
410
411
def _disown(self, handles: list[str], target: dict) -> None:
412
"""Disowns the given handles.
413
414
Args:
415
handles: The handles to disown.
416
target: The target realm or context.
417
"""
418
params = {
419
"handles": handles,
420
"target": target,
421
}
422
self.conn.execute(command_builder("script.disown", params))
423
424
def _call_function(
425
self,
426
function_declaration: str,
427
await_promise: bool,
428
target: dict,
429
arguments: list[dict] | None = None,
430
result_ownership: str | None = None,
431
serialization_options: dict | None = None,
432
this: dict | None = None,
433
user_activation: bool = False,
434
) -> EvaluateResult:
435
"""Calls a provided function with given arguments in a given realm.
436
437
Args:
438
function_declaration: The function declaration to call.
439
await_promise: Whether to await promise resolution.
440
target: The target realm or context.
441
arguments: The arguments to pass to the function.
442
result_ownership: The result ownership type.
443
serialization_options: The serialization options.
444
this: The 'this' value for the function call.
445
user_activation: Whether to trigger user activation.
446
447
Returns:
448
EvaluateResult: The result of the function call.
449
"""
450
params = {
451
"functionDeclaration": function_declaration,
452
"awaitPromise": await_promise,
453
"target": target,
454
"userActivation": user_activation,
455
}
456
457
if arguments is not None:
458
params["arguments"] = arguments
459
if result_ownership is not None:
460
params["resultOwnership"] = result_ownership
461
if serialization_options is not None:
462
params["serializationOptions"] = serialization_options
463
if this is not None:
464
params["this"] = this
465
466
result = self.conn.execute(command_builder("script.callFunction", params))
467
return EvaluateResult.from_json(result)
468
469
def _evaluate(
470
self,
471
expression: str,
472
target: dict,
473
await_promise: bool,
474
result_ownership: str | None = None,
475
serialization_options: dict | None = None,
476
user_activation: bool = False,
477
) -> EvaluateResult:
478
"""Evaluates a provided script in a given realm.
479
480
Args:
481
expression: The script expression to evaluate.
482
target: The target realm or context.
483
await_promise: Whether to await promise resolution.
484
result_ownership: The result ownership type.
485
serialization_options: The serialization options.
486
user_activation: Whether to trigger user activation.
487
488
Returns:
489
EvaluateResult: The result of the script evaluation.
490
"""
491
params = {
492
"expression": expression,
493
"target": target,
494
"awaitPromise": await_promise,
495
"userActivation": user_activation,
496
}
497
498
if result_ownership is not None:
499
params["resultOwnership"] = result_ownership
500
if serialization_options is not None:
501
params["serializationOptions"] = serialization_options
502
503
result = self.conn.execute(command_builder("script.evaluate", params))
504
return EvaluateResult.from_json(result)
505
506
def _get_realms(
507
self,
508
context: str | None = None,
509
type: str | None = None,
510
) -> list[RealmInfo]:
511
"""Returns a list of all realms, optionally filtered.
512
513
Args:
514
context: The browsing context ID to filter by.
515
type: The realm type to filter by.
516
517
Returns:
518
List[RealmInfo]: A list of realm information.
519
"""
520
params = {}
521
522
if context is not None:
523
params["context"] = context
524
if type is not None:
525
params["type"] = type
526
527
result = self.conn.execute(command_builder("script.getRealms", params))
528
return [RealmInfo.from_json(realm) for realm in result["realms"]]
529
530
def _subscribe_to_log_entries(self):
531
if not self.log_entry_subscribed:
532
session = Session(self.conn)
533
self.conn.execute(session.subscribe(LogEntryAdded.event_class))
534
self.log_entry_subscribed = True
535
536
def _unsubscribe_from_log_entries(self):
537
if self.log_entry_subscribed and LogEntryAdded.event_class not in self.conn.callbacks:
538
session = Session(self.conn)
539
self.conn.execute(session.unsubscribe(LogEntryAdded.event_class))
540
self.log_entry_subscribed = False
541
542
def _handle_log_entry(self, type, handler):
543
def _handle_log_entry(log_entry):
544
if log_entry.type_ == type:
545
handler(log_entry)
546
547
return _handle_log_entry
548
549