# Path: blob/trunk/py/selenium/webdriver/common/api_request_context.py
# 10193 views
# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""APIRequestContext for making HTTP requests with browser cookie synchronization."""

import json
import logging
import pathlib
import time
import urllib.parse
from email.utils import parsedate_to_datetime
from http.client import responses as http_status_phrases
from typing import TYPE_CHECKING, Any

import urllib3
from urllib3.util.retry import Retry

if TYPE_CHECKING:
    from selenium.webdriver.remote.webdriver import WebDriver

logger = logging.getLogger(__name__)


class APIRequestFailure(Exception):
    """Raised when an API request returns a non-2xx status and fail_on_status_code is True.

    Attributes:
        response: The APIResponse that triggered the failure.
    """

    def __init__(self, response: "APIResponse") -> None:
        self.response = response
        super().__init__(f"{response.status} {response.status_text}: {response.url}")


class APIResponse:
    """Represents an HTTP response from an API request.

    Attributes:
        status: HTTP status code.
        status_text: HTTP status text.
        headers: Response headers as a dict.
        url: The request URL.
    """

    def __init__(self, status: int, status_text: str, headers: dict[str, str], url: str, body: bytes) -> None:
        self.status = status
        self.status_text = status_text
        self.headers = headers
        self.url = url
        self._body = body

    @property
    def ok(self) -> bool:
        """Whether the response status is in the 200-299 range."""
        return 200 <= self.status <= 299

    def json(self) -> Any:
        """Parse the response body as JSON.

        Returns:
            The parsed JSON object.
        """
        return json.loads(self._body)

    def text(self) -> str:
        """Decode the response body as UTF-8 text.

        Returns:
            The response body as a string.
        """
        return self._body.decode("utf-8")

    def body(self) -> bytes:
        """Return the raw response body bytes.

        Returns:
            The response body as bytes.
        """
        return self._body

    def dispose(self) -> None:
        """Free the response body memory."""
        self._body = b""


def _cookie_matches(cookie: dict, url: str, default_domain: str = "") -> bool:
    """Check if a browser cookie should be sent with a request to the given URL.

    Evaluates expiry, domain, path, and secure attribute matching per RFC 6265.

    Args:
        cookie: A cookie dict from driver.get_cookies().
        url: The target request URL.
        default_domain: Fallback domain for host-only cookies (no domain attribute).
            When a cookie has no domain, it only matches if the request hostname
            equals this value. If empty and cookie has no domain, the cookie is skipped.

    Returns:
        True if the cookie matches the URL.
    """
    # Expiry check — skip expired cookies
    expiry = cookie.get("expiry")
    if expiry is not None and expiry <= int(time.time()):
        return False

    parsed = urllib.parse.urlparse(url)
    hostname = parsed.hostname or ""
    path = parsed.path or "/"
    scheme = parsed.scheme or "http"

    # Domain matching (RFC 6265 section 5.1.3).
    # urlparse() lowercases the hostname, so compare against lowercased domains.
    cookie_domain = (cookie.get("domain") or "").lower()
    if not cookie_domain:
        # Host-only cookie — must match the origin host exactly
        if not default_domain or hostname != default_domain.lower():
            return False
    elif cookie_domain.startswith("."):
        # .example.com matches example.com and sub.example.com
        if not (hostname == cookie_domain[1:] or hostname.endswith(cookie_domain)):
            return False
    elif hostname != cookie_domain:
        return False

    # Path matching (RFC 6265 section 5.1.4)
    cookie_path = cookie.get("path") or "/"
    if cookie_path != "/" and path != cookie_path:
        # A cookie path that already ends in "/" is itself the prefix to test;
        # otherwise the next character of the request path must be "/".
        prefix = cookie_path if cookie_path.endswith("/") else cookie_path + "/"
        if not path.startswith(prefix):
            return False

    # Secure matching
    if cookie.get("secure", False) and scheme != "https":
        return False

    return True


def _parse_set_cookie(header_value: str) -> dict:
    """Parse a single Set-Cookie header value into a cookie dict.

    Uses manual parsing instead of http.cookies.SimpleCookie which is too
    strict for real-world Set-Cookie headers.

    Args:
        header_value: The Set-Cookie header string.

    Returns:
        A dict with cookie attributes suitable for driver.add_cookie(), or an
        empty dict when the header has no ``name=value`` pair.
    """
    parts = header_value.split(";")
    name_value = parts[0].strip()
    eq_idx = name_value.find("=")
    if eq_idx == -1:
        return {}
    name = name_value[:eq_idx].strip()
    value = name_value[eq_idx + 1 :].strip()

    cookie: dict[str, Any] = {"name": name, "value": value}
    has_max_age = False

    for part in parts[1:]:
        part = part.strip()
        if not part:
            continue
        if "=" in part:
            attr_name, attr_value = part.split("=", 1)
            attr_name = attr_name.strip().lower()
            attr_value = attr_value.strip()
        else:
            attr_name = part.lower()
            attr_value = ""

        if attr_name == "domain":
            # RFC 6265 §5.2.3: an empty Domain attribute is ignored, so the
            # caller's default (the response host) applies instead.
            if attr_value:
                cookie["domain"] = attr_value
        elif attr_name == "path":
            # RFC 6265 §5.2.4: a Path that is empty or does not start with "/"
            # is ignored in favour of the default path.
            if attr_value.startswith("/"):
                cookie["path"] = attr_value
        elif attr_name == "secure":
            cookie["secure"] = True
        elif attr_name == "httponly":
            cookie["httpOnly"] = True
        elif attr_name == "samesite":
            cookie["sameSite"] = attr_value
        elif attr_name == "max-age":
            try:
                max_age = int(attr_value)
                cookie["expiry"] = int(time.time()) + max_age
                has_max_age = True
            except ValueError:
                pass
        elif attr_name == "expires" and not has_max_age:
            # RFC 6265 §5.3: Max-Age takes precedence over Expires
            try:
                dt = parsedate_to_datetime(attr_value)
                cookie["expiry"] = int(dt.timestamp())
            except (ValueError, TypeError):
                pass

    return cookie


def _get_set_cookie_headers(resp: urllib3.BaseHTTPResponse) -> list[str]:
    """Extract all Set-Cookie header values from a urllib3 response.

    Args:
        resp: The urllib3 HTTP response.

    Returns:
        A list of Set-Cookie header strings.
    """
    if hasattr(resp.headers, "getlist"):
        headers = resp.headers.getlist("Set-Cookie")
        if headers:
            return headers
    sc = resp.headers.get("Set-Cookie")
    return [sc] if sc else []


def _resolve_redirect_url(resp: urllib3.BaseHTTPResponse, original_url: str) -> str:
    """Return the final URL after any redirects.

    urllib3's retry history records each hop. When redirects occurred,
    the last entry's redirect_location resolved against its URL gives
    the final destination. When no redirects occurred, the original
    request URL is returned unchanged.
    """
    history = resp.retries.history if resp.retries else ()
    if history:
        last = history[-1]
        if last.url and last.redirect_location:
            return urllib.parse.urljoin(last.url, last.redirect_location)
    return original_url


class _BaseRequestContext:
    """Base class with shared HTTP request logic for API request contexts."""

    def __init__(
        self,
        base_url: str = "",
        extra_headers: dict[str, str] | None = None,
        timeout: float = 30.0,
        max_redirects: int = 10,
        fail_on_status_code: bool = False,
    ) -> None:
        self._base_url = base_url
        self._extra_headers = extra_headers or {}
        self._timeout = timeout
        self._max_redirects = max_redirects
        self._fail_on_status_code = fail_on_status_code
        self._pool = urllib3.PoolManager()

    def get(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a GET request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "GET", **kwargs)

    def post(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a POST request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, data, form,
                json_data, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "POST", **kwargs)

    def put(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a PUT request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, data, form,
                json_data, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "PUT", **kwargs)

    def patch(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a PATCH request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, data, form,
                json_data, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "PATCH", **kwargs)

    def delete(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a DELETE request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, data, form,
                json_data, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "DELETE", **kwargs)

    def head(self, url: str, **kwargs: Any) -> APIResponse:
        """Send a HEAD request.

        Args:
            url: The request URL (absolute or relative to base_url).
            **kwargs: Optional arguments: headers, params, timeout,
                max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, "HEAD", **kwargs)

    def fetch(self, url: str, method: str = "GET", **kwargs: Any) -> APIResponse:
        """Send an HTTP request with a custom method.

        Args:
            url: The request URL (absolute or relative to base_url).
            method: The HTTP method to use.
            **kwargs: Optional arguments: headers, params, data, form,
                json_data, timeout, max_redirects, fail_on_status_code.

        Returns:
            An APIResponse object.
        """
        return self._fetch(url, method, **kwargs)

    def dispose(self) -> None:
        """Close the underlying connection pool."""
        self._pool.clear()

    def _resolve_url(self, url: str) -> str:
        """Resolve a URL, prepending base_url for relative paths."""
        if not url.startswith(("http://", "https://")):
            return self._base_url.rstrip("/") + "/" + url.lstrip("/")
        return url

    def _build_headers(self, kwargs: dict[str, Any]) -> dict[str, str]:
        """Merge extra_headers with per-request headers."""
        headers = dict(self._extra_headers)
        if kwargs.get("headers"):
            headers.update(kwargs["headers"])
        return headers

    def _prepare_body(self, headers: dict[str, str], kwargs: dict[str, Any]) -> bytes | None:
        """Prepare the request body from json_data, form, or data kwargs.

        Priority: json_data > form > data. Only one should be provided.
        A default Content-Type is added to ``headers`` (in place) for JSON and
        form-encoded bodies unless the caller already set one.
        """
        json_data = kwargs.get("json_data")
        form = kwargs.get("form")
        data = kwargs.get("data")

        if json_data is not None:
            headers.setdefault("Content-Type", "application/json")
            return json.dumps(json_data).encode("utf-8")
        elif form is not None:
            headers.setdefault("Content-Type", "application/x-www-form-urlencoded")
            return urllib.parse.urlencode(form).encode("utf-8")
        elif data is not None:
            if isinstance(data, dict):
                headers.setdefault("Content-Type", "application/x-www-form-urlencoded")
                return urllib.parse.urlencode(data).encode("utf-8")
            elif isinstance(data, str):
                return data.encode("utf-8")
            elif isinstance(data, bytes):
                return data
        return None

    def _append_params(self, url: str, kwargs: dict[str, Any]) -> str:
        """Append query parameters to the URL."""
        params = kwargs.get("params")
        if params:
            separator = "&" if "?" in url else "?"
            return url + separator + urllib.parse.urlencode(params)
        return url

    def _execute_request(
        self, method: str, url: str, headers: dict[str, str], body: bytes | None, kwargs: dict[str, Any]
    ) -> urllib3.BaseHTTPResponse:
        """Execute the HTTP request via urllib3."""
        timeout = kwargs.get("timeout", self._timeout)
        max_redirects = kwargs.get("max_redirects", self._max_redirects)

        # max_redirects <= 0 disables redirect following entirely.
        follow = max_redirects > 0
        retries = Retry(
            connect=0,
            read=0,
            status=0,
            other=0,
            redirect=max_redirects if follow else 0,
            raise_on_redirect=False,
        )

        return self._pool.request(
            method,
            url,
            headers=headers,
            body=body,
            timeout=timeout,
            redirect=follow,
            retries=retries,
            preload_content=True,
        )

    def _build_response(self, resp: urllib3.BaseHTTPResponse, url: str) -> APIResponse:
        """Build an APIResponse from a urllib3 response."""
        # Merge duplicate headers per RFC 7230 §3.2.2 (combine with ", ")
        resp_headers: dict[str, str] = {}
        for k, v in resp.headers.items():
            key = k.lower()
            if key in resp_headers:
                resp_headers[key] = resp_headers[key] + ", " + v
            else:
                resp_headers[key] = v
        # urllib3 2.x removed resp.reason; fall back to stdlib phrase lookup
        reason = getattr(resp, "reason", None)
        status_text = reason or http_status_phrases.get(resp.status, "")
        return APIResponse(
            status=resp.status,
            status_text=status_text,
            headers=resp_headers,
            url=url,
            body=resp.data,
        )

    def _get_cookies_for_request(self, url: str) -> list[dict]:
        """Get cookies that should be sent with the request. Overridden by subclasses."""
        return []

    def _handle_response_cookies(self, set_cookie_headers: list[str], url: str) -> None:
        """Process Set-Cookie headers from the response. Overridden by subclasses."""

    def _fetch(self, url: str, method: str, **kwargs: Any) -> APIResponse:
        """Execute an HTTP request with cookie handling.

        Args:
            url: The request URL.
            method: The HTTP method.
            **kwargs: Optional arguments.

        Returns:
            An APIResponse object.

        Raises:
            APIRequestFailure: If fail_on_status_code is enabled (per request
                or on the context) and the response status is not 2xx.
        """
        url = self._resolve_url(url)
        headers = self._build_headers(kwargs)

        # Apply cookies
        matching_cookies = self._get_cookies_for_request(url)
        if matching_cookies:
            cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in matching_cookies)
            # Header names are case-insensitive: extend a caller-supplied
            # "cookie"/"Cookie" header rather than emitting a second one.
            existing_key = next((k for k in headers if k.lower() == "cookie"), None)
            if existing_key is not None:
                headers[existing_key] = headers[existing_key] + "; " + cookie_header
            else:
                headers["Cookie"] = cookie_header

        body = self._prepare_body(headers, kwargs)
        url = self._append_params(url, kwargs)
        resp = self._execute_request(method, url, headers, body, kwargs)

        # After redirects, associate cookies with the final destination's
        # origin, not the initial request URL.
        final_url = _resolve_redirect_url(resp, url)

        # Process response cookies
        set_cookie_headers = _get_set_cookie_headers(resp)
        if set_cookie_headers:
            self._handle_response_cookies(set_cookie_headers, final_url)

        response = self._build_response(resp, final_url)

        fail = kwargs.get("fail_on_status_code", self._fail_on_status_code)
        if fail and not response.ok:
            raise APIRequestFailure(response)

        return response


class APIRequestContext(_BaseRequestContext):
    """Makes HTTP requests with automatic browser cookie synchronization.

    Cookies from the browser session are sent with API requests, and cookies
    from API responses are synced back to the browser.

    Args:
        driver: The WebDriver instance to sync cookies with.
        base_url: Optional base URL prepended to relative request paths.
        extra_headers: Optional headers included in every request.
        timeout: Default request timeout in seconds.
        max_redirects: Maximum number of redirects to follow.
        fail_on_status_code: If True, raise APIRequestFailure for non-2xx responses.
    """

    def __init__(
        self,
        driver: "WebDriver",
        base_url: str = "",
        extra_headers: dict[str, str] | None = None,
        timeout: float = 30.0,
        max_redirects: int = 10,
        fail_on_status_code: bool = False,
    ) -> None:
        super().__init__(
            base_url=base_url,
            extra_headers=extra_headers,
            timeout=timeout,
            max_redirects=max_redirects,
            fail_on_status_code=fail_on_status_code,
        )
        self._driver = driver

    def new_context(
        self,
        base_url: str = "",
        extra_headers: dict[str, str] | None = None,
        storage_state: dict | str | pathlib.Path | None = None,
        fail_on_status_code: bool = False,
    ) -> "_IsolatedAPIRequestContext":
        """Create an isolated API request context that does not sync with the browser.

        Args:
            base_url: Optional base URL for this context.
            extra_headers: Optional headers for this context.
            storage_state: Optional cookies to pre-load, as a dict, JSON file path, or Path.
            fail_on_status_code: If True, raise APIRequestFailure for non-2xx responses.

        Returns:
            An _IsolatedAPIRequestContext instance.

        Raises:
            FileNotFoundError: If storage_state is a path that does not exist.
            ValueError: If the storage state file does not contain valid JSON.
            OSError: If the storage state file cannot be read.
        """
        cookies: list[dict] = []
        if storage_state is not None:
            if isinstance(storage_state, (str, pathlib.Path)):
                file_path = pathlib.Path(storage_state)
                if not file_path.exists():
                    raise FileNotFoundError(f"Storage state file not found: {file_path}")
                try:
                    # JSON is UTF-8 by specification; do not depend on the locale encoding.
                    with open(file_path, encoding="utf-8") as f:
                        state = json.load(f)
                except json.JSONDecodeError as e:
                    raise ValueError(f"Invalid JSON in storage state file {file_path}: {e}") from e
                except OSError as e:
                    raise OSError(f"Cannot read storage state file {file_path}: {e}") from e
            else:
                state = storage_state
            cookies = list(state.get("cookies", []))

        return _IsolatedAPIRequestContext(
            base_url=base_url,
            extra_headers=extra_headers,
            cookies=cookies,
            timeout=self._timeout,
            max_redirects=self._max_redirects,
            fail_on_status_code=fail_on_status_code,
        )

    def get_storage_state(self, path: str | pathlib.Path | None = None) -> dict[str, Any]:
        """Export the current browser cookies as a storage state dict.

        Args:
            path: Optional file path to save the storage state as JSON.

        Returns:
            A dict with a "cookies" key containing the browser cookies.

        Raises:
            OSError: If the storage state cannot be written to ``path``.
        """
        cookies = self._driver.get_cookies()
        state: dict[str, Any] = {"cookies": cookies}
        if path is not None:
            file_path = pathlib.Path(path)
            try:
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(state, f, indent=2)
            except OSError as e:
                raise OSError(f"Cannot write storage state to {file_path}: {e}") from e
        return state

    def _get_cookies_for_request(self, url: str) -> list[dict]:
        """Get matching browser cookies for the request URL."""
        try:
            browser_cookies = self._driver.get_cookies()
        except Exception:
            logger.debug("Could not retrieve browser cookies", exc_info=True)
            return []
        # Derive default domain from the browser's current page for host-only cookies
        default_domain = ""
        try:
            current = self._driver.current_url
            if current:
                default_domain = urllib.parse.urlparse(current).hostname or ""
        except Exception:
            logger.debug("Could not get current URL for host-only cookie matching", exc_info=True)
        return [c for c in browser_cookies if _cookie_matches(c, url, default_domain)]

    def _handle_response_cookies(self, set_cookie_headers: list[str], url: str) -> None:
        """Sync Set-Cookie headers back to the browser."""
        parsed_url = urllib.parse.urlparse(url)
        for sc_header in set_cookie_headers:
            cookie = _parse_set_cookie(sc_header)
            if not cookie.get("name"):
                continue
            cookie.setdefault("domain", parsed_url.hostname or "")
            cookie.setdefault("path", "/")
            expiry = cookie.get("expiry")
            if expiry is not None and expiry <= int(time.time()):
                # An already-expired cookie is a deletion request from the server.
                try:
                    self._driver.delete_cookie(cookie["name"])
                except Exception:
                    # Best effort: keep processing the remaining cookies, but
                    # leave a trace instead of swallowing the error silently.
                    logger.debug(
                        "Could not delete expired cookie '%s' from browser",
                        cookie.get("name"),
                        exc_info=True,
                    )
                continue
            try:
                self._driver.add_cookie(cookie)
            except Exception:
                logger.warning(
                    "Could not sync cookie '%s' to browser (domain mismatch with current page)",
                    cookie.get("name"),
                    exc_info=True,
                )


class _IsolatedAPIRequestContext(_BaseRequestContext):
    """An isolated API request context that maintains its own cookie jar.

    Does not synchronize cookies with any browser session.
    """

    def __init__(
        self,
        base_url: str = "",
        extra_headers: dict[str, str] | None = None,
        cookies: list[dict] | None = None,
        timeout: float = 30.0,
        max_redirects: int = 10,
        fail_on_status_code: bool = False,
    ) -> None:
        super().__init__(
            base_url=base_url,
            extra_headers=extra_headers,
            timeout=timeout,
            max_redirects=max_redirects,
            fail_on_status_code=fail_on_status_code,
        )
        self._cookies: list[dict] = cookies or []

    def get_storage_state(self) -> dict[str, Any]:
        """Return the current cookies as a storage state dict."""
        return {"cookies": list(self._cookies)}

    def _get_cookies_for_request(self, url: str) -> list[dict]:
        """Get matching cookies from the internal jar."""
        # For isolated contexts, use the request hostname as default domain
        default_domain = urllib.parse.urlparse(url).hostname or ""
        return [c for c in self._cookies if _cookie_matches(c, url, default_domain)]

    def _handle_response_cookies(self, set_cookie_headers: list[str], url: str) -> None:
        """Store Set-Cookie headers in the internal jar."""
        parsed_url = urllib.parse.urlparse(url)
        now = int(time.time())
        for sc_header in set_cookie_headers:
            cookie = _parse_set_cookie(sc_header)
            if not cookie.get("name"):
                continue
            cookie.setdefault("domain", parsed_url.hostname or "")
            cookie.setdefault("path", "/")
            # Cookies are unique by (name, domain, path)
            key = (cookie["name"], cookie.get("domain", ""), cookie.get("path", "/"))
            # Remove existing cookie with same key
            self._cookies = [
                c for c in self._cookies if (c.get("name"), c.get("domain", ""), c.get("path", "/")) != key
            ]
            # Only store if not expired (Max-Age=0 or negative means delete)
            expiry = cookie.get("expiry")
            if expiry is not None and expiry <= now:
                continue
            self._cookies.append(cookie)