Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/common/bidi/cdp.py
4041 views
1
# The MIT License(MIT)
2
#
3
# Copyright(c) 2018 Hyperion Gray
4
#
5
# Permission is hereby granted, free of charge, to any person obtaining a copy
6
# of this software and associated documentation files(the "Software"), to deal
7
# in the Software without restriction, including without limitation the rights
8
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
9
# copies of the Software, and to permit persons to whom the Software is
10
# furnished to do so, subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be included in
13
# all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21
# THE SOFTWARE.
22
#
23
# This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp
24
25
import contextvars
26
import importlib
27
import itertools
28
import json
29
import logging
30
import pathlib
31
from collections import defaultdict
32
from collections.abc import AsyncGenerator, AsyncIterator, Generator
33
from contextlib import asynccontextmanager, contextmanager
34
from dataclasses import dataclass
35
from typing import Any, TypeVar
36
37
import trio
38
from trio_websocket import ConnectionClosed as WsConnectionClosed
39
from trio_websocket import connect_websocket_url
40
41
# Module logger, named after the upstream trio_cdp project this code comes from.
logger = logging.getLogger("trio_cdp")

# Result type produced by a CDP command generator (see CdpBase.execute).
T = TypeVar("T")

# Largest WebSocket message accepted from the browser (1 << 24 bytes = 16 MiB).
MAX_WS_MESSAGE_SIZE = 1 << 24

# Both are filled in by import_devtools(): the loaded devtools module and the
# version number that was requested.
devtools = None
version = None
47
48
49
def import_devtools(ver):
50
"""Attempt to load the current latest available devtools into the module cache for use later."""
51
global devtools
52
global version
53
version = ver
54
base = "selenium.webdriver.common.devtools.v"
55
try:
56
devtools = importlib.import_module(f"{base}{ver}")
57
return devtools
58
except ModuleNotFoundError:
59
# Attempt to parse and load the 'most recent' devtools module. This is likely
60
# because cdp has been updated but selenium python has not been released yet.
61
devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
62
versions = tuple(f.name for f in devtools_path.iterdir() if f.is_dir())
63
latest = max(int(x[1:]) for x in versions)
64
selenium_logger = logging.getLogger(__name__)
65
selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
66
devtools = importlib.import_module(f"{base}{latest}")
67
return devtools
68
69
70
# Task-local slots for the "current" CDP connection and session. They are
# created without a default, so .get() raises LookupError until a value is
# installed (see connection_context() / session_context()).
_connection_context: contextvars.ContextVar = contextvars.ContextVar(
    "connection_context"
)
_session_context: contextvars.ContextVar = contextvars.ContextVar(
    "session_context"
)
72
73
74
def get_connection_context(fn_name):
    """Look up the current connection.

    Args:
        fn_name: name of the calling API function; used only in the error message.

    Returns:
        The value held by the connection context variable.

    Raises:
        RuntimeError: if no connection is installed for the current task.
    """
    try:
        return _connection_context.get()
    except LookupError:
        # The LookupError is an implementation detail; keep it out of the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a connection context.") from None
84
85
86
def get_session_context(fn_name):
    """Look up the current session.

    Args:
        fn_name: name of the calling API function; used only in the error message.

    Returns:
        The value held by the session context variable.

    Raises:
        RuntimeError: if no session is installed for the current task.
    """
    try:
        return _session_context.get()
    except LookupError:
        # The LookupError is an implementation detail; keep it out of the traceback.
        raise RuntimeError(f"{fn_name}() must be called in a session context.") from None
96
97
98
@contextmanager
def connection_context(connection):
    """Context manager that installs ``connection`` as the connection context for the current Trio task.

    The previous value is restored when the block exits, even if it raises.
    """
    # Bind the ContextVar locally: set_global_connection() may rebind the
    # module-level name while this block is active, and reset() only accepts a
    # token created by the very same ContextVar instance.
    var = _connection_context
    token = var.set(connection)
    try:
        yield
    finally:
        var.reset(token)
106
107
108
@contextmanager
def session_context(session):
    """Context manager that installs ``session`` as the session context for the current Trio task.

    The previous value is restored when the block exits, even if it raises.
    """
    # Bind the ContextVar locally: set_global_session() may rebind the
    # module-level name while this block is active, and reset() only accepts a
    # token created by the very same ContextVar instance.
    var = _session_context
    token = var.set(session)
    try:
        yield
    finally:
        var.reset(token)
116
117
118
def set_global_connection(connection):
    """Install ``connection`` in the root context so that it will become the default connection for all tasks.

    This replaces the module's connection ContextVar with a new one whose
    default is ``connection``. Values set by ``connection_context()`` on the
    previous variable are no longer visible through lookups.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _connection_context
    # Keep the same variable name as the module-level ContextVar for consistency.
    _connection_context = contextvars.ContextVar("connection_context", default=connection)
126
127
128
def set_global_session(session):
    """Install ``session`` in the root context so that it will become the default session for all tasks.

    This replaces the module's session ContextVar with a new one whose default
    is ``session``. Values set by ``session_context()`` on the previous
    variable are no longer visible through lookups.

    This is generally not recommended, except it may be necessary in
    certain use cases such as running inside Jupyter notebook.
    """
    global _session_context
    # Keep the same variable name as the module-level ContextVar for consistency.
    _session_context = contextvars.ContextVar("session_context", default=session)
136
137
138
class BrowserError(Exception):
    """Error reported by the browser in reply to a command.

    The ``code``, ``message`` and ``data`` entries of the error object are
    exposed as ``code``, ``message`` and ``detail``; each is ``None`` when the
    entry is missing.
    """

    def __init__(self, obj):
        """Build the exception from an error mapping (e.g. a reply's ``error`` entry).

        Args:
            obj: mapping with optional ``code``, ``message`` and ``data`` keys.
        """
        lookup = obj.get
        self.code, self.message, self.detail = lookup("code"), lookup("message"), lookup("data")

    def __str__(self):
        return f"BrowserError<code={self.code} message={self.message}> {self.detail}"
148
149
150
class CdpConnectionClosed(WsConnectionClosed):
    """Raised when a public method is called on a CDP connection that is already closed."""

    def __init__(self, reason):
        """Remember why the underlying WebSocket was closed.

        Args:
            reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return ``<ClassName><<reason>>``."""
        name = type(self).__name__
        return f"{name}<{self.reason}>"
164
165
166
class InternalError(Exception):
    """Signals faulty logic inside TrioCDP itself or in its integration with PyCDP.

    It is not used for errors the browser reports.
    """
170
171
172
@dataclass
class CmEventProxy:
    """Holder object yielded by :meth:`CdpBase.wait_for`.

    ``value`` starts out as ``None``. Once the ``wait_for()`` block has
    exited, it holds the event that was received.
    """

    value: Any = None
181
182
183
class CdpBase:
    """Command/response and event plumbing shared by CDP connections and sessions.

    Each outgoing command is tagged with an increasing integer ID. A reply is
    matched back to the waiting ``execute()`` call by that ID. Incoming events
    are parsed with the loaded ``devtools`` module and sent to every memory
    channel subscribed to the event's type.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: WebSocket that commands are sent on (``send_message()`` is called on it).
            session_id: session ID added to every outgoing command, or a falsy
                value to send commands without one.
            target_id: target this endpoint belongs to; stored as ``self.target_id``.
        """
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # event type -> set of memory send channels subscribed to that type
        self.channels = defaultdict(set)
        # source of command IDs: 0, 1, 2, ...
        self.id_iter = itertools.count()
        # cmd_id -> (command generator, trio.Event set once the reply is processed)
        self.inflight_cmd = {}
        # cmd_id -> parsed result, or the exception execute() should raise
        self.inflight_result = {}

    async def execute(self, cmd: Generator[dict, T, Any]) -> T:
        """Execute a command on the server and wait for the result.

        Args:
            cmd: a CDP command generator. Its first ``next()`` yields the
                request dict. It is then sent the reply's ``result`` and is
                expected to finish, returning the parsed value.

        Returns:
            The value returned by the command generator.

        Raises:
            CdpConnectionClosed: if the WebSocket is closed when sending.
            BrowserError: if the browser replied with an error (or any other
                exception stored as this command's result).
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        try:
            request = next(cmd)
            request["id"] = cmd_id
            if self.session_id:
                request["sessionId"] = self.session_id
            request_str = json.dumps(request)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"Sending CDP message: {cmd_id} {cmd_event}: {request_str}")
            try:
                await self.ws.send_message(request_str)
            except WsConnectionClosed as wcc:
                raise CdpConnectionClosed(wcc.reason) from None
            await cmd_event.wait()
            response = self.inflight_result.pop(cmd_id)
        finally:
            # If we leave early (malformed command, send failure, cancellation),
            # drop the bookkeeping so it does not leak. A reply that arrives
            # later is then reported as an unknown command ID.
            self.inflight_cmd.pop(cmd_id, None)
            self.inflight_result.pop(cmd_id, None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Received CDP message: {response}")
        if isinstance(response, Exception):
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"Exception raised by {cmd_event} message: {type(response).__name__}")
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Listen for events.

        Args:
            *event_types: event classes to subscribe to.
            buffer_size: capacity of the memory channel. Events that arrive
                while it is full are dropped and logged.

        Returns:
            An async iterator that iterates over events matching the indicated types.
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received; the event is then available as ``proxy.value``.
        """
        sender: trio.MemorySendChannel
        receiver: trio.MemoryReceiveChannel
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        try:
            yield proxy
            async with receiver:
                event = await receiver.receive()
            proxy.value = event
        finally:
            # Unsubscribe even if the block raised or was cancelled. Otherwise
            # the stale sender keeps receiving events until its buffer fills
            # and then logs an error for every further event.
            self.channels[event_type].discard(sender)
            sender.close()
            receiver.close()

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        Messages with an ``id`` are command replies; everything else is an event.

        Args:
            data: a JSON dictionary
        """
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data: dict):
        """Handle a response to a command.

        This will set an event flag that will return control to the
        task that called the command.

        Args:
            data: response as a JSON dictionary

        Raises:
            InternalError: if the command generator does not finish after
                being sent the result.
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object; the parsed value arrives via StopIteration.
            try:
                cmd.send(data["result"])
            except StopIteration as stop:
                self.inflight_result[cmd_id] = stop.value
            else:
                raise InternalError("The command's generator function did not exit when expected!")
        event.set()

    def _handle_event(self, data: dict):
        """Handle an event.

        Args:
            data: event as a JSON dictionary

        Raises:
            RuntimeError: if import_devtools() has not loaded a devtools module.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        # Use .get() rather than [] so event types nobody listens to do not
        # accumulate empty entries in the defaultdict.
        senders = self.channels.get(type(event))
        if not senders:
            return
        to_remove = set()
        for sender in senders:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                # The receiving side was closed; stop delivering to it.
                to_remove.add(sender)
        senders.difference_update(to_remove)
317
318
319
class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
            session_id: devtools.target.SessionID
            target_id: devtools.target.TargetID
        """
        super().__init__(ws, session_id, target_id)

        # Number of active dom_enable()/page_enable() users; each lock
        # serializes the matching enable/disable commands.
        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """Context manager that executes ``dom.enable()`` when it enters and then calls ``dom.disable()``.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited, including callers whose block
        raised an exception.

        Raises:
            RuntimeError: if import_devtools() has not loaded a devtools module.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        async with self._dom_enable_lock:
            # Count this caller only after enable() succeeded, so a failed
            # enable does not leave the counter permanently raised.
            if self._dom_enable_count == 0:
                await self.execute(devtools.dom.enable())
            self._dom_enable_count += 1

        try:
            yield
        finally:
            # NOTE(review): acquiring a trio.Lock is a checkpoint, so if this
            # task is being cancelled the cleanup below may itself be interrupted.
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """Context manager executes ``page.enable()`` when it enters and then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited, including callers whose block
        raised an exception.

        Raises:
            RuntimeError: if import_devtools() has not loaded a devtools module.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        async with self._page_enable_lock:
            # Count this caller only after enable() succeeded, so a failed
            # enable does not leave the counter permanently raised.
            if self._page_enable_count == 0:
                await self.execute(devtools.page.enable())
            self._page_enable_count += 1

        try:
            yield
        finally:
            # NOTE(review): acquiring a trio.Lock is a checkpoint, so if this
            # task is being cancelled the cleanup below may itself be interrupted.
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())
380
381
382
class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call the
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        Args:
            ws: trio_websocket.WebSocketConnection
        """
        super().__init__(ws, session_id=None, target_id=None)
        # session_id -> CdpSession, filled in by connect_session()
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        This will cause the reader task to gracefully exit when it tries
        to read the next message from the WebSocket. When the reader exits,
        commands still waiting for a reply fail with ``CdpConnectionClosed``
        and every listener channel (root and sessions) is closed. Later calls
        to ``execute()`` raise ``CdpConnectionClosed``. It is intended to be
        safe to call this multiple times (assumes the WebSocket's ``aclose()``
        is idempotent).
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> AsyncIterator[CdpSession]:
        """Context manager opens a session and enables the "simple" style of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified target.

        Raises:
            RuntimeError: if import_devtools() has not loaded a devtools module.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    async def _reader_task(self):
        """Runs in the background and handles incoming messages.

        Dispatches responses to commands and events to listeners. When the
        loop ends (for whatever reason), pending commands are failed and all
        listener channels are closed so that no task waits forever.

        Raises:
            RuntimeError: if import_devtools() has not loaded a devtools module.
            BrowserError: on invalid JSON or a message for an unknown session.
        """
        if devtools is None:
            raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.")
        close_reason = None
        try:
            while True:
                try:
                    message = await self.ws.get_message()
                except WsConnectionClosed as wcc:
                    # If the WebSocket is closed, we don't want to throw an
                    # exception from the reader task. Instead we will throw
                    # exceptions from the public API methods, and we can quietly
                    # exit the reader task here.
                    close_reason = wcc.reason
                    break
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
                logger.debug("Received message %r", data)
                if "sessionId" in data:
                    session_id = devtools.target.SessionID(data["sessionId"])
                    try:
                        session = self.sessions[session_id]
                    except KeyError:
                        raise BrowserError(
                            {
                                "code": -32700,
                                "message": "Browser sent a message for an invalid session",
                                "data": f"{session_id!r}",
                            }
                        )
                    session._handle_data(data)
                else:
                    self._handle_data(data)
        finally:
            self._shutdown_endpoints(close_reason)

    def _shutdown_endpoints(self, reason):
        """Fail pending commands and close listener channels on this connection and its sessions.

        Args:
            reason: close reason passed to each ``CdpConnectionClosed`` (``None``
                if the reader stopped without a WebSocket close).
        """
        for endpoint in (self, *self.sessions.values()):
            pending = list(endpoint.inflight_cmd.items())
            endpoint.inflight_cmd.clear()
            for cmd_id, (_, event) in pending:
                # execute() raises any Exception it finds as the result.
                endpoint.inflight_result[cmd_id] = CdpConnectionClosed(reason)
                event.set()
            for senders in endpoint.channels.values():
                for sender in senders:
                    sender.close()
478
479
480
@asynccontextmanager
async def open_cdp(url) -> AsyncIterator[CdpConnection]:
    """Open a CDP connection to ``url`` for the duration of an ``async with`` block.

    A nursery is opened to host the connection's background reader task.
    Inside the block the connection is also installed as the current task's
    connection context, so commands like ``await target.get_targets()`` run
    on it automatically. The connection is closed when the block exits,
    whether normally or through an exception. If you want to use multiple
    connections concurrently, it is recommended to open each one in a
    separate task.
    """
    async with trio.open_nursery() as nursery:
        connection = await connect_cdp(nursery, url)
        try:
            with connection_context(connection):
                yield connection
        finally:
            await connection.aclose()
497
498
499
async def connect_cdp(nursery, url) -> CdpConnection:
    """Connect to the browser specified by ``url`` and spawn a background task in the specified nursery.

    The ``open_cdp()`` context manager is preferred in most situations.
    You should only use this function if you need to specify a custom
    nursery. This connection is not automatically closed! You can either
    use the connection object as a context manager (``async with
    conn:``) or else call ``await conn.aclose()`` on it when you are
    done with it. Unlike ``open_cdp()``, this function does not install
    the connection as the current connection context; use
    ``connection_context()`` or ``set_global_connection()`` for that.

    Args:
        nursery: Trio nursery passed to ``connect_websocket_url()`` and used
            to run the connection's reader task.
        url: WebSocket URL of the browser's DevTools endpoint.

    Returns:
        The connected ``CdpConnection``.
    """
    ws = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
    cdp_conn = CdpConnection(ws)
    nursery.start_soon(cdp_conn._reader_task)
    return cdp_conn
516
517