Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/py/selenium/webdriver/remote/client_config.py
3991 views
1
# Licensed to the Software Freedom Conservancy (SFC) under one
2
# or more contributor license agreements. See the NOTICE file
3
# distributed with this work for additional information
4
# regarding copyright ownership. The SFC licenses this file
5
# to you under the Apache License, Version 2.0 (the
6
# "License"); you may not use this file except in compliance
7
# with the License. You may obtain a copy of the License at
8
#
9
# http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing,
12
# software distributed under the License is distributed on an
13
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
# KIND, either express or implied. See the License for the
15
# specific language governing permissions and limitations
16
# under the License.
17
18
import base64
19
import os
20
import socket
21
from enum import Enum
22
from urllib import parse
23
24
import certifi
25
26
from selenium.webdriver.common.proxy import Proxy, ProxyType
27
28
29
class AuthType(Enum):
    """Authentication schemes understood by ``ClientConfig.get_auth_header``.

    Each member's value is the literal text placed in the request headers:
    either the scheme prefix of an ``Authorization`` header or the name of a
    dedicated header.
    """

    # "Authorization: Basic <base64(username:password)>"
    BASIC = "Basic"

    # "Authorization: Bearer <token>"
    BEARER = "Bearer"

    # "X-API-Key: <token>" -- the value doubles as the header name.
    X_API_KEY = "X-API-Key"
33
34
35
class _ClientConfigDescriptor:
36
def __init__(self, name):
37
self.name = name
38
39
def __get__(self, obj, cls):
40
return obj.__dict__[self.name]
41
42
def __set__(self, obj, value) -> None:
43
obj.__dict__[self.name] = value
44
45
46
class ClientConfig:
    """Connection settings used when communicating with a remote driver/server.

    Every public attribute below is a ``_ClientConfigDescriptor`` whose value
    is stored in the instance ``__dict__`` under the matching
    underscore-prefixed name (for example ``timeout`` is stored as
    ``_timeout``).
    """

    remote_server_addr = _ClientConfigDescriptor("_remote_server_addr")
    """Gets and Sets Remote Server."""
    keep_alive = _ClientConfigDescriptor("_keep_alive")
    """Gets and Sets Keep Alive value."""
    proxy = _ClientConfigDescriptor("_proxy")
    """Gets and Sets the proxy used for communicating with the driver/server."""
    ignore_certificates = _ClientConfigDescriptor("_ignore_certificates")
    """Gets and Sets the ignore certificate check value."""
    init_args_for_pool_manager = _ClientConfigDescriptor("_init_args_for_pool_manager")
    """Gets and Sets the init arguments for the pool manager."""
    timeout = _ClientConfigDescriptor("_timeout")
    """Gets and Sets the timeout (in seconds) used for communicating with the driver/server."""
    ca_certs = _ClientConfigDescriptor("_ca_certs")
    """Gets and Sets the path to bundle of CA certificates."""
    username = _ClientConfigDescriptor("_username")
    """Gets and Sets the username used for basic authentication to the remote."""
    password = _ClientConfigDescriptor("_password")
    """Gets and Sets the password used for basic authentication to the remote."""
    auth_type = _ClientConfigDescriptor("_auth_type")
    """Gets and Sets the type of authentication to the remote server."""
    token = _ClientConfigDescriptor("_token")
    """Gets and Sets the token used for authentication to the remote server."""
    user_agent = _ClientConfigDescriptor("_user_agent")
    """Gets and Sets user agent to be added to the request headers."""
    extra_headers = _ClientConfigDescriptor("_extra_headers")
    """Gets and Sets extra headers to be added to the request."""
    websocket_timeout = _ClientConfigDescriptor("_websocket_timeout")
    """Gets and Sets the WebSocket response wait timeout (in seconds) used for communicating with the browser."""
    websocket_interval = _ClientConfigDescriptor("_websocket_interval")
    """Gets and Sets the WebSocket response wait interval (in seconds) used for communicating with the browser."""

    def __init__(
        self,
        remote_server_addr: str,
        keep_alive: bool | None = True,
        proxy: Proxy | None = Proxy(raw={"proxyType": ProxyType.SYSTEM}),
        ignore_certificates: bool | None = False,
        init_args_for_pool_manager: dict | None = None,
        timeout: int | None = None,
        ca_certs: str | None = None,
        username: str | None = None,
        password: str | None = None,
        auth_type: AuthType | None = AuthType.BASIC,
        token: str | None = None,
        user_agent: str | None = None,
        extra_headers: dict | None = None,
        websocket_timeout: float | None = 30.0,
        websocket_interval: float | None = 0.1,
    ) -> None:
        """Store the connection settings, filling in defaults where needed.

        Args:
            remote_server_addr: URL of the remote server.
            keep_alive: Keep-alive setting, stored as given.
            proxy: Proxy configuration. NOTE(review): the default ``Proxy``
                object is created once, when the function is defined, and is
                therefore shared by every instance that relies on the
                default; mutating it affects all of them.
            ignore_certificates: Certificate-check setting, stored as given.
            init_args_for_pool_manager: Init arguments for the pool manager;
                a falsy value is replaced by a new empty dict.
            timeout: Timeout in seconds; ``None`` falls back to
                ``socket.getdefaulttimeout()``.
            ca_certs: Path to a CA bundle; ``None`` falls back to the
                ``REQUESTS_CA_BUNDLE`` environment variable when it is set,
                otherwise to ``certifi.where()``.
            username: Username used for basic authentication.
            password: Password used for basic authentication.
            auth_type: Scheme consulted by ``get_auth_header``.
            token: Token used for bearer / API-key authentication.
            user_agent: User agent to add to the request headers.
            extra_headers: Extra headers to add to the request.
            websocket_timeout: WebSocket response wait timeout in seconds.
            websocket_interval: WebSocket response wait interval in seconds.
        """
        self.remote_server_addr = remote_server_addr
        self.keep_alive = keep_alive
        self.proxy = proxy
        self.ignore_certificates = ignore_certificates
        self.init_args_for_pool_manager = init_args_for_pool_manager or {}
        self.timeout = socket.getdefaulttimeout() if timeout is None else timeout
        self.username = username
        self.password = password
        self.auth_type = auth_type
        self.token = token
        self.user_agent = user_agent
        self.extra_headers = extra_headers
        self.websocket_timeout = websocket_timeout
        self.websocket_interval = websocket_interval

        if ca_certs is not None:
            self.ca_certs = ca_certs
        elif "REQUESTS_CA_BUNDLE" in os.environ:
            # An explicitly exported bundle wins over the certifi default,
            # even when the variable is set to an empty string.
            self.ca_certs = os.getenv("REQUESTS_CA_BUNDLE")
        else:
            self.ca_certs = certifi.where()

    def reset_timeout(self) -> None:
        """Resets the timeout to the default value of socket."""
        # Writes the same ``__dict__`` slot that the ``timeout`` descriptor reads.
        self._timeout = socket.getdefaulttimeout()

    def get_proxy_url(self) -> str | None:
        """Returns the proxy URL to use for the connection.

        Returns:
            ``None`` when no proxy is configured (``proxy`` is ``None``), for
            ``ProxyType.DIRECT``, for a ``ProxyType.SYSTEM`` proxy whose
            ``no_proxy`` / ``NO_PROXY`` list matches the remote server, and
            for any proxy type not handled here. For ``ProxyType.SYSTEM`` the
            value of ``https_proxy`` / ``http_proxy`` (lower-case first, then
            upper-case) is returned, chosen by whether the remote address
            starts with ``https://``. For ``ProxyType.MANUAL`` the proxy
            object's SSL or HTTP proxy is returned on the same basis.
        """
        if self.proxy is None:
            # The annotation allows ``None``; treat it as "no proxy" rather
            # than failing on the attribute access below.
            return None

        proxy_type = self.proxy.proxy_type
        remote_add = parse.urlparse(self.remote_server_addr)
        if proxy_type is ProxyType.DIRECT:
            return None
        if proxy_type is ProxyType.SYSTEM:
            _no_proxy = os.environ.get("no_proxy", os.environ.get("NO_PROXY"))
            if _no_proxy:
                for entry in map(str.strip, _no_proxy.split(",")):
                    if not entry:
                        # Trailing or doubled commas produce empty entries;
                        # they must not match anything.
                        continue
                    if entry == "*":
                        return None
                    n_url = parse.urlparse(entry)
                    if n_url.netloc and remote_add.netloc == n_url.netloc:
                        return None
                    # Bare host entries parse with an empty netloc and the
                    # host in ``path``. Require a non-empty path: the empty
                    # string is a substring of every netloc and would
                    # otherwise disable the proxy for all hosts (this happens
                    # for entries such as "http://host").
                    if n_url.path and n_url.path in remote_add.netloc:
                        return None
            uses_https = self.remote_server_addr.startswith("https://")
            return os.environ.get(
                "https_proxy" if uses_https else "http_proxy",
                os.environ.get("HTTPS_PROXY" if uses_https else "HTTP_PROXY"),
            )
        if proxy_type is ProxyType.MANUAL:
            return self.proxy.sslProxy if self.remote_server_addr.startswith("https://") else self.proxy.http_proxy
        return None

    def get_auth_header(self) -> dict | None:
        """Returns the authorization to add to the request headers.

        Returns:
            A single-entry dict holding the header to send, or ``None`` when
            the credentials required by ``auth_type`` are missing or
            ``auth_type`` is not one of the ``AuthType`` members handled here.
        """
        if self.auth_type is AuthType.BASIC and self.username and self.password:
            credentials = f"{self.username}:{self.password}"
            encoded_credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
            return {"Authorization": f"{AuthType.BASIC.value} {encoded_credentials}"}
        if self.auth_type is AuthType.BEARER and self.token:
            return {"Authorization": f"{AuthType.BEARER.value} {self.token}"}
        if self.auth_type is AuthType.X_API_KEY and self.token:
            # For API-key auth the enum value is the header name itself.
            return {f"{AuthType.X_API_KEY.value}": f"{self.token}"}
        return None
157
158