Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
hhhrrrttt222111
GitHub Repository: hhhrrrttt222111/Dorkify
Path: blob/master/venv/Lib/site-packages/pip/_internal/utils/filesystem.py
811 views
1
import errno
2
import fnmatch
3
import os
4
import os.path
5
import random
6
import shutil
7
import stat
8
import sys
9
from contextlib import contextmanager
10
from tempfile import NamedTemporaryFile
11
12
# NOTE: retrying is not annotated in typeshed as of 2017-07-17, which is
# why we ignore the type on this import.
14
from pip._vendor.retrying import retry # type: ignore
15
from pip._vendor.six import PY2
16
17
from pip._internal.utils.compat import get_path_uid
18
from pip._internal.utils.misc import format_size
19
from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
20
21
if MYPY_CHECK_RUNNING:
    from typing import (
        Any,
        BinaryIO,
        Iterator,
        List,
        Union,
    )

    class NamedTemporaryFileResult(BinaryIO):
        """Static-typing description of a named temporary file object.

        Only defined when MYPY_CHECK_RUNNING is true; it declares that the
        object behaves like a BinaryIO and additionally exposes a ``file``
        attribute.
        """

        @property
        def file(self):
            # type: () -> BinaryIO
            """Binary file object exposed by the temporary-file wrapper."""
            pass
29
30
31
def check_path_owner(path):
    # type: (str) -> bool
    """Tell whether the current user can be treated as owning ``path``.

    The check is made against ``path`` itself or, when it does not exist,
    against its nearest existing ancestor. ``path`` must be absolute.

    Returns True when the effective uid cannot be determined (Windows, or no
    ``os.geteuid``). For root, the path must be owned by uid 0; for any other
    user, the path must be writable.
    """
    # Without a way to read the effective uid of this process we simply
    # assume that we own the directory.
    cannot_check_uid = not hasattr(os, "geteuid")
    if sys.platform == "win32" or cannot_check_uid:
        return True

    assert os.path.isabs(path)

    last_seen = None
    while path != last_seen:
        if not os.path.lexists(path):
            # Nothing here yet: step up to the parent. At the filesystem
            # root dirname() returns its argument, which ends the loop.
            last_seen = path
            path = os.path.dirname(path)
            continue

        if os.geteuid() != 0:
            # Ordinary user: being able to write is what matters.
            return os.access(path, os.W_OK)

        # Root gets special handling so that running sudo without the -H
        # flag is handled properly.
        try:
            owner_uid = get_path_uid(path)
        except OSError:
            return False
        return owner_uid == 0

    return False  # assume we don't own the path
57
58
59
def copy2_fixed(src, dest):
    # type: (str, str) -> None
    """Copy ``src`` to ``dest`` with shutil.copy2(), reporting sockets clearly.

    When the copy fails with OSError/IOError and either path turns out to be
    a socket file, shutil.SpecialFileError is raised instead; otherwise the
    original error is re-raised unchanged.

    See also https://bugs.python.org/issue37700.
    """
    try:
        shutil.copy2(src, dest)
    except (OSError, IOError):
        for candidate in (src, dest):
            try:
                socket_found = is_socket(candidate)
            except OSError:
                # The copy has already failed; a second failure while
                # inspecting the path adds nothing, so move on.
                continue

            if socket_found:
                raise shutil.SpecialFileError(
                    "`{f}` is a socket".format(f=candidate))

        raise
82
83
84
def is_socket(path):
    # type: (str) -> bool
    """Return True if ``path`` is a socket file.

    Uses os.lstat(), so a symlink is examined itself rather than followed.
    Raises OSError if ``path`` cannot be stat'ed.
    """
    mode = os.lstat(path).st_mode
    return stat.S_ISSOCK(mode)
87
88
89
@contextmanager
def adjacent_tmp_file(path, **kwargs):
    # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
    """Yield a temporary file object created in the same directory as path.

    The file is named ``<basename of path>...tmp`` and is opened with
    ``delete=False``, so it is still on disk after the context exits. On
    exit its contents are flushed and fsync'ed before it is closed.

    kwargs are forwarded to tempfile.NamedTemporaryFile to control how the
    temporary file is opened.
    """
    parent_dir, base_name = os.path.split(path)
    with NamedTemporaryFile(
        delete=False,
        dir=parent_dir,
        prefix=base_name,
        suffix='.tmp',
        **kwargs
    ) as tmp:
        handle = cast('NamedTemporaryFileResult', tmp)
        try:
            yield handle
        finally:
            # Push buffered data to the OS, then ask the OS to commit it to
            # disk, even when the with-body raised.
            handle.file.flush()
            os.fsync(handle.file.fileno())
113
114
115
# Retry decorator shared by both variants of replace() below.
# NOTE(review): the values are presumably milliseconds (give up after 1000,
# wait 250 between attempts) per the retrying library -- confirm.
_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)

if not PY2:
    # os.replace() exists here, so just wrap it with the retry policy.
    replace = _replace_retry(os.replace)

else:
    @_replace_retry
    def replace(src, dest):
        # type: (str, str) -> None
        """Rename ``src`` to ``dest``, removing ``dest`` first if needed.

        A plain os.rename() is tried first; if it raises OSError, ``dest``
        is removed and the rename is attempted once more.
        """
        try:
            os.rename(src, dest)
        except OSError:
            os.remove(dest)
            os.rename(src, dest)
129
130
131
# test_writable_dir and _test_writable_dir_win are copied from Flit,
132
# with the author's agreement to also place them under pip's license.
133
def test_writable_dir(path):
    # type: (str) -> bool
    """Check if a directory is writable.

    Uses os.access() on POSIX, tries creating files on Windows.

    If ``path`` does not exist, the closest existing ancestor is checked
    instead. For a relative path with no existing component, that ancestor
    is the current directory.

    :param path: directory (existing or not) to check.
    :return: True if the directory that would hold ``path`` is writable.
    """
    # If the directory doesn't exist, find the closest parent that does.
    while not os.path.isdir(path):
        # dirname() of a bare relative name is '', which is never a
        # directory; treat it as the current directory so relative paths
        # resolve to a real ancestor instead of always reporting False.
        parent = os.path.dirname(path) or os.curdir
        if parent == path:
            break  # Should never get here, but infinite loops are bad
        path = parent

    if os.name == 'posix':
        return os.access(path, os.W_OK)

    return _test_writable_dir_win(path)


# The public name matches pytest's default ``test_*`` collection pattern.
# Opt out so that importing this helper into a test module does not make
# pytest try to run it as a test.
test_writable_dir.__test__ = False  # type: ignore
150
151
152
def _test_writable_dir_win(path):
    # type: (str) -> bool
    """Check writability of ``path`` by creating and deleting a probe file.

    Up to ten randomly named probe files are tried. Returns True once one
    is created (it is removed again), False when creation is refused with
    EPERM or EACCES. Any other OSError propagates, and EnvironmentError is
    raised if every candidate name already existed.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    prefix = 'accesstest_deleteme_fishfingers_custard_'
    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
    open_flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    for _attempt in range(10):
        random_part = ''.join(random.choice(alphabet) for _ in range(6))
        probe = os.path.join(path, prefix + random_part)
        try:
            fd = os.open(probe, open_flags)
        # Python 2 doesn't support FileExistsError and PermissionError.
        except OSError as exc:
            # exception FileExistsError
            if exc.errno == errno.EEXIST:
                continue
            # exception PermissionError
            if exc.errno in (errno.EPERM, errno.EACCES):
                # This could be because there's a directory with the same
                # name. But it's highly unlikely there's a directory called
                # that, so we'll assume it's because the parent dir is not
                # writable.
                return False
            raise

        # The probe was created: clean it up and report success.
        os.close(fd)
        os.unlink(probe)
        return True

    # This should never be reached
    raise EnvironmentError(
        'Unexpected condition testing for writable directory'
    )
184
185
186
def find_files(path, pattern):
    # type: (str, str) -> List[str]
    """Recursively collect files under ``path`` whose names match ``pattern``.

    ``pattern`` is a UNIX-style shell glob applied to file names only. Each
    returned entry is the containing directory (as reported by os.walk, so
    rooted at ``path``) joined with the file name.
    """
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(path)
        for name in fnmatch.filter(files, pattern)
    ]
195
196
197
def file_size(path):
    # type: (str) -> Union[int, float]
    """Return the size of ``path`` in bytes, counting symlinks as 0."""
    # Symlinks are not followed: they contribute nothing to the size.
    return 0 if os.path.islink(path) else os.path.getsize(path)
203
204
205
def format_file_size(path):
    # type: (str) -> str
    """Return the size of the file at ``path`` rendered by format_size()."""
    size = file_size(path)
    return format_size(size)
208
209
210
def directory_size(path):
    # type: (str) -> Union[int, float]
    """Return the combined size of all files beneath ``path``, recursively.

    Each file is measured with file_size(). The total is accumulated as a
    float, so an empty tree yields 0.0.
    """
    every_file = (
        os.path.join(root, name)
        for root, _dirs, files in os.walk(path)
        for name in files
    )
    total = 0.0
    for file_path in every_file:
        total += file_size(file_path)
    return total
218
219
220
def format_directory_size(path):
    # type: (str) -> str
    """Return the total size of ``path``'s tree rendered by format_size()."""
    total = directory_size(path)
    return format_size(total)
223
224