Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/private/cdp.py
10192 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
19
import contextvars
20
import importlib
21
import itertools
22
import json
23
import logging
24
import os
25
import pathlib
26
from collections import defaultdict
27
from collections.abc import AsyncGenerator, AsyncIterator, Generator
28
from contextlib import asynccontextmanager, contextmanager
29
from dataclasses import dataclass
30
from typing import Any, TypeVar
31
32
import trio
33
from trio_websocket import ConnectionClosed as WsConnectionClosed
34
from trio_websocket import connect_websocket_url
35
36
# Module-level logger. It is registered under the fixed name "trio_cdp"
# rather than under this module's ``__name__``.
logger = logging.getLogger("trio_cdp")
37
# Generic type variable used in the annotations of ``CdpBase.execute()`` and
# ``CdpBase.wait_for()``.
T = TypeVar("T")
38
# Default upper bound for a single WebSocket message, in bytes (2**24 = 16 MiB).
# ``_resolve_max_message_size()`` falls back to it when neither an explicit
# size nor the ``SE_CDP_MAX_WS_MESSAGE_SIZE`` environment variable is given.
MAX_WS_MESSAGE_SIZE = 2**24
39
40
41
def _resolve_max_message_size(explicit=None):
42
"""Return the WebSocket max message size to use.
43
44
Priority: explicit argument > ``SE_CDP_MAX_WS_MESSAGE_SIZE`` env var > ``MAX_WS_MESSAGE_SIZE``.
45
"""
46
if explicit is not None:
47
return explicit
48
return int(os.environ.get("SE_CDP_MAX_WS_MESSAGE_SIZE", MAX_WS_MESSAGE_SIZE))
49
50
51
# The versioned devtools module loaded by ``import_devtools()``; ``None`` until
# that function has been called successfully.
devtools = None
52
# The version most recently passed to ``import_devtools()``. It records the
# requested value even when a different (fallback) module ends up being loaded.
version = None
53
54
55
def import_devtools(ver):
    """Load the devtools module for ``ver`` and cache it in the module globals.

    The requested version is recorded in the global ``version`` and the
    loaded module in the global ``devtools``. If no module exists for
    ``ver``, the sibling ``devtools`` directory is scanned for ``v<digits>``
    sub-directories and the numerically highest one is loaded instead.

    Args:
        ver: Version suffix appended to
            ``selenium.webdriver.common.devtools.v``.

    Returns:
        The imported devtools module.

    Raises:
        ModuleNotFoundError: if the requested module is missing and no
            ``v<digits>`` directory is available to fall back to.
    """
    global devtools, version
    version = ver
    base = "selenium.webdriver.common.devtools.v"
    try:
        devtools = importlib.import_module(f"{base}{ver}")
    except ModuleNotFoundError:
        # Most likely CDP has been updated but the matching selenium python
        # release is not out yet, so use the newest version that is bundled.
        devtools_dir = pathlib.Path(__file__).parents[1] / "devtools"
        numeric_versions = []
        for entry in devtools_dir.iterdir():
            if not entry.is_dir():
                continue
            name = entry.name
            if name.startswith("v") and name[1:].isdigit():
                numeric_versions.append(name[1:])
        if not numeric_versions:
            raise
        newest = max(numeric_versions, key=int)
        logging.getLogger(__name__).debug("Falling back to loading `devtools`: v%s", newest)
        devtools = importlib.import_module(f"{base}{newest}")
    return devtools
78
79
80
# Holds the current connection. It is set by ``connection_context()``, read by
# ``get_connection_context()`` and replaced wholesale by
# ``set_global_connection()``.
_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
81
# Holds the current session. It is set by ``session_context()``, read by
# ``get_session_context()`` and replaced wholesale by ``set_global_session()``.
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")
82
83
84
def get_connection_context(fn_name):
    """Return the connection that is current for the calling context.

    Args:
        fn_name: Name of the calling function; only used to build the
            error message.

    Returns:
        The value currently held by the connection context variable.

    Raises:
        RuntimeError: if no connection has been installed.
    """
    try:
        current = _connection_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a connection context.")
    return current
94
95
96
def get_session_context(fn_name):
    """Return the session that is current for the calling context.

    Args:
        fn_name: Name of the calling function; only used to build the
            error message.

    Returns:
        The value currently held by the session context variable.

    Raises:
        RuntimeError: if no session has been installed.
    """
    try:
        current = _session_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a session context.")
    return current
106
107
108
@contextmanager
def connection_context(connection):
    """Context manager that installs ``connection`` as the connection context for the current Trio task.

    The previous value is restored when the block exits, whether it exits
    normally or through an exception.

    Args:
        connection: The connection to make current for the duration of the
            ``with`` block.
    """
    # Bind the variable locally. ``set_global_connection()`` rebinds the
    # module-level name to a brand-new ``ContextVar``, and a token may only be
    # reset on the variable that created it (``ValueError`` otherwise).
    context_var = _connection_context
    token = context_var.set(connection)
    try:
        yield
    finally:
        context_var.reset(token)
116
117
118
@contextmanager
def session_context(session):
    """Context manager that installs ``session`` as the session context for the current Trio task.

    The previous value is restored when the block exits, whether it exits
    normally or through an exception.

    Args:
        session: The session to make current for the duration of the
            ``with`` block.
    """
    # Bind the variable locally. ``set_global_session()`` rebinds the
    # module-level name to a brand-new ``ContextVar``, and a token may only be
    # reset on the variable that created it (``ValueError`` otherwise).
    context_var = _session_context
    token = context_var.set(session)
    try:
        yield
    finally:
        context_var.reset(token)
126
127
128
def set_global_connection(connection):
    """Make ``connection`` the default connection for every task.

    This replaces the module-level context variable with a new one whose
    default value is ``connection``, so lookups that have no explicitly
    installed connection resolve to it.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.

    Args:
        connection: The connection to use as the default.
    """
    global _connection_context
    replacement = contextvars.ContextVar("_connection_context", default=connection)
    _connection_context = replacement
136
137
138
def set_global_session(session):
    """Make ``session`` the default session for every task.

    This replaces the module-level context variable with a new one whose
    default value is ``session``, so lookups that have no explicitly
    installed session resolve to it.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.

    Args:
        session: The session to use as the default.
    """
    global _session_context
    replacement = contextvars.ContextVar("_session_context", default=session)
    _session_context = replacement
146
147
148
class BrowserError(Exception):
    """This exception is raised when the browser's response to a command indicates that an error occurred.

    The ``code``, ``message`` and ``detail`` attributes are copied from the
    ``"code"``, ``"message"`` and ``"data"`` entries of the error object.
    """

    def __init__(self, obj):
        """Build the exception from an error object.

        Args:
            obj: Mapping describing the error. The ``"code"``,
                ``"message"`` and ``"data"`` keys are read; a missing key
                yields ``None`` for the corresponding attribute.
        """
        # Hand the payload to ``Exception`` so that ``args`` is populated.
        # Without this ``repr()`` shows nothing useful and copying or
        # pickling the exception fails, because it is rebuilt from ``args``.
        super().__init__(obj)
        self.code = obj.get("code")
        self.message = obj.get("message")
        self.detail = obj.get("data")

    def __str__(self):
        """Return a one-line summary containing code, message and detail."""
        return f"BrowserError<code={self.code} message={self.message}> {self.detail}"
158
159
160
class CdpConnectionClosed(WsConnectionClosed):
    """Signals that a public method was used on a CDP connection that is closed."""

    def __init__(self, reason):
        """Remember why the connection was closed.

        Args:
            reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return the class name followed by the close reason in angle brackets."""
        return f"{type(self).__name__}<{self.reason}>"
174
175
176
class InternalError(Exception):
    """Signals faulty logic inside TrioCDP itself or in its integration with PyCDP.

    It is never raised in response to anything the browser sends.
    """
180
181
182
@dataclass
class CmEventProxy:
    """Placeholder handed out by :meth:`CdpBase.wait_for`.

    ``value`` starts out as ``None``; once the ``wait_for`` context manager
    has finished, it holds the event that was received.
    """

    # The received event, or ``None`` while still waiting.
    value: Any = None
191
192
193
class CdpBase:
    """State and behaviour shared by the root CDP connection and its sessions.

    It sends commands over the WebSocket, matches responses to the task
    that issued the command, and fans incoming events out to listeners.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: WebSocket connection used to send commands.
            session_id: ID added to outgoing requests as ``sessionId``;
                falsy for the root connection.
            target_id: ID of the target this object is attached to.
        """
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # Event class -> set of memory send channels subscribed to it.
        self.channels = defaultdict(set)
        # Source of unique command IDs.
        self.id_iter = itertools.count()
        # cmd_id -> (command generator, trio.Event) for unanswered commands.
        self.inflight_cmd = {}
        # cmd_id -> parsed result or exception, handed over to ``execute()``.
        self.inflight_result = {}

    async def execute(self, cmd: Generator[dict, T, Any]) -> T:
        """Execute a command on the server and wait for the result.

        Args:
            cmd: any CDP command

        Returns:
            a CDP result

        Raises:
            CdpConnectionClosed: if the WebSocket is closed while sending.
            Exception: whatever exception was stored as the command's result
                (for example ``BrowserError`` when the browser reports an
                error).
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        # Register before sending so the response can always be matched.
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        try:
            request = next(cmd)
            request["id"] = cmd_id
            if self.session_id:
                request["sessionId"] = self.session_id
            request_str = json.dumps(request)
            logger.debug("Sending CDP message: %s %s: %s", cmd_id, cmd_event, request_str)
            try:
                await self.ws.send_message(request_str)
            except WsConnectionClosed as wcc:
                raise CdpConnectionClosed(wcc.reason) from None
            await cmd_event.wait()
        except BaseException:
            # Sending failed or the wait was cancelled: nobody will collect
            # this command's result any more, so drop its bookkeeping instead
            # of leaking it for the lifetime of the connection.
            self.inflight_cmd.pop(cmd_id, None)
            self.inflight_result.pop(cmd_id, None)
            raise
        response = self.inflight_result.pop(cmd_id)
        logger.debug("Received CDP message: %s", response)
        if isinstance(response, Exception):
            logger.debug("Exception raised by %s message: %s", cmd_event, type(response).__name__)
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Listen for events.

        Args:
            *event_types: Event classes to subscribe to.
            buffer_size: Capacity of the memory channel. An event that
                arrives while the channel is full is not delivered to this
                listener; an error is logged instead.

        Returns:
            An async iterator that iterates over events matching the indicated types.
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received. The event is stored in the ``value`` attribute
        of the yielded proxy.

        Args:
            event_type: Event class to wait for.
            buffer_size: Capacity of the temporary memory channel.
        """
        sender: trio.MemorySendChannel
        receiver: trio.MemoryReceiveChannel
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        try:
            yield proxy
            async with receiver:
                event = await receiver.receive()
            proxy.value = event
        finally:
            # Always unsubscribe, including when the body of the ``async
            # with`` block raised. Otherwise the sender would stay registered
            # with an open receiver that nobody reads, fill up, and log an
            # error for every further event of this type.
            self.channels[event_type].discard(sender)
            sender.close()
            receiver.close()

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        Messages that carry an ``"id"`` are command responses; everything
        else is treated as an event.

        Args:
            data: a JSON dictionary
        """
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data: dict):
        """Handle a response to a command.

        This will set an event flag that will return control to the
        task that called the command.

        Args:
            data: response as a JSON dictionary

        Raises:
            InternalError: if the command generator yields again instead of
                finishing after it has been sent the result.
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object.
            try:
                cmd.send(data["result"])
                raise InternalError("The command's generator function did not exit when expected!")
            except StopIteration as stop:
                # The generator's return value is the parsed CDP result.
                self.inflight_result[cmd_id] = stop.value
        event.set()

    def _handle_event(self, data: dict):
        """Handle an event.

        The event is parsed with the loaded devtools module and sent to
        every channel subscribed to its class.

        Args:
            data: event as a JSON dictionary

        Raises:
            RuntimeError: if no devtools module has been loaded.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        subscribers = self.channels[type(event)]
        to_remove = set()
        for sender in subscribers:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                # The receiving side is gone; forget this subscriber.
                to_remove.add(sender)
        if to_remove:
            subscribers.difference_update(to_remove)
327
328
329
class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
            session_id: devtools.target.SessionID
            target_id: devtools.target.TargetID
        """
        super().__init__(ws, session_id, target_id)

        # Number of active ``dom_enable()`` / ``page_enable()`` users, each
        # guarded by its own lock.
        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """Context manager that executes ``dom.enable()`` when it enters and then calls ``dom.disable()``.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited.
        """
        async with self._dom_enable_lock:
            if self._dom_enable_count == 0:
                await self.execute(devtools.dom.enable())
            # Count the caller only once enabling has succeeded, so that a
            # failed ``enable`` does not leave a phantom user behind.
            self._dom_enable_count += 1

        try:
            yield
        finally:
            # Release the caller even if the body raised. Otherwise the
            # counter never returns to zero and DOM events stay enabled.
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """Context manager executes ``page.enable()`` when it enters and then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited.
        """
        async with self._page_enable_lock:
            if self._page_enable_count == 0:
                await self.execute(devtools.page.enable())
            # Count the caller only once enabling has succeeded, so that a
            # failed ``enable`` does not leave a phantom user behind.
            self._page_enable_count += 1

        try:
            yield
        finally:
            # Release the caller even if the body raised. Otherwise the
            # counter never returns to zero and page events stay enabled.
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())
390
391
392
class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call the
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
        """
        super().__init__(ws, session_id=None, target_id=None)
        # session ID -> CdpSession, used to route incoming messages.
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        This will cause the reader task to gracefully exit when it tries
        to read the next message from the WebSocket. All of the public
        APIs (``execute()``, ``listen()``, etc.) will raise
        ``CdpConnectionClosed`` after the CDP connection is closed. It
        is safe to call this multiple times.
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> AsyncIterator[CdpSession]:
        """Context manager opens a session and enables the "simple" style of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified target.

        Raises:
            RuntimeError: if no devtools module has been loaded.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    @staticmethod
    def _release_waiters(endpoint, reason):
        """Wake every task blocked on ``endpoint`` once the WebSocket has closed.

        Args:
            endpoint: the connection itself or one of its sessions.
            reason: close reason reported by the WebSocket.
        """
        # No response can arrive any more, so fail the pending commands
        # rather than leaving their callers blocked forever.
        for cmd_id, (_cmd, event) in list(endpoint.inflight_cmd.items()):
            endpoint.inflight_result[cmd_id] = CdpConnectionClosed(reason)
            event.set()
        endpoint.inflight_cmd.clear()
        # Closing the send side lets listeners see the end of the stream.
        # A sender may be registered for several event types; closing it
        # more than once is harmless.
        for senders in endpoint.channels.values():
            for sender in senders:
                sender.close()

    async def _reader_task(self):
        """Runs in the background and handles incoming messages.

        Dispatches responses to commands and events to listeners. When the
        WebSocket closes, the task exits quietly after releasing everything
        that is still waiting on the root connection or on any session.

        Raises:
            RuntimeError: if no devtools module has been loaded.
            BrowserError: if a message is not valid JSON or refers to an
                unknown session.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        while True:
            try:
                message = await self.ws.get_message()
            except WsConnectionClosed as wcc:
                # If the WebSocket is closed, we don't want to throw an
                # exception from the reader task. Instead we will throw
                # exceptions from the public API methods, and we can quietly
                # exit the reader task here.
                close_reason = wcc.reason
                break
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                raise BrowserError(
                    {
                        "code": -32700,
                        "message": "Client received invalid JSON",
                        "data": message,
                    }
                )
            logger.debug("Received message %r", data)
            if "sessionId" in data:
                session_id = devtools.target.SessionID(data["sessionId"])
                try:
                    session = self.sessions[session_id]
                except KeyError:
                    raise BrowserError(
                        {
                            "code": -32700,
                            "message": "Browser sent a message for an invalid session",
                            "data": f"{session_id!r}",
                        }
                    )
                session._handle_data(data)
            else:
                self._handle_data(data)

        # Release waiters on the root connection as well as on every session.
        for endpoint in (self, *self.sessions.values()):
            self._release_waiters(endpoint, close_reason)
494
495
496
@asynccontextmanager
async def open_cdp(url, max_message_size=None) -> AsyncIterator[CdpConnection]:
    """Async context manager that connects to the browser and closes the connection when the block exits.

    While the block is active, the connection is also installed as the
    default connection for the current task, so that commands like ``await
    target.get_targets()`` run on it automatically. If you want to use
    multiple connections concurrently, it is recommended to open each one
    in a separate task.

    Args:
        url: WebSocket URL of the browser's CDP endpoint.
        max_message_size: Maximum WebSocket message size in bytes. Defaults to the
            ``SE_CDP_MAX_WS_MESSAGE_SIZE`` environment variable, or 16 MiB if unset.
    """
    async with trio.open_nursery() as nursery:
        conn = await connect_cdp(nursery, url, max_message_size=max_message_size)
        # Leaving the ``async with conn`` block awaits ``conn.aclose()``,
        # whether the body finished normally or raised.
        async with conn:
            with connection_context(conn):
                yield conn
518
519
520
async def connect_cdp(nursery, url, max_message_size=None) -> CdpConnection:
    """Connect to the browser at ``url`` and start the background reader task in ``nursery``.

    The ``open_cdp()`` context manager is preferred in most situations.
    You should only use this function if you need to specify a custom
    nursery. This connection is not automatically closed! You can either
    use the connection object as a context manager (``async with
    conn:``) or else call ``await conn.aclose()`` on it when you are
    done with it. The returned connection is not installed as the default
    connection for the current task.

    Args:
        nursery: Trio nursery to spawn the reader task in.
        url: WebSocket URL of the browser's CDP endpoint.
        max_message_size: Maximum WebSocket message size in bytes. Defaults to the
            ``SE_CDP_MAX_WS_MESSAGE_SIZE`` environment variable, or 16 MiB if unset.

    Returns:
        The new ``CdpConnection``.
    """
    size_limit = _resolve_max_message_size(max_message_size)
    websocket = await connect_websocket_url(nursery, url, max_message_size=size_limit)
    connection = CdpConnection(websocket)
    nursery.start_soon(connection._reader_task)
    return connection
543
544