Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/utils.py
1566 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import base64
14
import contextlib
15
import csv
16
import datetime
17
import os
18
import signal
19
import subprocess
20
import sys
21
22
from awscli.compat import (
23
StringIO,
24
get_binary_stdout,
25
get_popen_kwargs_for_pager_cmd,
26
)
27
28
29
def split_on_commas(value):
30
if not any(char in value for char in ['"', '\\', "'", ']', '[']):
31
# No quotes or escaping, just use a simple split.
32
return value.split(',')
33
elif not any(char in value for char in ['"', "'", '[', ']']):
34
# Simple escaping, let the csv module handle it.
35
return list(csv.reader(StringIO(value), escapechar='\\'))[0]
36
else:
37
# If there's quotes for the values, we have to handle this
38
# ourselves.
39
return _split_with_quotes(value)
40
41
42
def _split_with_quotes(value):
43
try:
44
parts = list(csv.reader(StringIO(value), escapechar='\\'))[0]
45
except csv.Error:
46
raise ValueError("Bad csv value: %s" % value)
47
iter_parts = iter(parts)
48
new_parts = []
49
for part in iter_parts:
50
# Find the first quote
51
quote_char = _find_quote_char_in_part(part)
52
53
# Find an opening list bracket
54
list_start = part.find('=[')
55
56
if (
57
list_start >= 0
58
and value.find(']') != -1
59
and (quote_char is None or part.find(quote_char) > list_start)
60
):
61
# This is a list, eat all the items until the end
62
if ']' in part:
63
# Short circuit for only one item
64
new_chunk = part
65
else:
66
new_chunk = _eat_items(value, iter_parts, part, ']')
67
list_items = _split_with_quotes(new_chunk[list_start + 2 : -1])
68
new_chunk = new_chunk[: list_start + 1] + ','.join(list_items)
69
new_parts.append(new_chunk)
70
continue
71
elif quote_char is None:
72
new_parts.append(part)
73
continue
74
elif part.count(quote_char) == 2:
75
# Starting and ending quote are in this part.
76
# While it's not needed right now, this will
77
# break down if we ever need to escape quotes while
78
# quoting a value.
79
new_parts.append(part.replace(quote_char, ''))
80
continue
81
# Now that we've found a starting quote char, we
82
# need to combine the parts until we encounter an end quote.
83
new_chunk = _eat_items(value, iter_parts, part, quote_char, quote_char)
84
new_parts.append(new_chunk)
85
return new_parts
86
87
88
def _eat_items(value, iter_parts, part, end_char, replace_char=''):
89
"""
90
Eat items from an iterator, optionally replacing characters with
91
a blank and stopping when the end_char has been reached.
92
"""
93
current = part
94
chunks = [current.replace(replace_char, '')]
95
while True:
96
try:
97
current = next(iter_parts)
98
except StopIteration:
99
raise ValueError(value)
100
chunks.append(current.replace(replace_char, ''))
101
if current.endswith(end_char):
102
break
103
return ','.join(chunks)
104
105
106
def _find_quote_char_in_part(part):
107
"""
108
Returns a single or double quote character, whichever appears first in the
109
given string. None is returned if the given string doesn't have a single or
110
double quote character.
111
"""
112
quote_char = None
113
for ch in part:
114
if ch in ('"', "'"):
115
quote_char = ch
116
break
117
return quote_char
118
119
120
def find_service_and_method_in_event_name(event_name):
121
"""
122
Grabs the service id and the operation name from an event name.
123
This is making the assumption that the event name is in the form
124
event.service.operation.
125
"""
126
split_event = event_name.split('.')[1:]
127
service_name = None
128
if len(split_event) > 0:
129
service_name = split_event[0]
130
131
operation_name = None
132
if len(split_event) > 1:
133
operation_name = split_event[1]
134
return service_name, operation_name
135
136
137
def is_document_type(shape):
138
"""Check if shape is a document type"""
139
return getattr(shape, 'is_document_type', False)
140
141
142
def is_document_type_container(shape):
143
"""Check if the shape is a document type or wraps document types
144
145
This is helpful to determine if a shape purely deals with document types
146
whether the shape is a document type or it is lists or maps whose base
147
values are document types.
148
"""
149
if not shape:
150
return False
151
recording_visitor = ShapeRecordingVisitor()
152
ShapeWalker().walk(shape, recording_visitor)
153
end_shape = recording_visitor.visited.pop()
154
if not is_document_type(end_shape):
155
return False
156
for shape in recording_visitor.visited:
157
if shape.type_name not in ['list', 'map']:
158
return False
159
return True
160
161
162
def is_streaming_blob_type(shape):
163
"""Check if the shape is a streaming blob type."""
164
return (
165
shape
166
and shape.type_name == 'blob'
167
and shape.serialization.get('streaming', False)
168
)
169
170
171
def is_tagged_union_type(shape):
172
"""Check if the shape is a tagged union structure."""
173
return getattr(shape, 'is_tagged_union', False)
174
175
176
def operation_uses_document_types(operation_model):
177
"""Check if document types are ever used in the operation"""
178
recording_visitor = ShapeRecordingVisitor()
179
walker = ShapeWalker()
180
walker.walk(operation_model.input_shape, recording_visitor)
181
walker.walk(operation_model.output_shape, recording_visitor)
182
for visited_shape in recording_visitor.visited:
183
if is_document_type(visited_shape):
184
return True
185
return False
186
187
188
def json_encoder(obj):
189
"""JSON encoder that formats datetimes as ISO8601 format
190
and encodes bytes to UTF-8 Base64 string."""
191
if isinstance(obj, datetime.datetime):
192
return obj.isoformat()
193
elif isinstance(obj, bytes):
194
return base64.b64encode(obj).decode("utf-8")
195
else:
196
raise TypeError('Encountered unrecognized type in JSON encoder.')
197
198
199
@contextlib.contextmanager
200
def ignore_ctrl_c():
201
original = signal.signal(signal.SIGINT, signal.SIG_IGN)
202
try:
203
yield
204
finally:
205
signal.signal(signal.SIGINT, original)
206
207
208
def emit_top_level_args_parsed_event(session, args):
209
session.emit('top-level-args-parsed', parsed_args=args, session=session)
210
211
212
def is_a_tty():
213
try:
214
return os.isatty(sys.stdout.fileno())
215
except Exception:
216
return False
217
218
219
class OutputStreamFactory:
220
def __init__(self, popen=None):
221
self._popen = popen
222
if popen is None:
223
self._popen = subprocess.Popen
224
225
@contextlib.contextmanager
226
def get_pager_stream(self, preferred_pager=None):
227
popen_kwargs = self._get_process_pager_kwargs(preferred_pager)
228
try:
229
process = self._popen(**popen_kwargs)
230
yield process.stdin
231
except OSError:
232
# Ignore IOError since this can commonly be raised when a pager
233
# is closed abruptly and causes a broken pipe.
234
pass
235
finally:
236
process.communicate()
237
238
@contextlib.contextmanager
239
def get_stdout_stream(self):
240
yield get_binary_stdout()
241
242
def _get_process_pager_kwargs(self, pager_cmd):
243
kwargs = get_popen_kwargs_for_pager_cmd(pager_cmd)
244
kwargs['stdin'] = subprocess.PIPE
245
return kwargs
246
247
248
def write_exception(ex, outfile):
249
outfile.write("\n")
250
outfile.write(str(ex))
251
outfile.write("\n")
252
253
254
class ShapeWalker:
255
def walk(self, shape, visitor):
256
"""Walk through and visit shapes for introspection
257
258
:type shape: botocore.model.Shape
259
:param shape: Shape to walk
260
261
:type visitor: BaseShapeVisitor
262
:param visitor: The visitor to call when walking a shape
263
"""
264
265
if shape is None:
266
return
267
stack = []
268
return self._walk(shape, visitor, stack)
269
270
def _walk(self, shape, visitor, stack):
271
if shape.name in stack:
272
return
273
stack.append(shape.name)
274
getattr(self, '_walk_%s' % shape.type_name, self._default_scalar_walk)(
275
shape, visitor, stack
276
)
277
stack.pop()
278
279
def _walk_structure(self, shape, visitor, stack):
280
self._do_shape_visit(shape, visitor)
281
for _, member_shape in shape.members.items():
282
self._walk(member_shape, visitor, stack)
283
284
def _walk_list(self, shape, visitor, stack):
285
self._do_shape_visit(shape, visitor)
286
self._walk(shape.member, visitor, stack)
287
288
def _walk_map(self, shape, visitor, stack):
289
self._do_shape_visit(shape, visitor)
290
self._walk(shape.value, visitor, stack)
291
292
def _default_scalar_walk(self, shape, visitor, stack):
293
self._do_shape_visit(shape, visitor)
294
295
def _do_shape_visit(self, shape, visitor):
296
visitor.visit_shape(shape)
297
298
299
class BaseShapeVisitor:
300
"""Visit shape encountered by ShapeWalker"""
301
302
def visit_shape(self, shape):
303
pass
304
305
306
class ShapeRecordingVisitor(BaseShapeVisitor):
307
"""Record shapes visited by ShapeWalker"""
308
309
def __init__(self):
310
self.visited = []
311
312
def visit_shape(self, shape):
313
self.visited.append(shape)
314
315
# NOTE: The following interfaces are considered private and are subject
316
# to abrupt breaking changes. Please do not use them directly.
317
318
try:
319
from botocore.utils import create_nested_client as create_client
320
except ImportError:
321
322
def create_client(session, service_name, **kwargs):
323
return session.create_client(service_name, **kwargs)
324
325
326
def create_nested_client(session, service_name, **kwargs):
327
return create_client(session, service_name, **kwargs)
328
329