Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/argprocess.py
1566 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
"""Module for processing CLI args."""
14
15
import logging
16
import os
17
18
from botocore.compat import OrderedDict, json
19
from botocore.utils import is_json_value_header
20
21
from awscli import COMPLEX_TYPES, SCALAR_TYPES, shorthand
22
from awscli.utils import (
23
find_service_and_method_in_event_name,
24
is_document_type,
25
is_document_type_container,
26
)
27
28
# Module-level logger used by the shorthand parser in this file for
# debug output.
LOG = logging.getLogger('awscli.argprocess')
29
30
31
class ParamError(Exception):
    """Error raised when the value of a CLI parameter cannot be parsed.

    The exception text is prefixed with the offending parameter name; the
    raw pieces stay available as ``cli_name`` and ``message``.
    """

    def __init__(self, cli_name, message):
        """
        :type cli_name: string
        :param cli_name: The complete cli argument name,
            e.g. "--foo-bar".  It should include the leading
            hyphens if that's how a user would specify the name.

        :type message: string
        :param message: The error message to display to the user.
        """
        self.cli_name = cli_name
        self.message = message
        super().__init__(
            "Error parsing parameter '%s': %s" % (cli_name, message)
        )
48
49
50
class ParamSyntaxError(Exception):
    """Exception type for parameter syntax errors.

    It adds nothing to ``Exception``; it is not raised by the code visible
    in this module.
    """
52
53
54
class ParamUnknownKeyError(Exception):
    """Error raised when a key is not one of the accepted choices."""

    def __init__(self, key, valid_keys):
        """
        :type key: string
        :param key: The unrecognised key.

        :type valid_keys: iterable of strings
        :param valid_keys: The accepted key names; they are joined with
            ``', '`` in the error message.
        """
        valid_keys = ', '.join(valid_keys)
        # The message must interpolate ``valid_keys``; referencing any
        # other name here raises NameError while building the exception.
        full_message = (
            f"Unknown key '{key}', valid choices are: {valid_keys}"
        )
        super().__init__(full_message)
61
62
63
class TooComplexError(Exception):
    """Signals that a shape is nested too deeply to document as shorthand.

    Raised by ``ParamShorthandDocGen._shorthand_docs`` and caught by
    ``ParamShorthandDocGen.generate_shorthand_example``.
    """
65
66
67
def unpack_argument(
    session, service_name, operation_name, cli_argument, value
):
    """
    Unpack an argument's value from the commandline. This is part one of a
    two step process in handling commandline arguments. Emits the
    load-cli-arg event with service, operation, and parameter names.
    Example::

        load-cli-arg.ec2.describe-instances.foo

    The first non-``None`` handler response replaces ``value``; when every
    handler returns ``None`` the original ``value`` is returned unchanged.
    """
    # Arguments without a ``name`` attribute are reported as "anonymous".
    param_name = getattr(cli_argument, 'name', 'anonymous')
    event_name = f'load-cli-arg.{service_name}.{operation_name}.{param_name}'

    override = session.emit_first_non_none_response(
        event_name,
        param=cli_argument,
        value=value,
        service_name=service_name,
        operation_name=operation_name,
    )

    # Compare against None explicitly: a falsy override (0, '', [])
    # is still a valid replacement value.
    return value if override is None else override
92
93
94
def detect_shape_structure(param):
    """Return a string describing the nesting structure of ``param``.

    Entry point for ``_detect_shape_structure``; starts the traversal with
    an empty stack of visited shape names.
    """
    return _detect_shape_structure(param, [])
97
98
99
def _detect_shape_structure(param, stack):
100
if param.name in stack:
101
return 'recursive'
102
else:
103
stack.append(param.name)
104
try:
105
if param.type_name in SCALAR_TYPES:
106
return 'scalar'
107
elif param.type_name == 'structure':
108
sub_types = [
109
_detect_shape_structure(p, stack)
110
for p in param.members.values()
111
]
112
# We're distinguishing between structure(scalar)
113
# and structure(scalars), because for the case of
114
# a single scalar in a structure we can simplify
115
# more than a structure(scalars).
116
if len(sub_types) == 1 and all(p == 'scalar' for p in sub_types):
117
return 'structure(scalar)'
118
elif len(sub_types) > 1 and all(p == 'scalar' for p in sub_types):
119
return 'structure(scalars)'
120
else:
121
return 'structure(%s)' % ', '.join(sorted(set(sub_types)))
122
elif param.type_name == 'list':
123
return 'list-%s' % _detect_shape_structure(param.member, stack)
124
elif param.type_name == 'map':
125
if param.value.type_name in SCALAR_TYPES:
126
return 'map-scalar'
127
else:
128
return 'map-%s' % _detect_shape_structure(param.value, stack)
129
finally:
130
stack.pop()
131
132
133
def unpack_cli_arg(cli_argument, value):
    """
    Parses and unpacks the encoded string command line parameter
    and returns native Python data structures that can be passed
    to the Operation.

    :type cli_argument: :class:`awscli.arguments.BaseCLIArgument`
    :param cli_argument: The CLI argument object.

    :param value: The value of the parameter.  This can be a number of
        different python types (str, list, etc).  This is the value as
        it's specified on the command line.

    :return: The "unpacked" argument that can be sent to the `Operation`
        object in python.
    """
    model = cli_argument.argument_model
    name_for_errors = cli_argument.cli_name
    return _unpack_cli_arg(model, value, name_for_errors)
152
153
154
def _special_type(model):
155
# check if model is jsonvalue header and that value is serializable
156
if (
157
model.serialization.get('jsonvalue')
158
and model.serialization.get('location') == 'header'
159
and model.type_name == 'string'
160
):
161
return True
162
return False
163
164
165
def _unpack_cli_arg(argument_model, value, cli_name):
    """Dispatch ``value`` to the unpacker matching the model's type.

    JSON-value headers and document types are parsed as JSON; scalar and
    complex types go to their dedicated unpackers; anything else is
    returned as ``str(value)``.

    :param argument_model: The shape model describing the argument.
    :param value: The raw command line value.
    :param cli_name: The CLI argument name, passed on for error reporting.
    """
    wants_json = is_json_value_header(argument_model) or is_document_type(
        argument_model
    )
    if wants_json:
        return _unpack_json_cli_arg(argument_model, value, cli_name)
    if argument_model.type_name in SCALAR_TYPES:
        return unpack_scalar_cli_arg(argument_model, value, cli_name)
    if argument_model.type_name in COMPLEX_TYPES:
        return _unpack_complex_cli_arg(argument_model, value, cli_name)
    return str(value)
176
177
178
def _unpack_json_cli_arg(argument_model, value, cli_name):
179
try:
180
return json.loads(value, object_pairs_hook=OrderedDict)
181
except ValueError as e:
182
raise ParamError(
183
cli_name, f"Invalid JSON: {e}\nJSON received: {value}"
184
)
185
186
187
def _unpack_complex_cli_arg(argument_model, value, cli_name):
188
type_name = argument_model.type_name
189
if type_name == 'structure' or type_name == 'map':
190
if value.lstrip()[0] == '{':
191
return _unpack_json_cli_arg(argument_model, value, cli_name)
192
raise ParamError(cli_name, f"Invalid JSON:\n{value}")
193
elif type_name == 'list':
194
if isinstance(value, str):
195
if value.lstrip()[0] == '[':
196
return _unpack_json_cli_arg(argument_model, value, cli_name)
197
elif isinstance(value, list) and len(value) == 1:
198
single_value = value[0].strip()
199
if single_value and single_value[0] == '[':
200
return _unpack_json_cli_arg(argument_model, value[0], cli_name)
201
try:
202
# There's a couple of cases remaining here.
203
# 1. It's possible that this is just a list of strings, i.e
204
# --security-group-ids sg-1 sg-2 sg-3 => ['sg-1', 'sg-2', 'sg-3']
205
# 2. It's possible this is a list of json objects:
206
# --filters '{"Name": ..}' '{"Name": ...}'
207
member_shape_model = argument_model.member
208
return [
209
_unpack_cli_arg(member_shape_model, v, cli_name) for v in value
210
]
211
except (ValueError, TypeError):
212
# The list params don't have a name/cli_name attached to them
213
# so they will have bad error messages. We're going to
214
# attach the parent parameter to this error message to provide
215
# a more helpful error message.
216
raise ParamError(cli_name, value[0])
217
218
219
def unpack_scalar_cli_arg(argument_model, value, cli_name=''):
    """Convert a scalar command line value to its Python type.

    :param argument_model: The shape model; ``type_name`` selects the
        conversion and ``serialization`` is consulted for blobs.
    :param value: The raw command line value.
    :param cli_name: Used strictly for error reporting; callers are not
        required to pass it.

    :return: ``int`` for integer/long, ``float`` for float/double, an open
        binary file object for streaming blobs, ``bool`` for boolean, and
        ``value`` unchanged for every other type.
    :raises ParamError: If a streaming blob value is not a path to an
        existing file.
    """
    type_name = argument_model.type_name
    if type_name in ('integer', 'long'):
        return int(value)
    if type_name in ('float', 'double'):
        # TODO: losing precision on double types
        return float(value)
    if type_name == 'blob' and argument_model.serialization.get('streaming'):
        # Expand environment variables first, then a leading "~".
        file_path = os.path.expanduser(os.path.expandvars(value))
        if not os.path.isfile(file_path):
            msg = 'Blob values must be a path to a file.'
            raise ParamError(cli_name, msg)
        # The open handle is returned to the caller, so it is
        # intentionally not closed here.
        return open(file_path, 'rb')
    if type_name == 'boolean':
        # Only the literal string "false" (any case) maps to False; every
        # other value follows normal truthiness.
        is_false_literal = isinstance(value, str) and value.lower() == 'false'
        return False if is_false_literal else bool(value)
    return value
249
250
251
def _supports_shorthand_syntax(model):
    """Return whether ``model`` qualifies for shorthand syntax.

    Shorthand syntax is only supported if:

    1. The argument is not a document type nor a wrapper around a document
       type (e.g. a list of document types or a map of document types).
       These should all be expressed as JSON input.

    2. The argument is sufficiently complex, that is, its base type is
       a complex type *and* if it's a list, then it can't be a list of
       scalar types.
    """
    return not is_document_type_container(model) and _is_complex_shape(model)
264
265
266
def _is_complex_shape(model):
267
if model.type_name not in ['structure', 'list', 'map']:
268
return False
269
elif model.type_name == 'list':
270
if model.member.type_name not in ['structure', 'list', 'map']:
271
return False
272
return True
273
274
275
class ParamShorthand:
    """Base class holding behaviour shared by the shorthand helpers."""

    def _uses_old_list_case(self, service_id, operation_name, argument_name):
        """
        Determines whether a given operation for a service needs to use the
        deprecated shorthand parsing case for lists of structures that only
        have a single member.

        :return: True only when ``argument_name`` is listed for the given
            ``service_id`` and ``operation_name`` in the table below.
        """
        legacy_arguments = {
            'firehose': {'put-record-batch': ['records']},
            'workspaces': {
                'reboot-workspaces': ['reboot-workspace-requests'],
                'rebuild-workspaces': ['rebuild-workspace-requests'],
                'terminate-workspaces': ['terminate-workspace-requests'],
            },
            'elastic-load-balancing': {
                'remove-tags': ['tags'],
                'describe-instance-health': ['instances'],
                'deregister-instances-from-load-balancer': ['instances'],
                'register-instances-with-load-balancer': ['instances'],
            },
        }
        # Unknown services or operations fall back to empty containers,
        # so the membership test is simply False.
        operations = legacy_arguments.get(service_id, {})
        return argument_name in operations.get(operation_name, [])
298
299
300
class ParamShorthandParser(ParamShorthand):
    """Callable event handler that parses shorthand-syntax values."""

    def __init__(self):
        self._parser = shorthand.ShorthandParser()
        self._visitor = shorthand.BackCompatVisitor()

    def __call__(self, cli_argument, value, event_name, **kwargs):
        """Attempt to parse shorthand syntax for values.

        This is intended to be hooked up as an event handler (hence the
        **kwargs).  Given a CLI argument object and its ``value``,
        figure out if we can parse it.  If we can parse it, we return
        the parsed value (typically some sort of python dict).

        :type cli_argument: :class:`awscli.arguments.BaseCLIArgument`
        :param cli_argument: The CLI argument object.

        :type value: str
        :param value: The value for the parameter type on the command
            line, e.g ``--foo this_value``, value would be ``"this_value"``.

        :type event_name: str
        :param event_name: The name of the emitted event; the service and
            operation names are extracted from it.

        :returns: If we can parse the value we return the parsed value.
            If it looks like JSON, we return None (which tells the event
            emitter to use the default ``unpack_cli_arg`` provided that
            no other event handlers can parse the value).  If we
            run into an error parsing the value, a ``ParamError`` will
            be raised.

        """
        if not self._should_parse_as_shorthand(cli_argument, value):
            return None
        service_id, operation_name = find_service_and_method_in_event_name(
            event_name
        )
        return self._parse_as_shorthand(
            cli_argument, value, service_id, operation_name
        )

    def _parse_as_shorthand(
        self, cli_argument, value, service_id, operation_name
    ):
        """Parse ``value`` as shorthand, honouring the legacy special cases.

        Any shorthand parse error, ``ParamError`` or ``ParamUnknownKeyError``
        is re-raised as a ``ParamError`` carrying the argument's CLI name.
        """
        try:
            LOG.debug("Parsing param %s as shorthand", cli_argument.cli_name)
            special = self._handle_special_cases(
                cli_argument, value, service_id, operation_name
            )
            if special is not None:
                return special
            if isinstance(value, list):
                # Because of how we're using argparse, list shapes
                # are configured with nargs='+' which means the ``value``
                # is given to us "conveniently" as a list.  When
                # this happens we need to parse each list element
                # individually.
                parsed = [self._parser.parse(item) for item in value]
            else:
                # Otherwise value is just a string.
                parsed = self._parser.parse(value)
            self._visitor.visit(parsed, cli_argument.argument_model)
        except (
            shorthand.ShorthandParseError,
            ParamError,
            ParamUnknownKeyError,
        ) as e:
            # The shorthand parse methods don't have the cli_name,
            # so any ParamError won't have this value.  To accommodate
            # this, the errors are caught and reraised with the cli_name
            # injected.
            raise ParamError(cli_argument.cli_name, str(e))
        return parsed

    def _handle_special_cases(
        self, cli_argument, value, service_id, operation_name
    ):
        """Handle the backwards-compatible forms of the previous parser.

        :return: The converted value when a special case applies,
            otherwise ``None``.
        """
        model = cli_argument.argument_model
        is_legacy_single_member_list = (
            model.type_name == 'list'
            and model.member.type_name == 'structure'
            and len(model.member.members) == 1
            and self._uses_old_list_case(
                service_id, operation_name, cli_argument.name
            )
        )
        if is_legacy_single_member_list:
            # First special case is handling a list of structures
            # of a single element such as:
            #
            # --instance-ids id-1 id-2 id-3
            #
            # gets parsed as:
            #
            # [{"InstanceId": "id-1"}, {"InstanceId": "id-2"},
            #  {"InstanceId": "id-3"}]
            key_name = next(iter(model.member.members.keys()))
            return [{key_name: item} for item in value]
        is_bare_value_structure = (
            model.type_name == 'structure'
            and len(model.members) == 1
            and 'Value' in model.members
            and model.members['Value'].type_name == 'string'
            and '=' not in value
        )
        if is_bare_value_structure:
            # Second special case is where a structure of a single
            # value whose member name is "Value" can be specified
            # as:
            # --instance-terminate-behavior shutdown
            #
            # gets parsed as:
            # {"Value": "shutdown"}
            return {'Value': value}
        return None

    def _should_parse_as_shorthand(self, cli_argument, value):
        """Decide whether ``value`` should go through shorthand parsing."""
        # We first need to make sure this is a parameter that qualifies
        # for simplification.  The first short-circuit case is if it looks
        # like json we immediately return.
        check_val = value[0] if value and isinstance(value, list) else value
        looks_like_json = isinstance(
            check_val, str
        ) and check_val.strip().startswith(('[', '{'))
        if looks_like_json:
            LOG.debug(
                "Param %s looks like JSON, not considered for "
                "param shorthand.",
                cli_argument.py_name,
            )
            return False
        return _supports_shorthand_syntax(cli_argument.argument_model)
436
437
438
class ParamShorthandDocGen(ParamShorthand):
    """Documentation generator for param shorthand syntax."""

    # Sentinel returned by _handle_special_cases to suppress documentation.
    _DONT_DOC = object()
    # Nesting depth beyond which _shorthand_docs raises TooComplexError.
    _MAX_STACK = 3

    def supports_shorthand(self, argument_model):
        """Checks if a CLI argument supports shorthand syntax."""
        if argument_model is None:
            return False
        return _supports_shorthand_syntax(argument_model)

    def generate_shorthand_example(
        self, cli_argument, service_id, operation_name
    ):
        """Generate documentation for a CLI argument.

        :type cli_argument: awscli.arguments.BaseCLIArgument
        :param cli_argument: The CLI argument which to generate
            documentation for.

        :return: Returns either a string or ``None``.  If a string
            is returned, it is the generated shorthand example.
            If a value of ``None`` is returned then this indicates
            that no shorthand syntax is available for the provided
            ``argument_model``.  An empty string is returned when the
            shape is too deeply nested to document.

        """
        special = self._handle_special_cases(
            cli_argument, service_id, operation_name
        )
        if special is self._DONT_DOC:
            return None
        if special:
            return special

        # Otherwise we fall back to the normal docgen for shorthand
        # syntax.
        model = cli_argument.argument_model
        stack = []
        try:
            if model.type_name == 'list':
                return self._shorthand_docs(model.member, stack) + ' ...'
            return self._shorthand_docs(model, stack)
        except TooComplexError:
            return ''

    def _handle_special_cases(self, cli_argument, service_id, operation_name):
        """Return the example for a legacy case, ``_DONT_DOC``, or ``''``."""
        model = cli_argument.argument_model
        is_legacy_single_member_list = (
            model.type_name == 'list'
            and model.member.type_name == 'structure'
            and len(model.member.members) == 1
            and self._uses_old_list_case(
                service_id, operation_name, cli_argument.name
            )
        )
        if is_legacy_single_member_list:
            member_name = list(model.member.members)[0]
            metadata = model.metadata
            cli_name = cli_argument.cli_name
            # Handle special case where the min/max is exactly one.
            exactly_one = metadata.get('min') == 1 and metadata.get('max') == 1
            if exactly_one:
                return f'{cli_name} {member_name}1'
            return f'{cli_name} {member_name}1 {member_name}2 {member_name}3'
        is_single_value_structure = (
            model.type_name == 'structure'
            and len(model.members) == 1
            and 'Value' in model.members
            and model.members['Value'].type_name == 'string'
        )
        if is_single_value_structure:
            return self._DONT_DOC
        return ''

    def _shorthand_docs(self, argument_model, stack):
        """Dispatch to the doc builder for the model's type.

        :raises TooComplexError: If ``stack`` is deeper than ``_MAX_STACK``.
        """
        if len(stack) > self._MAX_STACK:
            raise TooComplexError()
        type_name = argument_model.type_name
        if type_name == 'structure':
            return self._structure_docs(argument_model, stack)
        if type_name == 'list':
            return self._list_docs(argument_model, stack)
        if type_name == 'map':
            return self._map_docs(argument_model, stack)
        # Scalars are documented by their type name.
        return type_name

    def _list_docs(self, argument_model, stack):
        """Document a list as two comma-separated example elements."""
        list_member = argument_model.member
        stack.append(list_member.name)
        try:
            element_docs = self._shorthand_docs(argument_model.member, stack)
        finally:
            stack.pop()
        # Lists of complex members, and nested lists, get brackets.
        needs_brackets = (
            list_member.type_name in COMPLEX_TYPES or len(stack) > 1
        )
        template = '[%s,%s]' if needs_brackets else '%s,%s'
        return template % (element_docs, element_docs)

    def _map_docs(self, argument_model, stack):
        """Document a map as two example ``KeyNameN=value`` pairs."""
        key_shape = argument_model.key
        value_docs = self._shorthand_docs(argument_model.value, stack)
        docs = 'KeyName1=%s,KeyName2=%s' % (value_docs, value_docs)
        if key_shape.enum and not stack:
            # Top-level maps with enumerated keys list the valid names.
            docs += '\n\nWhere valid key names are:\n'
            docs += ''.join(' %s\n' % name for name in key_shape.enum)
        elif stack:
            # Nested maps are wrapped in braces.
            docs = '{%s}' % docs
        return docs

    def _structure_docs(self, argument_model, stack):
        """Document a structure as comma-separated ``name=value`` members."""
        # Members that are document type containers are left out.
        parts = [
            self._member_docs(name, member_shape, stack)
            for name, member_shape in argument_model.members.items()
            if not is_document_type_container(member_shape)
        ]
        inner_part = ','.join(parts)
        # Only nested structures are wrapped in braces.
        return '{%s}' % inner_part if stack else inner_part

    def _member_docs(self, name, shape, stack):
        """Document one structure member, stopping at recursive shapes."""
        if shape.name in stack:
            return '( ... recursive ... )'
        stack.append(shape.name)
        try:
            value_doc = self._shorthand_docs(shape, stack)
        finally:
            stack.pop()
        return '%s=%s' % (name, value_doc)
568
569