Path: blob/master/venv/Lib/site-packages/pip/_internal/utils/filesystem.py
811 views
import errno1import fnmatch2import os3import os.path4import random5import shutil6import stat7import sys8from contextlib import contextmanager9from tempfile import NamedTemporaryFile1011# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is12# why we ignore the type on this import.13from pip._vendor.retrying import retry # type: ignore14from pip._vendor.six import PY21516from pip._internal.utils.compat import get_path_uid17from pip._internal.utils.misc import format_size18from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast1920if MYPY_CHECK_RUNNING:21from typing import Any, BinaryIO, Iterator, List, Union2223class NamedTemporaryFileResult(BinaryIO):24@property25def file(self):26# type: () -> BinaryIO27pass282930def check_path_owner(path):31# type: (str) -> bool32# If we don't have a way to check the effective uid of this process, then33# we'll just assume that we own the directory.34if sys.platform == "win32" or not hasattr(os, "geteuid"):35return True3637assert os.path.isabs(path)3839previous = None40while path != previous:41if os.path.lexists(path):42# Check if path is writable by current user.43if os.geteuid() == 0:44# Special handling for root user in order to handle properly45# cases where users use sudo without -H flag.46try:47path_uid = get_path_uid(path)48except OSError:49return False50return path_uid == 051else:52return os.access(path, os.W_OK)53else:54previous, path = path, os.path.dirname(path)55return False # assume we don't own the path565758def copy2_fixed(src, dest):59# type: (str, str) -> None60"""Wrap shutil.copy2() but map errors copying socket files to61SpecialFileError as expected.6263See also https://bugs.python.org/issue37700.64"""65try:66shutil.copy2(src, dest)67except (OSError, IOError):68for f in [src, dest]:69try:70is_socket_file = is_socket(f)71except OSError:72# An error has already occurred. Another error here is not73# a problem and we can ignore it.74pass75else:76if is_socket_file:77raise shutil.SpecialFileError(78"`{f}` is a socket".format(**locals()))7980raise818283def is_socket(path):84# type: (str) -> bool85return stat.S_ISSOCK(os.lstat(path).st_mode)868788@contextmanager89def adjacent_tmp_file(path, **kwargs):90# type: (str, **Any) -> Iterator[NamedTemporaryFileResult]91"""Return a file-like object pointing to a tmp file next to path.9293The file is created securely and is ensured to be written to disk94after the context reaches its end.9596kwargs will be passed to tempfile.NamedTemporaryFile to control97the way the temporary file will be opened.98"""99with NamedTemporaryFile(100delete=False,101dir=os.path.dirname(path),102prefix=os.path.basename(path),103suffix='.tmp',104**kwargs105) as f:106result = cast('NamedTemporaryFileResult', f)107try:108yield result109finally:110result.file.flush()111os.fsync(result.file.fileno())112113114_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)115116if PY2:117@_replace_retry118def replace(src, dest):119# type: (str, str) -> None120try:121os.rename(src, dest)122except OSError:123os.remove(dest)124os.rename(src, dest)125126else:127replace = _replace_retry(os.replace)128129130# test_writable_dir and _test_writable_dir_win are copied from Flit,131# with the author's agreement to also place them under pip's license.132def test_writable_dir(path):133# type: (str) -> bool134"""Check if a directory is writable.135136Uses os.access() on POSIX, tries creating files on Windows.137"""138# If the directory doesn't exist, find the closest parent that does.139while not os.path.isdir(path):140parent = os.path.dirname(path)141if parent == path:142break # Should never get here, but infinite loops are bad143path = parent144145if os.name == 'posix':146return os.access(path, os.W_OK)147148return _test_writable_dir_win(path)149150151def _test_writable_dir_win(path):152# type: (str) -> bool153# os.access doesn't work on Windows: http://bugs.python.org/issue2528154# and we can't use tempfile: http://bugs.python.org/issue22107155basename = 'accesstest_deleteme_fishfingers_custard_'156alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'157for i in range(10):158name = basename + ''.join(random.choice(alphabet) for _ in range(6))159file = os.path.join(path, name)160try:161fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)162# Python 2 doesn't support FileExistsError and PermissionError.163except OSError as e:164# exception FileExistsError165if e.errno == errno.EEXIST:166continue167# exception PermissionError168if e.errno == errno.EPERM or e.errno == errno.EACCES:169# This could be because there's a directory with the same name.170# But it's highly unlikely there's a directory called that,171# so we'll assume it's because the parent dir is not writable.172return False173raise174else:175os.close(fd)176os.unlink(file)177return True178179# This should never be reached180raise EnvironmentError(181'Unexpected condition testing for writable directory'182)183184185def find_files(path, pattern):186# type: (str, str) -> List[str]187"""Returns a list of absolute paths of files beneath path, recursively,188with filenames which match the UNIX-style shell glob pattern."""189result = [] # type: List[str]190for root, dirs, files in os.walk(path):191matches = fnmatch.filter(files, pattern)192result.extend(os.path.join(root, f) for f in matches)193return result194195196def file_size(path):197# type: (str) -> Union[int, float]198# If it's a symlink, return 0.199if os.path.islink(path):200return 0201return os.path.getsize(path)202203204def format_file_size(path):205# type: (str) -> str206return format_size(file_size(path))207208209def directory_size(path):210# type: (str) -> Union[int, float]211size = 0.0212for root, _dirs, files in os.walk(path):213for filename in files:214file_path = os.path.join(root, filename)215size += file_size(file_path)216return size217218219def format_directory_size(path):220# type: (str) -> str221return format_size(directory_size(path))222223224