Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/utils.py
2624 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import base64
14
import contextlib
15
import csv
16
import datetime
17
import os
18
import signal
19
import subprocess
20
import sys
21
22
from awscli.compat import (
23
StringIO,
24
get_binary_stdout,
25
get_popen_kwargs_for_pager_cmd,
26
)
27
28
29
def split_on_commas(value):
30
if not any(char in value for char in ['"', '\\', "'", ']', '[']):
31
# No quotes or escaping, just use a simple split.
32
return value.split(',')
33
elif not any(char in value for char in ['"', "'", '[', ']']):
34
# Simple escaping, let the csv module handle it.
35
return list(csv.reader(StringIO(value), escapechar='\\'))[0]
36
else:
37
# If there's quotes for the values, we have to handle this
38
# ourselves.
39
return _split_with_quotes(value)
40
41
42
def _split_with_quotes(value):
43
try:
44
parts = list(csv.reader(StringIO(value), escapechar='\\'))[0]
45
except csv.Error:
46
raise ValueError("Bad csv value: %s" % value)
47
iter_parts = iter(parts)
48
new_parts = []
49
for part in iter_parts:
50
# Find the first quote
51
quote_char = _find_quote_char_in_part(part)
52
53
# Find an opening list bracket
54
list_start = part.find('=[')
55
56
if (
57
list_start >= 0
58
and value.find(']') != -1
59
and (quote_char is None or part.find(quote_char) > list_start)
60
):
61
# This is a list, eat all the items until the end
62
if ']' in part:
63
# Short circuit for only one item
64
new_chunk = part
65
else:
66
new_chunk = _eat_items(value, iter_parts, part, ']')
67
list_items = _split_with_quotes(new_chunk[list_start + 2 : -1])
68
new_chunk = new_chunk[: list_start + 1] + ','.join(list_items)
69
new_parts.append(new_chunk)
70
continue
71
elif quote_char is None:
72
new_parts.append(part)
73
continue
74
elif part.count(quote_char) == 2:
75
# Starting and ending quote are in this part.
76
# While it's not needed right now, this will
77
# break down if we ever need to escape quotes while
78
# quoting a value.
79
new_parts.append(part.replace(quote_char, ''))
80
continue
81
# Now that we've found a starting quote char, we
82
# need to combine the parts until we encounter an end quote.
83
new_chunk = _eat_items(value, iter_parts, part, quote_char, quote_char)
84
new_parts.append(new_chunk)
85
return new_parts
86
87
88
def _eat_items(value, iter_parts, part, end_char, replace_char=''):
89
"""
90
Eat items from an iterator, optionally replacing characters with
91
a blank and stopping when the end_char has been reached.
92
"""
93
current = part
94
chunks = [current.replace(replace_char, '')]
95
while True:
96
try:
97
current = next(iter_parts)
98
except StopIteration:
99
raise ValueError(value)
100
chunks.append(current.replace(replace_char, ''))
101
if current.endswith(end_char):
102
break
103
return ','.join(chunks)
104
105
106
def _find_quote_char_in_part(part):
107
"""
108
Returns a single or double quote character, whichever appears first in the
109
given string. None is returned if the given string doesn't have a single or
110
double quote character.
111
"""
112
quote_char = None
113
for ch in part:
114
if ch in ('"', "'"):
115
quote_char = ch
116
break
117
return quote_char
118
119
120
def find_service_and_method_in_event_name(event_name):
121
"""
122
Grabs the service id and the operation name from an event name.
123
This is making the assumption that the event name is in the form
124
event.service.operation.
125
"""
126
split_event = event_name.split('.')[1:]
127
service_name = None
128
if len(split_event) > 0:
129
service_name = split_event[0]
130
131
operation_name = None
132
if len(split_event) > 1:
133
operation_name = split_event[1]
134
return service_name, operation_name
135
136
137
def resolve_v2_debug_mode(args):
138
# Resolve whether v2-debug mode is enabled,
139
# following the correct precedence order.
140
if args is None:
141
return False
142
if getattr(args, 'v2_debug', False):
143
return True
144
if os.environ.get('AWS_CLI_UPGRADE_DEBUG_MODE', '').lower() == 'true':
145
return True
146
return False
147
148
149
def is_document_type(shape):
150
"""Check if shape is a document type"""
151
return getattr(shape, 'is_document_type', False)
152
153
154
def is_document_type_container(shape):
155
"""Check if the shape is a document type or wraps document types
156
157
This is helpful to determine if a shape purely deals with document types
158
whether the shape is a document type or it is lists or maps whose base
159
values are document types.
160
"""
161
if not shape:
162
return False
163
recording_visitor = ShapeRecordingVisitor()
164
ShapeWalker().walk(shape, recording_visitor)
165
end_shape = recording_visitor.visited.pop()
166
if not is_document_type(end_shape):
167
return False
168
for shape in recording_visitor.visited:
169
if shape.type_name not in ['list', 'map']:
170
return False
171
return True
172
173
174
def is_streaming_blob_type(shape):
175
"""Check if the shape is a streaming blob type."""
176
return (
177
shape
178
and shape.type_name == 'blob'
179
and shape.serialization.get('streaming', False)
180
)
181
182
183
def is_tagged_union_type(shape):
184
"""Check if the shape is a tagged union structure."""
185
return getattr(shape, 'is_tagged_union', False)
186
187
188
def operation_uses_document_types(operation_model):
189
"""Check if document types are ever used in the operation"""
190
recording_visitor = ShapeRecordingVisitor()
191
walker = ShapeWalker()
192
walker.walk(operation_model.input_shape, recording_visitor)
193
walker.walk(operation_model.output_shape, recording_visitor)
194
for visited_shape in recording_visitor.visited:
195
if is_document_type(visited_shape):
196
return True
197
return False
198
199
200
def json_encoder(obj):
201
"""JSON encoder that formats datetimes as ISO8601 format
202
and encodes bytes to UTF-8 Base64 string."""
203
if isinstance(obj, datetime.datetime):
204
return obj.isoformat()
205
elif isinstance(obj, bytes):
206
return base64.b64encode(obj).decode("utf-8")
207
else:
208
raise TypeError('Encountered unrecognized type in JSON encoder.')
209
210
211
@contextlib.contextmanager
212
def ignore_ctrl_c():
213
original = signal.signal(signal.SIGINT, signal.SIG_IGN)
214
try:
215
yield
216
finally:
217
signal.signal(signal.SIGINT, original)
218
219
220
def emit_top_level_args_parsed_event(session, args, remaining=None):
221
session.emit(
222
'top-level-args-parsed',
223
parsed_args=args,
224
session=session,
225
remaining_args=remaining,
226
)
227
228
229
def is_a_tty():
230
try:
231
return os.isatty(sys.stdout.fileno())
232
except Exception:
233
return False
234
235
236
class OutputStreamFactory:
237
def __init__(self, popen=None):
238
self._popen = popen
239
if popen is None:
240
self._popen = subprocess.Popen
241
242
@contextlib.contextmanager
243
def get_pager_stream(self, preferred_pager=None):
244
popen_kwargs = self._get_process_pager_kwargs(preferred_pager)
245
try:
246
process = self._popen(**popen_kwargs)
247
yield process.stdin
248
except OSError:
249
# Ignore IOError since this can commonly be raised when a pager
250
# is closed abruptly and causes a broken pipe.
251
pass
252
finally:
253
process.communicate()
254
255
@contextlib.contextmanager
256
def get_stdout_stream(self):
257
yield get_binary_stdout()
258
259
def _get_process_pager_kwargs(self, pager_cmd):
260
kwargs = get_popen_kwargs_for_pager_cmd(pager_cmd)
261
kwargs['stdin'] = subprocess.PIPE
262
return kwargs
263
264
265
def write_exception(ex, outfile):
266
outfile.write("\n")
267
outfile.write(str(ex))
268
outfile.write("\n")
269
270
271
class ShapeWalker:
272
def walk(self, shape, visitor):
273
"""Walk through and visit shapes for introspection
274
275
:type shape: botocore.model.Shape
276
:param shape: Shape to walk
277
278
:type visitor: BaseShapeVisitor
279
:param visitor: The visitor to call when walking a shape
280
"""
281
282
if shape is None:
283
return
284
stack = []
285
return self._walk(shape, visitor, stack)
286
287
def _walk(self, shape, visitor, stack):
288
if shape.name in stack:
289
return
290
stack.append(shape.name)
291
getattr(self, '_walk_%s' % shape.type_name, self._default_scalar_walk)(
292
shape, visitor, stack
293
)
294
stack.pop()
295
296
def _walk_structure(self, shape, visitor, stack):
297
self._do_shape_visit(shape, visitor)
298
for _, member_shape in shape.members.items():
299
self._walk(member_shape, visitor, stack)
300
301
def _walk_list(self, shape, visitor, stack):
302
self._do_shape_visit(shape, visitor)
303
self._walk(shape.member, visitor, stack)
304
305
def _walk_map(self, shape, visitor, stack):
306
self._do_shape_visit(shape, visitor)
307
self._walk(shape.value, visitor, stack)
308
309
def _default_scalar_walk(self, shape, visitor, stack):
310
self._do_shape_visit(shape, visitor)
311
312
def _do_shape_visit(self, shape, visitor):
313
visitor.visit_shape(shape)
314
315
316
class BaseShapeVisitor:
317
"""Visit shape encountered by ShapeWalker"""
318
319
def visit_shape(self, shape):
320
pass
321
322
323
class ShapeRecordingVisitor(BaseShapeVisitor):
324
"""Record shapes visited by ShapeWalker"""
325
326
def __init__(self):
327
self.visited = []
328
329
def visit_shape(self, shape):
330
self.visited.append(shape)
331
332
# NOTE: The following interfaces are considered private and are subject
333
# to abrupt breaking changes. Please do not use them directly.
334
335
try:
336
from botocore.utils import create_nested_client as create_client
337
except ImportError:
338
339
def create_client(session, service_name, **kwargs):
340
return session.create_client(service_name, **kwargs)
341
342
343
def create_nested_client(session, service_name, **kwargs):
344
return create_client(session, service_name, **kwargs)
345
346