# Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/utils/filesystem.py
# 4804 views
import fnmatch
import os
import os.path
import random
import sys
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from typing import Any, BinaryIO, Generator, List, Union, cast

from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed

from pip._internal.utils.compat import get_path_uid
from pip._internal.utils.misc import format_size


def check_path_owner(path: str) -> bool:
    """Report whether the current process may treat ``path`` as its own.

    Starting at ``path`` and walking up through its parents, the first
    location that exists (per ``os.path.lexists``) decides the answer:

    * when running with effective uid 0, the location must be owned by
      uid 0 (``False`` if its owner cannot be determined);
    * otherwise the location must be writable according to ``os.access``.

    ``path`` must be absolute (enforced with an ``assert``). On Windows, or
    on any platform without ``os.geteuid``, ownership is assumed and
    ``True`` is returned without inspecting the filesystem.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if sys.platform == "win32" or not hasattr(os, "geteuid"):
        return True

    assert os.path.isabs(path)

    # ``previous`` detects the fixed point of os.path.dirname (the filesystem
    # root), which is what terminates the upward walk.
    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            else:
                return os.access(path, os.W_OK)
        else:
            previous, path = path, os.path.dirname(path)
    return False  # assume we don't own the path


@contextmanager
def adjacent_tmp_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
    """Return a file-like object pointing to a tmp file next to path.

    The file is created securely and is ensured to be written to disk
    after the context reaches its end.

    The temporary file lives in the same directory as ``path``, is named
    with ``path``'s basename as prefix and ``.tmp`` as suffix, and is NOT
    deleted when the context exits (``delete=False``); disposing of it is
    left to the caller.

    kwargs will be passed to tempfile.NamedTemporaryFile to control
    the way the temporary file will be opened.
    """
    with NamedTemporaryFile(
        delete=False,
        dir=os.path.dirname(path),
        prefix=os.path.basename(path),
        suffix=".tmp",
        **kwargs,
    ) as f:
        result = cast(BinaryIO, f)
        try:
            yield result
        finally:
            # Push Python-level buffers to the OS, then ask the OS to commit
            # them to disk, even if the with-body raised.
            result.flush()
            os.fsync(result.fileno())


# Tenacity raises RetryError by default, explicitly raise the original exception
_replace_retry = retry(reraise=True, stop=stop_after_delay(1), wait=wait_fixed(0.25))

# os.replace wrapped in the retry policy above: failures are retried at a
# fixed 0.25 interval until the 1-unit delay budget is spent, after which the
# original exception propagates.
replace = _replace_retry(os.replace)


# test_writable_dir and _test_writable_dir_win are copied from Flit,
# with the author's agreement to also place them under pip's license.
def test_writable_dir(path: str) -> bool:
    """Check if a directory is writable.

    Uses os.access() on POSIX, tries creating files on Windows.
    """
    # If the directory doesn't exist, find the closest parent that does.
    while not os.path.isdir(path):
        parent = os.path.dirname(path)
        if parent == path:
            break  # Should never get here, but infinite loops are bad
        path = parent

    if os.name == "posix":
        return os.access(path, os.W_OK)

    return _test_writable_dir_win(path)


def _test_writable_dir_win(path: str) -> bool:
    """Probe writability of ``path`` by creating and removing a scratch file.

    Up to ten randomly named files are attempted. Returns ``True`` as soon
    as one is created (it is closed and unlinked again), ``False`` on the
    first ``PermissionError``. A name collision (``FileExistsError``) just
    moves on to the next candidate name.

    Raises:
        OSError: if all ten candidate names already existed.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    basename = "accesstest_deleteme_fishfingers_custard_"
    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
    for _ in range(10):
        name = basename + "".join(random.choice(alphabet) for _ in range(6))
        file = os.path.join(path, name)
        try:
            # O_EXCL guarantees we never open (and later delete) a file that
            # somebody else created under the same name.
            fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        except FileExistsError:
            pass
        except PermissionError:
            # This could be because there's a directory with the same name.
            # But it's highly unlikely there's a directory called that,
            # so we'll assume it's because the parent dir is not writable.
            # This could as well be because the parent dir is not readable,
            # due to non-privileged user access.
            return False
        else:
            os.close(fd)
            os.unlink(file)
            return True

    # This should never be reached
    raise OSError("Unexpected condition testing for writable directory")


def find_files(path: str, pattern: str) -> List[str]:
    """Return the files beneath ``path``, recursively, whose names match
    the UNIX-style shell glob ``pattern``.

    Each entry is the walked directory joined with the file name, so the
    results are absolute only when ``path`` itself is absolute. Only the
    file name (not the directory part) is matched against ``pattern``.
    """
    result: List[str] = []
    for root, _, files in os.walk(path):
        matches = fnmatch.filter(files, pattern)
        result.extend(os.path.join(root, f) for f in matches)
    return result


def file_size(path: str) -> Union[int, float]:
    """Return the size of ``path`` in bytes, counting symlinks as 0."""
    # If it's a symlink, return 0.
    if os.path.islink(path):
        return 0
    return os.path.getsize(path)


def format_file_size(path: str) -> str:
    """Return the size of the file at ``path`` rendered by ``format_size``."""
    return format_size(file_size(path))


def directory_size(path: str) -> Union[int, float]:
    """Return the summed ``file_size`` of every file found beneath ``path``.

    The total is accumulated as a float (it starts from ``0.0``), and
    symlinked files contribute 0 because ``file_size`` reports them as such.
    """
    size = 0.0
    for root, _dirs, files in os.walk(path):
        for filename in files:
            file_path = os.path.join(root, filename)
            size += file_size(file_path)
    return size


def format_directory_size(path: str) -> str:
    """Return the total size of ``path`` rendered by ``format_size``."""
    return format_size(directory_size(path))