# Source: GitHub repository SeleniumHQ/Selenium
# Path: blob/trunk/py/selenium/webdriver/remote/client_config.py
# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import base64
19
import os
20
import socket
21
from enum import Enum
22
from typing import Optional
23
from urllib import parse
24
25
import certifi
26
27
from selenium.webdriver.common.proxy import Proxy, ProxyType
28
29
30
class AuthType(Enum):
    """Authentication schemes supported for requests to the remote server.

    Each member's value is the literal string placed in the request headers:
    the ``Authorization`` scheme prefix for ``BASIC`` and ``BEARER``, and the
    header name itself for ``X_API_KEY``.
    """

    BASIC = "Basic"
    BEARER = "Bearer"
    X_API_KEY = "X-API-Key"
34
35
36
class _ClientConfigDescriptor:
37
def __init__(self, name):
38
self.name = name
39
40
def __get__(self, obj, cls):
41
return obj.__dict__[self.name]
42
43
def __set__(self, obj, value) -> None:
44
obj.__dict__[self.name] = value
45
46
47
class ClientConfig:
    """Settings used for the HTTP connection to a remote driver/server.

    Every setting is exposed as a plain read/write attribute; the values are
    stored on the instance under the matching underscore-prefixed key.
    """

    remote_server_addr = _ClientConfigDescriptor("_remote_server_addr")
    """Gets and Sets Remote Server."""
    keep_alive = _ClientConfigDescriptor("_keep_alive")
    """Gets and Sets Keep Alive value."""
    proxy = _ClientConfigDescriptor("_proxy")
    """Gets and Sets the proxy used for communicating to the driver/server."""
    ignore_certificates = _ClientConfigDescriptor("_ignore_certificates")
    """Gets and Sets the ignore certificate check value."""
    init_args_for_pool_manager = _ClientConfigDescriptor("_init_args_for_pool_manager")
    """Gets and Sets the init arguments for the pool manager."""
    timeout = _ClientConfigDescriptor("_timeout")
    """Gets and Sets the timeout (in seconds) used for communicating to the
    driver/server."""
    ca_certs = _ClientConfigDescriptor("_ca_certs")
    """Gets and Sets the path to bundle of CA certificates."""
    username = _ClientConfigDescriptor("_username")
    """Gets and Sets the username used for basic authentication to the
    remote."""
    password = _ClientConfigDescriptor("_password")
    """Gets and Sets the password used for basic authentication to the
    remote."""
    auth_type = _ClientConfigDescriptor("_auth_type")
    """Gets and Sets the type of authentication to the remote server."""
    token = _ClientConfigDescriptor("_token")
    """Gets and Sets the token used for authentication to the remote server."""
    user_agent = _ClientConfigDescriptor("_user_agent")
    """Gets and Sets user agent to be added to the request headers."""
    extra_headers = _ClientConfigDescriptor("_extra_headers")
    """Gets and Sets extra headers to be added to the request."""

    def __init__(
        self,
        remote_server_addr: str,
        keep_alive: Optional[bool] = True,
        proxy: Optional[Proxy] = None,
        ignore_certificates: Optional[bool] = False,
        init_args_for_pool_manager: Optional[dict] = None,
        timeout: Optional[int] = None,
        ca_certs: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        auth_type: Optional[AuthType] = AuthType.BASIC,
        token: Optional[str] = None,
        user_agent: Optional[str] = None,
        extra_headers: Optional[dict] = None,
    ) -> None:
        """Create a client configuration.

        Args:
            remote_server_addr: URL of the remote server.
            keep_alive: Keep Alive value.
            proxy: Proxy used for communicating to the driver/server. When
                omitted (or ``None``), a new proxy of type ``ProxyType.SYSTEM``
                is created for this instance.
            ignore_certificates: Ignore certificate check value.
            init_args_for_pool_manager: Init arguments for the pool manager;
                an empty dict is used when omitted.
            timeout: Timeout in seconds; the socket default timeout is used
                when omitted.
            ca_certs: Path to a bundle of CA certificates. When omitted, the
                ``REQUESTS_CA_BUNDLE`` environment variable is used if set,
                otherwise ``certifi.where()``.
            username: Username for basic authentication.
            password: Password for basic authentication.
            auth_type: Type of authentication to the remote server.
            token: Token used for bearer / API-key authentication.
            user_agent: User agent to be added to the request headers.
            extra_headers: Extra headers to be added to the request.
        """
        self.remote_server_addr = remote_server_addr
        self.keep_alive = keep_alive
        # Build the default proxy per instance: a default evaluated in the
        # signature would be one mutable object shared by every ClientConfig.
        self.proxy = Proxy(raw={"proxyType": ProxyType.SYSTEM}) if proxy is None else proxy
        self.ignore_certificates = ignore_certificates
        self.init_args_for_pool_manager = init_args_for_pool_manager or {}
        self.timeout = socket.getdefaulttimeout() if timeout is None else timeout
        self.username = username
        self.password = password
        self.auth_type = auth_type
        self.token = token
        self.user_agent = user_agent
        self.extra_headers = extra_headers

        # Explicit argument wins, then the environment override, then certifi.
        if ca_certs is not None:
            self.ca_certs = ca_certs
        elif "REQUESTS_CA_BUNDLE" in os.environ:
            self.ca_certs = os.getenv("REQUESTS_CA_BUNDLE")
        else:
            self.ca_certs = certifi.where()

    def reset_timeout(self) -> None:
        """Resets the timeout to the default value of socket."""
        self.timeout = socket.getdefaulttimeout()

    def get_proxy_url(self) -> Optional[str]:
        """Returns the proxy URL to use for the connection.

        Returns:
            The proxy URL, or ``None`` when the proxy type is ``DIRECT``, when
            a ``no_proxy`` / ``NO_PROXY`` entry matches the remote server, when
            no proxy is configured, or when the proxy type is not handled here.
        """
        proxy_type = self.proxy.proxy_type
        remote_add = parse.urlparse(self.remote_server_addr)
        is_https = self.remote_server_addr.startswith("https://")
        if proxy_type is ProxyType.DIRECT:
            return None
        if proxy_type is ProxyType.SYSTEM:
            _no_proxy = os.environ.get("no_proxy", os.environ.get("NO_PROXY"))
            if _no_proxy:
                for entry in map(str.strip, _no_proxy.split(",")):
                    if not entry:
                        # Stray/trailing comma: an empty entry must not match.
                        continue
                    if entry == "*":
                        return None
                    n_url = parse.urlparse(entry)
                    if n_url.netloc and remote_add.netloc == n_url.netloc:
                        return None
                    # Scheme-less entries parse into ``path``. Require it to be
                    # non-empty: ``"" in netloc`` is always true and would
                    # bypass the proxy for every host.
                    if n_url.path and n_url.path in remote_add.netloc:
                        return None
            # Lower-case variable takes precedence over the upper-case one.
            return os.environ.get(
                "https_proxy" if is_https else "http_proxy",
                os.environ.get("HTTPS_PROXY" if is_https else "HTTP_PROXY"),
            )
        if proxy_type is ProxyType.MANUAL:
            return self.proxy.sslProxy if is_https else self.proxy.http_proxy
        return None

    def get_auth_header(self) -> Optional[dict]:
        """Returns the authorization to add to the request headers.

        Returns:
            A single-entry header dict for the configured ``auth_type``, or
            ``None`` when the credentials that type needs are missing.
        """
        if self.auth_type is AuthType.BASIC and self.username and self.password:
            credentials = f"{self.username}:{self.password}"
            encoded_credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
            return {"Authorization": f"{AuthType.BASIC.value} {encoded_credentials}"}
        if self.auth_type is AuthType.BEARER and self.token:
            return {"Authorization": f"{AuthType.BEARER.value} {self.token}"}
        if self.auth_type is AuthType.X_API_KEY and self.token:
            return {AuthType.X_API_KEY.value: f"{self.token}"}
        return None
153
154