Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/argprocess.py
2630 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
"""Module for processing CLI args."""
14
15
import logging
16
import os
17
18
from botocore.compat import OrderedDict, json
19
from botocore.utils import is_json_value_header
20
21
from awscli import COMPLEX_TYPES, SCALAR_TYPES, shorthand
22
from awscli.utils import (
23
find_service_and_method_in_event_name,
24
is_document_type,
25
is_document_type_container,
26
)
27
28
# Module-level logger; the shorthand parser in this module writes its
# debug messages to it.
LOG = logging.getLogger('awscli.argprocess')
29
30
31
class ParamError(Exception):
    """Error tied to one CLI parameter; keeps the name and raw message."""

    def __init__(self, cli_name, message):
        """
        :type cli_name: string
        :param cli_name: The complete cli argument name, e.g. "--foo-bar".
            Include the leading hyphens when that is how a user would
            type the name.

        :type message: string
        :param message: The error message to display to the user.
        """
        text = "Error parsing parameter '%s': %s" % (cli_name, message)
        super().__init__(text)
        # Both pieces stay available separately for callers that want to
        # build their own message.
        self.cli_name, self.message = cli_name, message
48
49
50
class ParamSyntaxError(Exception):
    """Marker exception type; adds no behaviour to ``Exception``."""
52
53
54
class ParamUnknownKeyError(Exception):
    """Raised when a key is not one of the valid choices."""

    def __init__(self, key, valid_keys):
        """
        :param key: The unrecognised key.

        :param valid_keys: Iterable of strings naming the accepted keys;
            they are joined with ``', '`` in the message.
        """
        # The message previously interpolated an undefined name
        # (``valid_key``), so building this exception raised NameError
        # instead of reporting the unknown key.
        choices = ', '.join(valid_keys)
        full_message = f"Unknown key '{key}', valid choices are: {choices}"
        super().__init__(full_message)
61
62
63
class TooComplexError(Exception):
    """Marker exception type; adds no behaviour to ``Exception``.

    Within this module it is raised by the shorthand doc generator when a
    shape nests deeper than its stack limit.
    """
65
66
67
def unpack_argument(
    session, service_name, operation_name, cli_argument, value, parsed_globals
):
    """
    Unpack an argument's value from the commandline.  This is part one of
    a two step process in handling commandline arguments.

    Emits the load-cli-arg event with service, operation, and parameter
    names, for example::

        load-cli-arg.ec2.describe-instances.foo

    The parameter name comes from ``cli_argument.name`` and falls back to
    ``'anonymous'`` when that attribute is missing.

    :return: The first non-``None`` handler response, or ``value``
        unchanged when every handler returned ``None``.
    """
    param_name = getattr(cli_argument, 'name', 'anonymous')
    event = f'load-cli-arg.{service_name}.{operation_name}.{param_name}'

    override = session.emit_first_non_none_response(
        event,
        param=cli_argument,
        value=value,
        service_name=service_name,
        operation_name=operation_name,
        parsed_globals=parsed_globals,
    )

    return value if override is None else override
93
94
95
def detect_shape_structure(param):
    """Describe the structure of ``param``, starting from an empty stack.

    Public entry point for :func:`_detect_shape_structure`.
    """
    return _detect_shape_structure(param, [])
98
99
100
def _detect_shape_structure(param, stack):
    """Return a string describing how ``param`` is built up.

    :param param: A shape exposing ``name`` and ``type_name`` plus
        ``members``, ``member`` or ``value`` depending on its type.
    :param stack: Names of the shapes currently being visited.  It is
        pushed to on entry and popped on exit, so it is unchanged on
        return.

    :return: ``'recursive'`` when ``param.name`` is already on the stack,
        ``'scalar'``, a ``'structure(...)'``, ``'list-...'`` or
        ``'map-...'`` description, or ``None`` for any other type name.
    """
    if param.name in stack:
        return 'recursive'
    stack.append(param.name)
    try:
        kind = param.type_name
        if kind in SCALAR_TYPES:
            return 'scalar'
        if kind == 'structure':
            member_kinds = [
                _detect_shape_structure(member, stack)
                for member in param.members.values()
            ]
            # A structure holding exactly one scalar is told apart from one
            # holding several, because the single-scalar case can be
            # simplified further.
            only_scalars = all(k == 'scalar' for k in member_kinds)
            if only_scalars and len(member_kinds) == 1:
                return 'structure(scalar)'
            if only_scalars and len(member_kinds) > 1:
                return 'structure(scalars)'
            return 'structure(%s)' % ', '.join(sorted(set(member_kinds)))
        if kind == 'list':
            return 'list-%s' % _detect_shape_structure(param.member, stack)
        if kind == 'map':
            if param.value.type_name in SCALAR_TYPES:
                return 'map-scalar'
            return 'map-%s' % _detect_shape_structure(param.value, stack)
        return None
    finally:
        stack.pop()
132
133
134
def unpack_cli_arg(cli_argument, value):
    """
    Parse the encoded command line string for a parameter and return
    native Python data structures that can be passed to the Operation.

    :type cli_argument: :class:`awscli.arguments.BaseCLIArgument`
    :param cli_argument: The CLI argument object.  Its ``argument_model``
        drives the parsing and its ``cli_name`` is used in error messages.

    :param value: The value of the parameter as specified on the command
        line.  This can be a number of different python types
        (str, list, etc).

    :return: The "unpacked" argument that can be sent to the `Operation`
        object in python.
    """
    model = cli_argument.argument_model
    name = cli_argument.cli_name
    return _unpack_cli_arg(model, value, name)
153
154
155
def _special_type(model):
156
# check if model is jsonvalue header and that value is serializable
157
if (
158
model.serialization.get('jsonvalue')
159
and model.serialization.get('location') == 'header'
160
and model.type_name == 'string'
161
):
162
return True
163
return False
164
165
166
def _unpack_cli_arg(argument_model, value, cli_name):
    """Dispatch ``value`` to the unpacker that matches ``argument_model``.

    JSON-value headers and document types are parsed as JSON.  Otherwise
    scalar and complex type names go to their dedicated unpackers, and
    anything else is returned as ``str(value)``.

    :param cli_name: Passed through to the unpackers for error reporting.
    """
    if is_json_value_header(argument_model) or is_document_type(
        argument_model
    ):
        return _unpack_json_cli_arg(argument_model, value, cli_name)

    kind = argument_model.type_name
    if kind in SCALAR_TYPES:
        return unpack_scalar_cli_arg(argument_model, value, cli_name)
    if kind in COMPLEX_TYPES:
        return _unpack_complex_cli_arg(argument_model, value, cli_name)
    return str(value)
177
178
179
def _unpack_json_cli_arg(argument_model, value, cli_name):
180
try:
181
return json.loads(value, object_pairs_hook=OrderedDict)
182
except ValueError as e:
183
raise ParamError(
184
cli_name, f"Invalid JSON: {e}\nJSON received: {value}"
185
)
186
187
188
def _unpack_complex_cli_arg(argument_model, value, cli_name):
189
type_name = argument_model.type_name
190
if type_name == 'structure' or type_name == 'map':
191
if value.lstrip()[0] == '{':
192
return _unpack_json_cli_arg(argument_model, value, cli_name)
193
raise ParamError(cli_name, f"Invalid JSON:\n{value}")
194
elif type_name == 'list':
195
if isinstance(value, str):
196
if value.lstrip()[0] == '[':
197
return _unpack_json_cli_arg(argument_model, value, cli_name)
198
elif isinstance(value, list) and len(value) == 1:
199
single_value = value[0].strip()
200
if single_value and single_value[0] == '[':
201
return _unpack_json_cli_arg(argument_model, value[0], cli_name)
202
try:
203
# There's a couple of cases remaining here.
204
# 1. It's possible that this is just a list of strings, i.e
205
# --security-group-ids sg-1 sg-2 sg-3 => ['sg-1', 'sg-2', 'sg-3']
206
# 2. It's possible this is a list of json objects:
207
# --filters '{"Name": ..}' '{"Name": ...}'
208
member_shape_model = argument_model.member
209
return [
210
_unpack_cli_arg(member_shape_model, v, cli_name) for v in value
211
]
212
except (ValueError, TypeError):
213
# The list params don't have a name/cli_name attached to them
214
# so they will have bad error messages. We're going to
215
# attach the parent parameter to this error message to provide
216
# a more helpful error message.
217
raise ParamError(cli_name, value[0])
218
219
220
def unpack_scalar_cli_arg(argument_model, value, cli_name=''):
    """Convert ``value`` to the Python type for a scalar model.

    * ``integer`` / ``long`` -> ``int(value)``
    * ``float`` / ``double`` -> ``float(value)``
    * streaming ``blob`` -> the file at ``value`` (after environment
      variable and ``~`` expansion), opened in binary mode; the caller
      owns the returned file object
    * ``boolean`` -> ``False`` for the string ``'false'`` in any casing,
      otherwise ``bool(value)``
    * anything else -> ``value`` unchanged

    :param cli_name: Used strictly for error reporting; it is not
        required to use this function.

    :raises ParamError: if a streaming blob value is not a path to an
        existing file.
    """
    kind = argument_model.type_name
    if kind in ('integer', 'long'):
        return int(value)
    if kind in ('float', 'double'):
        # TODO: losing precision on double types
        return float(value)
    if kind == 'blob' and argument_model.serialization.get('streaming'):
        path = os.path.expanduser(os.path.expandvars(value))
        if os.path.isfile(path):
            return open(path, 'rb')
        raise ParamError(cli_name, 'Blob values must be a path to a file.')
    if kind == 'boolean':
        says_false = isinstance(value, str) and value.lower() == 'false'
        return False if says_false else bool(value)
    return value
250
251
252
def _supports_shorthand_syntax(model):
    """Decide whether ``model`` can be written in shorthand syntax.

    Shorthand syntax is only supported if:

    1. The argument is not a document type nor a wrapper around a document
       type (e.g. a list of document types or a map of document types).
       These should all be expressed as JSON input.

    2. The argument is sufficiently complex, that is, its base type is a
       complex type *and* if it's a list, then it can't be a list of
       scalar types.
    """
    return not is_document_type_container(model) and _is_complex_shape(model)
265
266
267
def _is_complex_shape(model):
268
if model.type_name not in ['structure', 'list', 'map']:
269
return False
270
elif model.type_name == 'list':
271
if model.member.type_name not in ['structure', 'list', 'map']:
272
return False
273
return True
274
275
276
class ParamShorthand:
    """Base class holding the lookup shared by the shorthand helpers."""

    def _uses_old_list_case(self, service_id, operation_name, argument_name):
        """
        Tell whether an argument of a service operation must use the
        deprecated shorthand parsing case for lists of structures that
        only have a single member.

        :return: ``True`` only when the service, operation and argument
            name all appear together in the table below.
        """
        legacy_arguments = {
            'firehose': {'put-record-batch': ['records']},
            'workspaces': {
                'reboot-workspaces': ['reboot-workspace-requests'],
                'rebuild-workspaces': ['rebuild-workspace-requests'],
                'terminate-workspaces': ['terminate-workspace-requests'],
            },
            'elastic-load-balancing': {
                'remove-tags': ['tags'],
                'describe-instance-health': ['instances'],
                'deregister-instances-from-load-balancer': ['instances'],
                'register-instances-with-load-balancer': ['instances'],
            },
        }
        operations = legacy_arguments.get(service_id, {})
        return argument_name in operations.get(operation_name, [])
299
300
301
class ParamShorthandParser(ParamShorthand):
    """Callable that parses shorthand syntax for CLI argument values."""

    def __init__(self):
        self._parser = shorthand.ShorthandParser()
        self._visitor = shorthand.BackCompatVisitor()

    def __call__(self, cli_argument, value, event_name, **kwargs):
        """Attempt to parse shorthand syntax for values.

        This is intended to be hooked up as an event handler (hence the
        **kwargs).  Given the argument object and its ``value``, figure
        out if we can parse it.  If we can parse it, we return the parsed
        value (typically some sort of python dict).

        :type cli_argument: :class:`awscli.arguments.BaseCLIArgument`
        :param cli_argument: The CLI argument object.

        :type value: str
        :param value: The value for the parameter type on the command
            line, e.g ``--foo this_value``, value would be
            ``"this_value"``.

        :param event_name: Name of the event being handled; the service
            and operation names are extracted from it.

        :returns: If we can parse the value we return the parsed value.
            If it looks like JSON, or the argument does not support
            shorthand, we return None (which tells the event emitter to
            use the default ``unpack_cli_arg`` provided that no other
            event handlers can parse the value).  If we run into an error
            parsing the value, a ``ParamError`` will be raised.

        """
        if not self._should_parse_as_shorthand(cli_argument, value):
            return None
        service_id, operation_name = find_service_and_method_in_event_name(
            event_name
        )
        return self._parse_as_shorthand(
            cli_argument, value, service_id, operation_name
        )

    def _parse_as_shorthand(
        self, cli_argument, value, service_id, operation_name
    ):
        """Parse ``value`` as shorthand, trying the legacy cases first.

        :raises ParamError: when the shorthand parser or visitor fails;
            the error is re-raised with the argument's ``cli_name``.
        """
        try:
            LOG.debug("Parsing param %s as shorthand", cli_argument.cli_name)
            special = self._handle_special_cases(
                cli_argument, value, service_id, operation_name
            )
            if special is not None:
                return special
            if isinstance(value, list):
                # Because of how we're using argparse, list shapes
                # are configured with nargs='+' which means the ``value``
                # is given to us "conveniently" as a list.  When
                # this happens we need to parse each list element
                # individually.
                parsed = [self._parser.parse(item) for item in value]
            else:
                # Otherwise value is just a string.
                parsed = self._parser.parse(value)
            self._visitor.visit(parsed, cli_argument.argument_model)
        except (
            shorthand.ShorthandParseError,
            ParamError,
            ParamUnknownKeyError,
        ) as e:
            # The shorthand parse methods don't have the cli_name,
            # so any error they raise won't have this value.  To
            # accommodate this, the errors are caught and reraised as a
            # ParamError with the cli_name injected.
            raise ParamError(cli_argument.cli_name, str(e))
        return parsed

    def _handle_special_cases(
        self, cli_argument, value, service_id, operation_name
    ):
        """Handle the cases the previous parser accepted.

        Kept in order to stay backwards compatible.

        :return: The converted value for a special case, else ``None``.
        """
        model = cli_argument.argument_model
        if model.type_name == 'list':
            member = model.member
            if (
                member.type_name == 'structure'
                and len(member.members) == 1
                and self._uses_old_list_case(
                    service_id, operation_name, cli_argument.name
                )
            ):
                # First special case is handling a list of structures
                # of a single element such as:
                #
                # --instance-ids id-1 id-2 id-3
                #
                # gets parsed as:
                #
                # [{"InstanceId": "id-1"}, {"InstanceId": "id-2"},
                #  {"InstanceId": "id-3"}]
                key_name = next(iter(member.members))
                return [{key_name: item} for item in value]
            return None
        if (
            model.type_name == 'structure'
            and len(model.members) == 1
            and 'Value' in model.members
            and model.members['Value'].type_name == 'string'
            and '=' not in value
        ):
            # Second special case is where a structure of a single
            # value whose member name is "Value" can be specified
            # as:
            # --instance-terminate-behavior shutdown
            #
            # gets parsed as:
            # {"Value": "shutdown"}
            return {'Value': value}
        return None

    def _should_parse_as_shorthand(self, cli_argument, value):
        """Return whether ``value`` should go through the shorthand parser.

        Values that look like JSON are rejected first; otherwise the
        decision depends on whether the argument's model qualifies for
        shorthand syntax.
        """
        # For a non-empty list only the first element is inspected.
        first = value[0] if value and isinstance(value, list) else value
        if isinstance(first, str) and first.strip().startswith(('[', '{')):
            LOG.debug(
                "Param %s looks like JSON, not considered for "
                "param shorthand.",
                cli_argument.py_name,
            )
            return False
        return _supports_shorthand_syntax(cli_argument.argument_model)
437
438
439
class ParamShorthandDocGen(ParamShorthand):
    """Documentation generator for param shorthand syntax."""

    # Sentinel returned by ``_handle_special_cases`` when no example
    # should be produced at all.
    _DONT_DOC = object()
    # Nesting depth above which ``_shorthand_docs`` raises TooComplexError.
    _MAX_STACK = 3

    def supports_shorthand(self, argument_model):
        """Checks if a CLI argument supports shorthand syntax."""
        if argument_model is None:
            return False
        return _supports_shorthand_syntax(argument_model)

    def generate_shorthand_example(
        self, cli_argument, service_id, operation_name
    ):
        """Generate documentation for a CLI argument.

        :type cli_argument: awscli.arguments.BaseCLIArgument
        :param cli_argument: The CLI argument which to generate
            documentation for.

        :return: Returns either a string or ``None``.  If a string
            is returned, it is the generated shorthand example; an empty
            string is returned when the shape is too deeply nested to
            document.  If a value of ``None`` is returned then this
            indicates that no shorthand example should be documented for
            the provided argument.

        """
        special = self._handle_special_cases(
            cli_argument, service_id, operation_name
        )
        if special is self._DONT_DOC:
            return None
        if special:
            return special

        # Otherwise we fall back to the normal docgen for shorthand
        # syntax.
        try:
            model = cli_argument.argument_model
            if model.type_name == 'list':
                return self._shorthand_docs(model.member, []) + ' ...'
            return self._shorthand_docs(model, [])
        except TooComplexError:
            return ''

    def _handle_special_cases(self, cli_argument, service_id, operation_name):
        """Return docs for the legacy cases.

        :return: An example string for the legacy list case,
            ``_DONT_DOC`` for a single string ``Value`` structure, or
            ``''`` when no special case applies.
        """
        model = cli_argument.argument_model
        if model.type_name == 'list':
            member = model.member
            if (
                member.type_name == 'structure'
                and len(member.members) == 1
                and self._uses_old_list_case(
                    service_id, operation_name, cli_argument.name
                )
            ):
                member_name = next(iter(member.members))
                metadata = model.metadata
                cli_name = cli_argument.cli_name
                # Handle special case where the min/max is exactly one.
                if metadata.get('min') == 1 and metadata.get('max') == 1:
                    return f'{cli_name} {member_name}1'
                return f'{cli_name} {member_name}1 {member_name}2 {member_name}3'
            return ''
        if (
            model.type_name == 'structure'
            and len(model.members) == 1
            and 'Value' in model.members
            and model.members['Value'].type_name == 'string'
        ):
            return self._DONT_DOC
        return ''

    def _shorthand_docs(self, argument_model, stack):
        """Render ``argument_model`` by dispatching on its type name.

        :raises TooComplexError: when ``stack`` holds more than
            ``_MAX_STACK`` entries.
        """
        if len(stack) > self._MAX_STACK:
            raise TooComplexError()
        kind = argument_model.type_name
        renderers = {
            'structure': self._structure_docs,
            'list': self._list_docs,
            'map': self._map_docs,
        }
        render = renderers.get(kind)
        if render is None:
            # Any other type is documented by its type name alone.
            return kind
        return render(argument_model, stack)

    def _list_docs(self, argument_model, stack):
        """Render a list as two comma-separated copies of its member docs."""
        list_member = argument_model.member
        stack.append(list_member.name)
        try:
            element_docs = self._shorthand_docs(argument_model.member, stack)
        finally:
            stack.pop()
        # Brackets are used for complex members or when stack depth > 1.
        bracketed = list_member.type_name in COMPLEX_TYPES or len(stack) > 1
        template = '[%s,%s]' if bracketed else '%s,%s'
        return template % (element_docs, element_docs)

    def _map_docs(self, argument_model, stack):
        """Render a map as two sample ``KeyNameN=<value docs>`` pairs."""
        key_shape = argument_model.key
        stack.append(argument_model.value.name)
        try:
            value_docs = self._shorthand_docs(argument_model.value, stack)
        finally:
            stack.pop()
        pairs = 'KeyName1=%s,KeyName2=%s' % (value_docs, value_docs)
        if key_shape.enum and not stack:
            # With an empty stack and enumerated keys, list the valid keys.
            listing = ''.join(' %s\n' % enum for enum in key_shape.enum)
            return pairs + '\n\nWhere valid key names are:\n' + listing
        if stack:
            return '{%s}' % pairs
        return pairs

    def _structure_docs(self, argument_model, stack):
        """Render a structure as comma-separated ``name=docs`` parts.

        Members that are document type containers are left out.  The
        result is wrapped in braces unless ``stack`` is empty.
        """
        inner_part = ','.join(
            self._member_docs(name, member_shape, stack)
            for name, member_shape in argument_model.members.items()
            if not is_document_type_container(member_shape)
        )
        return '{%s}' % inner_part if stack else inner_part

    def _member_docs(self, name, shape, stack):
        """Render one structure member, cutting off recursive shapes."""
        if shape.name in stack:
            return '( ... recursive ... )'
        stack.append(shape.name)
        try:
            value_doc = self._shorthand_docs(shape, stack)
        finally:
            stack.pop()
        return '%s=%s' % (name, value_doc)
573
574