Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/arguments.py
1566 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import os
14
import re
15
16
from awscli.arguments import CustomArgument
17
from awscli.compat import compat_open
18
import jmespath
19
20
21
def resolve_given_outfile_path(path):
22
"""Asserts that a path is writable and returns the expanded path"""
23
if path is None:
24
return
25
outfile = os.path.expanduser(os.path.expandvars(path))
26
if not os.access(os.path.dirname(os.path.abspath(outfile)), os.W_OK):
27
raise ValueError('Unable to write to file: %s' % outfile)
28
return outfile
29
30
31
def is_parsed_result_successful(parsed_result):
32
"""Returns True if a parsed result is successful"""
33
return parsed_result['ResponseMetadata']['HTTPStatusCode'] < 300
34
35
36
class OverrideRequiredArgsArgument(CustomArgument):
37
"""An argument that if specified makes all other arguments not required
38
39
By not required, it refers to not having an error thrown when the
40
parser does not find an argument that is required on the command line.
41
To obtain this argument's property of ignoring required arguments,
42
subclass from this class and fill out the ``ARG_DATA`` parameter as
43
described below. Note this class is really only useful for subclassing.
44
"""
45
46
# ``ARG_DATA`` follows the same format as a member of ``ARG_TABLE`` in
47
# ``BasicCommand`` class as specified in
48
# ``awscli/customizations/commands.py``.
49
#
50
# For example, an ``ARG_DATA`` variable would be filled out as:
51
#
52
# ARG_DATA =
53
# {'name': 'my-argument',
54
# 'help_text': 'This is argument ensures the argument is specified'
55
# 'no other arguments are required'}
56
ARG_DATA = {'name': 'no-required-args'}
57
58
def __init__(self, session):
59
self._session = session
60
self._register_argument_action()
61
super(OverrideRequiredArgsArgument, self).__init__(**self.ARG_DATA)
62
63
def _register_argument_action(self):
64
self._session.register('before-building-argument-table-parser',
65
self.override_required_args)
66
67
def override_required_args(self, argument_table, args, **kwargs):
68
name_in_cmdline = '--' + self.name
69
# Set all ``Argument`` objects in ``argument_table`` to not required
70
# if this argument's name is present in the command line.
71
if name_in_cmdline in args:
72
for arg_name in argument_table.keys():
73
argument_table[arg_name].required = False
74
75
76
class StatefulArgument(CustomArgument):
77
"""An argument that maintains a stateful value"""
78
79
def __init__(self, *args, **kwargs):
80
super(StatefulArgument, self).__init__(*args, **kwargs)
81
self._value = None
82
83
def add_to_params(self, parameters, value):
84
super(StatefulArgument, self).add_to_params(parameters, value)
85
self._value = value
86
87
@property
88
def value(self):
89
return self._value
90
91
92
class QueryOutFileArgument(StatefulArgument):
93
"""An argument that write a JMESPath query result to a file"""
94
95
def __init__(self, session, name, query, after_call_event, perm,
96
*args, **kwargs):
97
self._session = session
98
self._query = query
99
self._after_call_event = after_call_event
100
self._perm = perm
101
# Generate default help_text if text was not provided.
102
if 'help_text' not in kwargs:
103
kwargs['help_text'] = ('Saves the command output contents of %s '
104
'to the given filename' % self.query)
105
super(QueryOutFileArgument, self).__init__(name, *args, **kwargs)
106
107
@property
108
def query(self):
109
return self._query
110
111
@property
112
def perm(self):
113
return self._perm
114
115
def add_to_params(self, parameters, value):
116
value = resolve_given_outfile_path(value)
117
super(QueryOutFileArgument, self).add_to_params(parameters, value)
118
if self.value is not None:
119
# Only register the event to save the argument if it is set
120
self._session.register(self._after_call_event, self.save_query)
121
122
def save_query(self, parsed, **kwargs):
123
"""Saves the result of a JMESPath expression to a file.
124
125
This method only saves the query data if the response code of
126
the parsed result is < 300.
127
"""
128
if is_parsed_result_successful(parsed):
129
contents = jmespath.search(self.query, parsed)
130
with compat_open(
131
self.value, 'w', access_permissions=self.perm) as fp:
132
# Don't write 'None' to a file -- write ''.
133
if contents is None:
134
fp.write('')
135
else:
136
fp.write(contents)
137
# Even though the file is opened using the requested mode
138
# (e.g. 0o600), the mode is only applied if a new file is
139
# created. This means if the file already exists, its
140
# permissions will not be changed. So, the os.chmod call is
141
# retained here to preserve behavior of this argument always
142
# clobbering a preexisting file's permissions to the desired
143
# mode.
144
os.chmod(self.value, self.perm)
145
146
147
class NestedBlobArgumentHoister(object):
148
"""Can be registered to update a single argument / model value combination
149
mapping that to a new top-level argument.
150
Currently limited to blob argument types as these are the only ones
151
requiring the hoist.
152
"""
153
154
def __init__(self, source_arg, source_arg_blob_member,
155
new_arg, new_arg_doc_string, doc_string_addendum):
156
self._source_arg = source_arg
157
self._source_arg_blob_member = source_arg_blob_member
158
self._new_arg = new_arg
159
self._new_arg_doc_string = new_arg_doc_string
160
self._doc_string_addendum = doc_string_addendum
161
162
def __call__(self, session, argument_table, **kwargs):
163
if not self._valid_target(argument_table):
164
return
165
self._update_arg(
166
argument_table, self._source_arg, self._new_arg)
167
168
def _valid_target(self, argument_table):
169
# Find the source argument and check that it has a member of
170
# the same name and type.
171
if self._source_arg in argument_table:
172
arg = argument_table[self._source_arg]
173
input_model = arg.argument_model
174
member = input_model.members.get(self._source_arg_blob_member)
175
if (member is not None and
176
member.type_name == 'blob'):
177
return True
178
return False
179
180
def _update_arg(self, argument_table, source_arg, new_arg):
181
argument_table[new_arg] = _NestedBlobArgumentParamOverwrite(
182
new_arg, source_arg, self._source_arg_blob_member,
183
help_text=self._new_arg_doc_string,
184
cli_type_name='blob')
185
argument_table[source_arg].required = False
186
argument_table[source_arg].documentation += self._doc_string_addendum
187
188
189
class _NestedBlobArgumentParamOverwrite(CustomArgument):
190
def __init__(self, new_arg, source_arg, source_arg_blob_member, **kwargs):
191
super(_NestedBlobArgumentParamOverwrite, self).__init__(
192
new_arg, **kwargs)
193
self._param_to_overwrite = _reverse_xform_name(source_arg)
194
self._source_arg_blob_member = source_arg_blob_member
195
196
def add_to_params(self, parameters, value):
197
if value is None:
198
return
199
param_value = {self._source_arg_blob_member: value}
200
if parameters.get(self._param_to_overwrite):
201
parameters[self._param_to_overwrite].update(param_value)
202
else:
203
parameters[self._param_to_overwrite] = param_value
204
205
206
def _upper(match):
207
return match.group(1).lstrip('-').upper()
208
209
210
def _reverse_xform_name(name):
211
return re.sub(r'(^.|-.)', _upper, name)
212
213