Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/bidi/cdp.py
1864 views
1
# The MIT License(MIT)
2
#
3
# Copyright(c) 2018 Hyperion Gray
4
#
5
# Permission is hereby granted, free of charge, to any person obtaining a copy
6
# of this software and associated documentation files(the "Software"), to deal
7
# in the Software without restriction, including without limitation the rights
8
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
9
# copies of the Software, and to permit persons to whom the Software is
10
# furnished to do so, subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be included in
13
# all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21
# THE SOFTWARE.
22
#
23
# This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp
24
25
# flake8: noqa
26
27
import contextvars
28
import importlib
29
import itertools
30
import json
31
import logging
32
import pathlib
33
from collections import defaultdict
34
from contextlib import asynccontextmanager
35
from contextlib import contextmanager
36
from dataclasses import dataclass
37
from typing import Any
38
from collections.abc import AsyncGenerator
39
from collections.abc import AsyncIterator
40
from collections.abc import Generator
41
from typing import Type
42
from typing import TypeVar
43
44
import trio
45
from trio_websocket import ConnectionClosed as WsConnectionClosed
46
from trio_websocket import connect_websocket_url
47
48
# Module logger; the name is kept from the upstream trio_cdp project.
logger = logging.getLogger("trio_cdp")

# Generic result type of a CDP command generator.
T = TypeVar("T")

# Passed as ``max_message_size`` when the WebSocket is opened (2**24).
MAX_WS_MESSAGE_SIZE = 1 << 24

# Populated by import_devtools(): the loaded devtools module and the
# version that was requested.
devtools = None
version = None
54
55
56
def import_devtools(ver):
    """Load the devtools bindings for CDP version ``ver`` into the module cache.

    ``selenium.webdriver.common.devtools.v<ver>`` is imported and stored in the
    module-level ``devtools`` global. The module-level ``version`` global is
    always set to the requested ``ver``, even when the fallback below is used.

    If the requested module does not exist, the highest-numbered ``v<N>``
    package found in the sibling ``devtools`` directory is loaded instead.
    Directories that are not named ``v<digits>`` (for example
    ``__pycache__``) are ignored.

    :param ver: requested CDP version number
    :returns: the imported devtools module
    :raises ModuleNotFoundError: if the requested version is missing and no
        ``v<N>`` fallback package exists
    """
    global devtools
    global version
    version = ver
    base = "selenium.webdriver.common.devtools.v"
    try:
        devtools = importlib.import_module(f"{base}{ver}")
        return devtools
    except ModuleNotFoundError:
        # Attempt to parse and load the 'most recent' devtools module. This is likely
        # because cdp has been updated but selenium python has not been released yet.
        devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
        available = [
            int(entry.name[1:])
            for entry in devtools_path.iterdir()
            if entry.is_dir() and entry.name.startswith("v") and entry.name[1:].isdecimal()
        ]
        if not available:
            # Nothing to fall back to: surface the original import failure
            # instead of an opaque "max() arg is an empty sequence".
            raise
        latest = max(available)
        selenium_logger = logging.getLogger(__name__)
        selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
        devtools = importlib.import_module(f"{base}{latest}")
        return devtools
76
77
78
# Ambient connection for the current context; read by get_connection_context()
# and set by connection_context().
_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
# Ambient session for the current context; read by get_session_context()
# and set by session_context().
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")
80
81
82
def get_connection_context(fn_name):
    """Return the connection installed for the current context.

    :param fn_name: name of the calling function, used in the error message
    :raises RuntimeError: if no connection has been installed
    """
    try:
        current = _connection_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a connection context.")
    else:
        return current
92
93
94
def get_session_context(fn_name):
    """Return the session installed for the current context.

    :param fn_name: name of the calling function, used in the error message
    :raises RuntimeError: if no session has been installed
    """
    try:
        current = _session_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a session context.")
    else:
        return current
104
105
106
@contextmanager
def connection_context(connection):
    """Install ``connection`` as the connection context for the duration of
    the ``with`` block.

    The previous value is restored when the block exits, whether normally or
    through an exception.
    """
    restore_token = _connection_context.set(connection)
    try:
        yield
    finally:
        _connection_context.reset(restore_token)
115
116
117
@contextmanager
def session_context(session):
    """Install ``session`` as the session context for the duration of the
    ``with`` block.

    The previous value is restored when the block exits, whether normally or
    through an exception.
    """
    restore_token = _session_context.set(session)
    try:
        yield
    finally:
        _session_context.reset(restore_token)
126
127
128
def set_global_connection(connection):
    """Make ``connection`` the default connection for all tasks.

    This rebinds the module-level context variable to a brand-new
    ``ContextVar`` whose default value is ``connection``. Values that were set
    on the previous variable are not visible through the new one.

    This is generally not recommended, except it may be necessary in certain
    use cases such as running inside Jupyter notebook.
    """
    global _connection_context
    replacement = contextvars.ContextVar("_connection_context", default=connection)
    _connection_context = replacement
137
138
139
def set_global_session(session):
    """Make ``session`` the default session for all tasks.

    This rebinds the module-level context variable to a brand-new
    ``ContextVar`` whose default value is ``session``. Values that were set on
    the previous variable are not visible through the new one.

    This is generally not recommended, except it may be necessary in certain
    use cases such as running inside Jupyter notebook.
    """
    global _session_context
    replacement = contextvars.ContextVar("_session_context", default=session)
    _session_context = replacement
148
149
150
class BrowserError(Exception):
    """Raised when the browser answers a command with an error object.

    The ``code``, ``message`` and ``data`` entries of the error object are
    exposed as ``code``, ``message`` and ``detail``; missing entries are
    ``None``.
    """

    def __init__(self, obj):
        lookup = obj.get
        self.code = lookup("code")
        self.message = lookup("message")
        self.detail = lookup("data")

    def __str__(self):
        code, message, detail = self.code, self.message, self.detail
        return f"BrowserError<code={code} message={message}> {detail}"
161
162
163
class CdpConnectionClosed(WsConnectionClosed):
    """Raised when a public method is called on a closed CDP connection."""

    def __init__(self, reason):
        """Store the close reason.

        :param reason: why the underlying WebSocket was closed
        :type reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return ``ClassName<reason>``."""
        cls_name = type(self).__name__
        return f"{cls_name}<{self.reason}>"
177
178
179
class InternalError(Exception):
    """Signals faulty logic inside TrioCDP itself or in its integration with
    PyCDP, as opposed to an error reported by the browser."""
182
183
184
@dataclass
class CmEventProxy:
    """A proxy object returned by :meth:`CdpBase.wait_for()`.

    ``value`` starts out as ``None``; once the ``wait_for`` context manager
    has finished, it holds the event that was received.
    """

    # The received event, or None while it is still pending.
    value: Any = None
193
194
195
class CdpBase:
    """Command and event plumbing shared by CDP connections and sessions.

    Outgoing commands are tracked in ``inflight_cmd`` until the reader hands
    their result over through ``inflight_result``. Incoming events are fanned
    out to the memory channels registered in ``channels``.
    """

    def __init__(self, ws, session_id, target_id):
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # event class -> set of memory-channel senders subscribed to it
        self.channels = defaultdict(set)
        # source of ids for outgoing commands
        self.id_iter = itertools.count()
        # cmd_id -> (command generator, trio.Event) while awaiting a response
        self.inflight_cmd = {}
        # cmd_id -> parsed result or exception, consumed by execute()
        self.inflight_result = {}

    async def execute(self, cmd: Generator[dict, T, Any]) -> T:
        """Execute a command on the server and wait for the result.

        :param cmd: any CDP command
        :returns: a CDP result
        :raises CdpConnectionClosed: if the WebSocket is closed while sending
        :raises Exception: whatever exception was stored as the command's
            result (e.g. ``BrowserError`` when the browser reports an error)
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        request = next(cmd)
        request["id"] = cmd_id
        if self.session_id:
            request["sessionId"] = self.session_id
        request_str = json.dumps(request)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Sending CDP message: {cmd_id} {cmd_event}: {request_str}")
        # Register only once the request is fully built, so a failure while
        # preparing it cannot leave a stale entry behind.
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        try:
            try:
                await self.ws.send_message(request_str)
            except WsConnectionClosed as wcc:
                raise CdpConnectionClosed(wcc.reason) from None
            await cmd_event.wait()
        except BaseException:
            # Send failed or the wait was interrupted (including cancellation):
            # forget the command so its bookkeeping does not leak.
            self.inflight_cmd.pop(cmd_id, None)
            self.inflight_result.pop(cmd_id, None)
            raise
        response = self.inflight_result.pop(cmd_id)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Received CDP message: {response}")
        if isinstance(response, Exception):
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"Exception raised by {cmd_event} message: {type(response).__name__}")
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Return an async iterator that iterates over events matching the
        indicated types.

        One memory channel of capacity ``buffer_size`` is created and its
        sender is subscribed to every type in ``event_types``.
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received. The yielded proxy's ``value`` is set to the event
        once it arrives.
        """
        sender: trio.MemorySendChannel
        receiver: trio.MemoryReceiveChannel
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        try:
            yield proxy
            async with receiver:
                event = await receiver.receive()
            proxy.value = event
        finally:
            # Always unsubscribe, even when the body raised, so the dispatcher
            # does not keep feeding a channel nobody will ever read.
            self.channels[event_type].discard(sender)
            sender.close()
            receiver.close()

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        Messages carrying an ``id`` are command responses; everything else is
        treated as an event.

        :param dict data: a JSON dictionary
        """
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data):
        """Handle a response to a command. This will set an event flag that
        will return control to the task that called the command.

        :param dict data: response as a JSON dictionary
        :raises InternalError: if the command generator yields again instead
            of finishing after being sent the result
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object.
            try:
                _ = cmd.send(data["result"])
                raise InternalError("The command's generator function did not exit when expected!")
            except StopIteration as stop:
                return_ = stop.value
            self.inflight_result[cmd_id] = return_
        event.set()

    def _handle_event(self, data):
        """Handle an event.

        The event is parsed and offered to every sender subscribed to its
        type. An event is dropped (and logged) for a subscriber whose channel
        is full; subscribers whose channel is broken are unsubscribed.

        :param dict data: event as a JSON dictionary
        """
        global devtools
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        event_type = type(event)
        # .get() avoids growing the defaultdict with an empty set for every
        # event type that has no subscriber.
        senders = self.channels.get(event_type)
        if not senders:
            return
        to_remove = set()
        for sender in senders:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                to_remove.add(sender)
        if to_remove:
            self.channels[event_type] -= to_remove
316
317
318
class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        :param devtools.target.SessionID session_id:
        :param devtools.target.TargetID target_id:
        """
        super().__init__(ws, session_id, target_id)

        # Number of active dom_enable()/page_enable() blocks; each lock
        # serialises the count update with the enable/disable command.
        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """A context manager that executes ``dom.enable()`` when it enters and
        then calls ``dom.disable()`` when it exits.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited. The exit bookkeeping also runs
        when the body raises.
        """
        global devtools
        async with self._dom_enable_lock:
            if self._dom_enable_count == 0:
                await self.execute(devtools.dom.enable())
            # Count the caller only after enable() succeeded, so a failed
            # enable does not leave the counter stuck above zero.
            self._dom_enable_count += 1

        try:
            yield
        finally:
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """A context manager that executes ``page.enable()`` when it enters and
        then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited. The exit bookkeeping also runs
        when the body raises.
        """
        global devtools
        async with self._page_enable_lock:
            if self._page_enable_count == 0:
                await self.execute(devtools.page.enable())
            # Count the caller only after enable() succeeded, so a failed
            # enable does not leave the counter stuck above zero.
            self._page_enable_count += 1

        try:
            yield
        finally:
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())
380
381
382
class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call the
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        """
        super().__init__(ws, session_id=None, target_id=None)
        # session id -> CdpSession, filled in by connect_session()
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        This will cause the reader task to gracefully exit when it tries
        to read the next message from the WebSocket. All of the public
        APIs (``execute()``, ``listen()``, etc.) will raise
        ``CdpConnectionClosed`` after the CDP connection is closed. It
        is safe to call this multiple times.
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> AsyncIterator[CdpSession]:
        """This context manager opens a session and enables the "simple" style
        of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified
        target.

        The session is also recorded in ``self.sessions`` so the reader task
        can route messages to it.
        """
        global devtools
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    def _endpoints(self):
        """Return the root connection followed by every known session."""
        return (self, *self.sessions.values())

    def _fail_pending_commands(self, reason):
        """Wake every task still waiting in ``execute()`` with a
        ``CdpConnectionClosed`` built from ``reason``."""
        for endpoint in self._endpoints():
            while endpoint.inflight_cmd:
                cmd_id, (_, cmd_event) = endpoint.inflight_cmd.popitem()
                # One exception instance per waiter; each is raised separately.
                endpoint.inflight_result[cmd_id] = CdpConnectionClosed(reason)
                cmd_event.set()

    def _close_event_channels(self):
        """Close every subscribed sender, on the root connection and on all
        sessions."""
        for endpoint in self._endpoints():
            for senders in endpoint.channels.values():
                for sender in senders:
                    sender.close()

    async def _reader_task(self):
        """Runs in the background and handles incoming messages: dispatching
        responses to commands and events to listeners.

        The task ends quietly when the WebSocket is closed. Commands still
        awaiting a response are then failed with ``CdpConnectionClosed``, and
        all event channels are closed, however the task ends.

        :raises BrowserError: if a message is not valid JSON or names an
            unknown session
        """
        global devtools
        try:
            while True:
                try:
                    message = await self.ws.get_message()
                except WsConnectionClosed as wcc:
                    # If the WebSocket is closed, we don't want to throw an
                    # exception from the reader task. Instead we throw
                    # exceptions from the public API methods: no response can
                    # arrive any more, so release the waiting callers now.
                    self._fail_pending_commands(wcc.reason)
                    break
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
                logger.debug("Received message %r", data)
                if "sessionId" in data:
                    session_id = devtools.target.SessionID(data["sessionId"])
                    try:
                        session = self.sessions[session_id]
                    except KeyError:
                        raise BrowserError(
                            {
                                "code": -32700,
                                "message": "Browser sent a message for an invalid session",
                                "data": f"{session_id!r}",
                            }
                        )
                    session._handle_data(data)
                else:
                    self._handle_data(data)
        finally:
            # Runs on a clean close and on failure alike, and covers the root
            # connection's own listeners as well as the sessions'.
            self._close_event_channels()
473
474
475
@asynccontextmanager
async def open_cdp(url) -> AsyncIterator[CdpConnection]:
    """This async context manager opens a connection to the browser specified
    by ``url`` before entering the block, then closes the connection when the
    block exits.

    The context manager also sets the connection as the default
    connection for the current task, so that commands like ``await
    target.get_targets()`` will run on this connection automatically. If
    you want to use multiple connections concurrently, it is recommended
    to open each one in a separate task.
    """

    async with trio.open_nursery() as nursery:
        conn = await connect_cdp(nursery, url)
        # Leaving the ``async with conn`` block awaits conn.aclose().
        async with conn:
            with connection_context(conn):
                yield conn
495
496
497
async def connect_cdp(nursery, url) -> CdpConnection:
    """Connect to the browser specified by ``url`` and spawn a background task
    in the specified nursery.

    The ``open_cdp()`` context manager is preferred in most situations.
    You should only use this function if you need to specify a custom
    nursery. This connection is not automatically closed! You can either
    use the connection object as a context manager (``async with
    conn:``) or else call ``await conn.aclose()`` on it when you are
    done with it.

    :param nursery: nursery that will own the WebSocket and the reader task
    :param url: WebSocket URL of the browser's CDP endpoint
    :returns: the new connection, with its reader task already scheduled
    """
    websocket = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
    connection = CdpConnection(websocket)
    nursery.start_soon(connection._reader_task)
    return connection
515
516