Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/compat.py
1566 views
1
# Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import collections.abc as collections_abc
15
import contextlib
16
import datetime
17
import io
18
import locale
19
import os
20
import os.path
21
import queue
22
import re
23
import shlex
24
import signal
25
import urllib.parse as urlparse
26
from configparser import RawConfigParser
27
from urllib.error import URLError
28
from urllib.request import urlopen
29
30
from botocore.compat import six, OrderedDict
31
32
import sys
33
import zipfile
34
from functools import partial
35
36
# Names matching the ones six provides, kept here for backwards
# compatibility. Each one is a plain alias of a builtin or stdlib object.
PY3 = sys.version_info[0] == 3
raw_input = input
advance_iterator = next
binary_type = bytes
shlex_quote = shlex.quote
BytesIO = io.BytesIO
StringIO = io.StringIO
44
45
46
# zlib is present in most, but not all, python installations. It is needed
# to compress the files sent via a push. Without it the files can still be
# packaged in a zip container, just stored uncompressed.
try:
    import zlib
except ImportError:
    ZIP_COMPRESSION_MODE = zipfile.ZIP_STORED
else:
    ZIP_COMPRESSION_MODE = zipfile.ZIP_DEFLATED
55
56
57
# sqlite3 may not be importable; expose None in that case so the name
# always exists at module level.
try:
    import sqlite3
except ImportError:
    sqlite3 = None


# Platform flags derived from sys.platform.
is_windows = sys.platform == 'win32'
is_macos = sys.platform == 'darwin'

# Default pager command for the current platform.
default_pager = 'more' if is_windows else 'less -R'
72
73
74
class StdinMissingError(Exception):
    """Raised when an operation needs stdin but stdin is not available."""

    def __init__(self):
        super().__init__(
            'stdin is required for this operation, but is not available.'
        )
78
79
80
class NonTranslatedStdout:
    """Context manager that turns off line-end translation on stdout.

    On win32, stdout is switched to binary mode while the block runs so that
    no carriage return is added to line endings; the previous mode is put
    back on exit. This helps when printing commands that would break with
    Windows-style line endings. On every other platform nothing is changed.

    Entering the context always yields ``sys.stdout``.
    """

    def __enter__(self):
        if sys.platform != "win32":
            return sys.stdout

        import msvcrt

        # setmode() returns the mode that was active before the switch;
        # remember it so __exit__ can restore it.
        stdout_fd = sys.stdout.fileno()
        self.previous_mode = msvcrt.setmode(stdout_fd, os.O_BINARY)
        return sys.stdout

    def __exit__(self, type, value, traceback):
        if sys.platform != "win32":
            return

        import msvcrt

        msvcrt.setmode(sys.stdout.fileno(), self.previous_mode)
102
103
104
def ensure_text_type(s):
    """Return *s* as ``str``.

    ``str`` input is returned as is and ``bytes`` input is decoded as UTF-8.

    :raises ValueError: if *s* is neither ``str`` nor ``bytes``.
    """
    if isinstance(s, bytes):
        return s.decode('utf-8')
    if not isinstance(s, str):
        raise ValueError(
            "Expected str, unicode or bytes, received %s." % type(s)
        )
    return s
110
111
112
def get_binary_stdin():
    """Return the ``buffer`` attribute of ``sys.stdin``.

    :raises StdinMissingError: if ``sys.stdin`` is ``None``.
    """
    stdin = sys.stdin
    if stdin is None:
        raise StdinMissingError()
    return stdin.buffer
116
117
118
def get_binary_stdout():
    """Return the ``buffer`` attribute of ``sys.stdout``."""
    stdout = sys.stdout
    return stdout.buffer
120
121
122
def _get_text_writer(stream, errors):
    """Return *stream* unchanged.

    :param stream: The stream to hand back.
    :param errors: Accepted for the callers' benefit but not used here.
    """
    return stream
124
125
126
def bytes_print(statement, stdout=None):
    """Write the raw bytes *statement* to *stdout*.

    :param statement: The bytes to write.
    :param stdout: Target stream; ``sys.stdout`` when ``None``.

    The bytes are written to the stream's ``buffer`` when it has a truthy
    one. Otherwise the next best option is used: the bytes are decoded as
    UTF-8 and written to the stream as text.
    """
    target = sys.stdout if stdout is None else stdout

    raw = getattr(target, 'buffer', None)
    if raw:
        raw.write(statement)
        return

    target.write(statement.decode('utf-8'))
139
140
141
def compat_open(filename, mode='r', encoding=None, access_permissions=None):
    """Open *filename* with the built-in ``open()``.

    :param filename: Path of the file to open.
    :param mode: Mode string handed to ``open()``.
    :param encoding: Text encoding to use. When it is ``None`` and the file
        is not being opened in binary mode,
        ``locale.getpreferredencoding()`` is used. An explicitly supplied
        encoding is honored rather than being replaced by the locale's.
    :param access_permissions: Optional permission bits. When given, they
        are passed as the ``mode`` argument of the ``os.open`` call that
        ``open()`` uses as its opener.
    :return: The file object returned by ``open()``.
    """
    opener = os.open
    if access_permissions is not None:
        opener = partial(os.open, mode=access_permissions)
    # Only fall back to the locale's preferred encoding when the caller did
    # not pick one; binary mode must keep encoding=None.
    if encoding is None and 'b' not in mode:
        encoding = locale.getpreferredencoding()
    return open(filename, mode, encoding=encoding, opener=opener)
158
159
160
def get_stdout_text_writer():
    """Return the text writer for ``sys.stdout``.

    Obtained from ``_get_text_writer`` with ``errors="strict"``.
    """
    writer = _get_text_writer(sys.stdout, errors="strict")
    return writer
162
163
164
def get_stderr_text_writer():
    """Return the text writer for ``sys.stderr``.

    Obtained from ``_get_text_writer`` with ``errors="replace"``.
    """
    writer = _get_text_writer(sys.stderr, errors="replace")
    return writer
166
167
168
def get_stderr_encoding():
    """Return the encoding of the original stderr (``sys.__stderr__``).

    Falls back to ``'utf-8'`` when ``sys.__stderr__`` has no ``encoding``
    attribute or that attribute is ``None``.
    """
    detected = getattr(sys.__stderr__, 'encoding', None)
    return 'utf-8' if detected is None else detected
173
174
175
def compat_input(prompt):
    """Write and flush *prompt* on stdout, then read a line of input.

    Cygwin ptys are built on pipes, so a Win32 program (such as Win32
    python) talking to one sees a pipe rather than a console. Python buffers
    pipes, which means text does not necessarily show up right away on a
    pty-based terminal. Usually that is harmless, but for an interactive
    prompt it means the prompt stays invisible until the buffer fills.
    raw_input does not flush the prompt, so the prompt is written and
    flushed by hand before reading.

    See https://github.com/mintty/mintty/issues/56 for more details.

    :param prompt: Text to display before reading.
    :return: Whatever ``raw_input()`` returns.
    """
    out = sys.stdout
    out.write(prompt)
    out.flush()
    return raw_input()
190
191
192
def compat_shell_quote(s, platform=None):
    """Return a shell-escaped version of the string *s*.

    ``shlex.quote`` does not support Windows, so for the ``"win32"``
    platform the quoting is delegated to ``_windows_shell_quote`` instead.

    :param s: The string to quote.
    :param platform: Platform identifier to quote for; ``sys.platform`` is
        used when ``None``.
    """
    target = sys.platform if platform is None else platform
    quote = _windows_shell_quote if target == "win32" else shlex.quote
    return quote(s)
205
206
207
def _windows_shell_quote(s):
    """Return a Windows shell-escaped version of the string *s*

    Windows has potentially bizarre rules depending on where you look. When
    spawning a process via the Windows C runtime the rules are as follows:

    https://docs.microsoft.com/en-us/cpp/cpp/parsing-cpp-command-line-arguments

    To summarize the relevant bits:

    * Only space and tab are valid delimiters
    * Double quotes are the only valid quotes
    * Backslash is interpreted literally unless it is part of a chain that
      leads up to a double quote. Then the backslashes escape the backslashes,
      and if there is an odd number the final backslash escapes the quote.

    The last rule also applies to the closing quote this function adds: when
    the result is wrapped in double quotes, a run of trailing backslashes is
    doubled so that the closing quote is not read as an escaped literal.

    :param s: A string to escape
    :return: An escaped string; ``'""'`` for an empty (falsy) input
    """
    if not s:
        return '""'

    # Escaping only ever adds backslashes, so whether the result needs to be
    # wrapped in quotes can be decided from the input alone.
    needs_quotes = ' ' in s or '\t' in s

    buff = []
    num_backslashes = 0
    for character in s:
        if character == '\\':
            # We can't simply append backslashes because we don't know if
            # they will need to be escaped. Instead we separately keep track
            # of how many we've seen.
            num_backslashes += 1
        elif character == '"':
            if num_backslashes > 0:
                # The backslashes are part of a chain that lead up to a
                # double quote, so they need to be escaped.
                buff.append('\\' * (num_backslashes * 2))
                num_backslashes = 0

            # The double quote itself also needs to be escaped.
            buff.append('\\"')
        else:
            if num_backslashes > 0:
                # The backslashes aren't part of a chain leading up to a
                # double quote, so they can be inserted directly without
                # being escaped.
                buff.append('\\' * num_backslashes)
                num_backslashes = 0
            buff.append(character)

    # There may be some leftover backslashes if they were on the trailing
    # end. Without surrounding quotes they are literal; with surrounding
    # quotes they lead up to the closing quote and must be doubled.
    if num_backslashes > 0:
        if needs_quotes:
            num_backslashes *= 2
        buff.append('\\' * num_backslashes)

    new_s = ''.join(buff)
    if needs_quotes:
        # If there are any spaces or tabs then the string needs to be double
        # quoted.
        return '"%s"' % new_s
    return new_s
268
269
270
def get_popen_kwargs_for_pager_cmd(pager_cmd=None):
    """Build the keyword arguments used to launch the pager.

    :param pager_cmd: The paging command as a string; the platform's
        ``default_pager`` is used when ``None``.
    :rtype: dict
    :returns: A dict with an ``args`` entry. On Windows ``args`` is the
        command string itself and ``shell`` is set to ``True``; elsewhere
        ``args`` is the command split with ``shlex.split``.
    """
    command = default_pager if pager_cmd is None else pager_cmd
    if is_windows:
        # Similar to what is done for the help command, shell=True is
        # needed to make the pager work on Windows.
        return {'shell': True, 'args': command}
    return {'args': shlex.split(command)}
288
289
290
@contextlib.contextmanager
def ignore_user_entered_signals():
    """
    Ignores user entered signals to avoid process getting killed.

    While the context is active SIGINT (and, when not on Windows, SIGQUIT
    and SIGTSTP) are set to ``signal.SIG_IGN``. On exit each signal gets
    back the handler that ``signal.signal`` reported for it on entry.
    """
    ignored = [signal.SIGINT]
    if not is_windows:
        ignored += [signal.SIGQUIT, signal.SIGTSTP]

    # signal.signal() returns the previously installed handler.
    previous_handlers = [
        signal.signal(signum, signal.SIG_IGN) for signum in ignored
    ]
    try:
        yield
    finally:
        for signum, handler in zip(ignored, previous_handlers):
            signal.signal(signum, handler)
307
308
309
# linux_distribution is used by the CodeDeploy customization. Python 3.8
310
# removed it from the stdlib, so it is vendored here in the case where the
311
# import fails.
312
try:
313
from platform import linux_distribution
314
except ImportError:
315
_UNIXCONFDIR = '/etc'
316
317
def _dist_try_harder(distname, version, id):
    """Fallback probes for the distribution information.

    Looks, in order, for marker files of older SuSE Linux, Caldera OpenLinux
    and Slackware Linux. The first probe that applies decides the result;
    when none applies the arguments are returned unchanged.

    :returns: A ``(distname, version, id)`` tuple.
    """
    suse_info = '/var/adm/inst-log/info'
    if os.path.exists(suse_info):
        # SuSE Linux stores distribution information in that file
        distname = 'SuSE'
        with open(suse_info) as f:
            for line in f:
                fields = line.split()
                if len(fields) != 2:
                    continue
                tag, value = fields
                if tag == 'MIN_DIST_VERSION':
                    version = value.strip()
                elif tag == 'DIST_IDENT':
                    id = value.split('-')[2]
        return distname, version, id

    caldera_info = '/etc/.installed'
    if os.path.exists(caldera_info):
        # Caldera OpenLinux has some infos in that file (thanks to Colin Kong)
        with open(caldera_info) as f:
            for line in f:
                parts = line.split('-')
                if len(parts) >= 2 and parts[0] == 'OpenLinux':
                    # XXX does Caldera support non Intel platforms ? If yes,
                    #     where can we find the needed id ?
                    return 'OpenLinux', parts[1], id

    slackware_dir = '/usr/lib/setup'
    if os.path.isdir(slackware_dir):
        # Check for slackware version tag file (thanks to Greg Andruk)
        prefix = 'slack-version-'
        tagged = sorted(
            name
            for name in os.listdir(slackware_dir)
            if name[:14] == prefix
        )
        if tagged:
            # The highest-sorting tag file wins; the version is whatever
            # follows the prefix in its name.
            return 'slackware', tagged[-1][14:], id

    return distname, version, id
363
364
# Name of a release file: "<distro>-release", "<distro>_version", ...
_release_filename = re.compile(
    r'(\w+)[-_](release|version)',
    re.ASCII,
)
# LSB format: "distro release x.x (codename)"
_lsb_release_version = re.compile(
    r'(.+) release ([\d.]+)[^(]*(?:\((.+)\))?',
    re.ASCII,
)
# Pre-LSB format: "distro x.x (codename)"
_release_version = re.compile(
    r'([^0-9]+)(?: release )?([\d.]+)[^(]*(?:\((.+)\))?',
    re.ASCII,
)

# See also http://www.novell.com/coolsolutions/feature/11251.html
# and http://linuxmafia.com/faq/Admin/release-files.html
# and http://data.linux-ntfs.org/rpm/whichrpm
# and http://www.die.net/doc/linux/man/man1/lsb_release.1.html

_supported_dists = (
    'SuSE', 'debian', 'fedora', 'redhat', 'centos',
    'mandrake', 'mandriva', 'rocks', 'slackware', 'yellowdog',
    'gentoo', 'UnitedLinux', 'turbolinux', 'arch', 'mageia',
)


def _parse_release_file(firstline):
    """Split the first line of a release file into a 3-tuple.

    The LSB pattern is tried first, then the pre-LSB one; a match returns
    the pattern's three groups (the last one is ``None`` when the line has
    no parenthesised part). When neither matches, the first two
    whitespace-separated words are returned as the second and third
    elements, with an empty first element. Missing words, including the
    case of an empty *firstline*, are returned as empty strings.
    """
    for pattern in (_lsb_release_version, _release_version):
        found = pattern.match(firstline)
        if found is not None:
            return tuple(found.groups())

    # Unknown format... take the first two words
    words = firstline.strip().split()
    version = words[0] if words else ''
    dist_id = words[1] if len(words) > 1 else ''
    return '', version, dist_id
421
422
# Case-insensitive patterns for the KEY=value lines of /etc/lsb-release;
# group 1 is the text after the equals sign.
_distributor_id_file_re = re.compile(
    r"(?:DISTRIB_ID\s*=)\s*(.*)", re.IGNORECASE
)
_release_file_re = re.compile(
    r"(?:DISTRIB_RELEASE\s*=)\s*(.*)", re.IGNORECASE
)
_codename_file_re = re.compile(
    r"(?:DISTRIB_CODENAME\s*=)\s*(.*)", re.IGNORECASE
)
425
426
def linux_distribution(
    distname='',
    version='',
    id='',
    supported_dists=_supported_dists,
    full_distribution_name=1,
):
    """Return a ``(distname, version, id)`` tuple for the running Linux.

    Thin entry point: every argument is forwarded unchanged to
    ``_linux_distribution``, which does the actual detection.
    """
    return _linux_distribution(
        distname=distname,
        version=version,
        id=id,
        supported_dists=supported_dists,
        full_distribution_name=full_distribution_name,
    )
436
437
def _linux_distribution(
    distname, version, id, supported_dists, full_distribution_name
):
    """Tries to determine the name of the Linux OS distribution name.

    The function first looks at /etc/lsb-release, then for a distribution
    release file in /etc, and reverts to _dist_try_harder() in case no
    suitable files are found.

    supported_dists may be given to define the set of Linux
    distributions to look for, identified by their release file name.

    If full_distribution_name is true, the full distribution name read
    from the OS is returned. Otherwise the short name taken from
    supported_dists is used.

    Returns a tuple (distname, version, id) which default to the
    args given as parameters.
    """
    # check for the Debian/Ubuntu /etc/lsb-release file first, needed so
    # that the distribution doesn't get identified as Debian.
    # https://bugs.python.org/issue9514
    #
    # None marks "field not seen". The file is decoded the same way as the
    # release file below so that stray non-UTF-8 bytes cannot raise.
    _u_distname = _u_version = _u_id = None
    try:
        with open(
            "/etc/lsb-release",
            encoding='utf-8',
            errors='surrogateescape',
        ) as etclsbrel:
            for line in etclsbrel:
                m = _distributor_id_file_re.search(line)
                if m:
                    _u_distname = m.group(1).strip()
                m = _release_file_re.search(line)
                if m:
                    _u_version = m.group(1).strip()
                m = _codename_file_re.search(line)
                if m:
                    _u_id = m.group(1).strip()
    except OSError:
        # Missing or unreadable file: fall through to the release files.
        pass
    else:
        # Only trust the file when name and release are non-empty and a
        # codename line was present (its value may be empty).
        if _u_distname and _u_version and _u_id is not None:
            return (_u_distname, _u_version, _u_id)

    try:
        etc = os.listdir(_UNIXCONFDIR)
    except OSError:
        # Probably not a Unix system
        return distname, version, id
    etc.sort()
    for file in etc:
        m = _release_filename.match(file)
        if m is not None:
            _distname = m.group(1)
            if _distname in supported_dists:
                distname = _distname
                break
    else:
        # No release file of a supported distribution was found.
        return _dist_try_harder(distname, version, id)

    # Read the first line
    with open(
        os.path.join(_UNIXCONFDIR, file),
        encoding='utf-8',
        errors='surrogateescape',
    ) as f:
        firstline = f.readline()
    _distname, _version, _id = _parse_release_file(firstline)

    # Parsed values override the defaults only when they are non-empty.
    if _distname and full_distribution_name:
        distname = _distname
    if _version:
        version = _version
    if _id:
        id = _id
    return distname, version, id
506
507
508
def get_current_datetime(remove_tzinfo=True):
    """Return the current UTC time.

    :param remove_tzinfo: When true, the returned datetime has its tzinfo
        removed (naive); otherwise it carries ``datetime.timezone.utc``.
    """
    # TODO: Consolidate to botocore.compat.get_current_datetime
    # after it's had time to bake to avoid import errors with
    # mismatched versions.
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    return utc_now.replace(tzinfo=None) if remove_tzinfo else utc_now
516
517