Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/datapipeline/translator.py
1567 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import json
14
from awscli.clidriver import CLIOperationCaller
15
16
17
class PipelineDefinitionError(Exception):
    """Error raised for a problem found in a pipeline definition.

    The exception text is the supplied message prefixed with
    "Error in pipeline definition: " and terminated by a newline.  The
    raw message and the definition it refers to stay available on the
    instance as ``msg`` and ``definition``.
    """

    def __init__(self, msg, definition):
        # Build the user-facing text first, then keep the raw pieces
        # around for callers that want to inspect them.
        text = "Error in pipeline definition: %s\n" % msg
        super(PipelineDefinitionError, self).__init__(text)
        self.definition = definition
        self.msg = msg
24
25
26
def dict_to_string(dictionary, indent=2):
    """Convert the dictionary input to a JSON string.

    The string form is required for escaping.

    :param dictionary: Object to serialize with ``json.dumps``.
    :param indent: Indentation passed through to ``json.dumps``
        (defaults to 2).
    :returns: The serialized string.
    """
    serialized = json.dumps(dictionary, indent=indent)
    return serialized
30
31
32
def get_region(session, parsed_globals):
    """Return the region to use for the current command.

    The region given in the parsed global arguments wins.  Only when
    that is ``None`` is the session's 'region' config variable used.

    :param session: Object providing ``get_config_variable``.
    :param parsed_globals: Parsed global arguments with a ``region``
        attribute.
    :returns: The selected region value.
    """
    explicit_region = parsed_globals.region
    if explicit_region is not None:
        return explicit_region
    return session.get_config_variable('region')
38
39
40
def display_response(session, operation_name, result, parsed_globals):
    """Display the response for a particular CLI operation.

    The work is handed to a ``CLIOperationCaller`` created for the
    given session.

    :param session: Session used to construct the CLIOperationCaller.
    :param operation_name: Name of the operation whose result is shown.
    :param result: The response to display.
    :param parsed_globals: Parsed global arguments, passed through
        unchanged.
    """
    caller = CLIOperationCaller(session)
    # NOTE: _display_response is a private method.  This call should be
    # changed once that functionality is moved outside
    # CLIOperationCaller.
    caller._display_response(operation_name, result, parsed_globals)
47
48
49
def api_to_definition(definition):
    """Translate an API response into the pipeline definition format.

    The API keys 'pipelineObjects', 'parameterObjects' and
    'parameterValues' are replaced, when present, by the definition
    keys 'objects', 'parameters' and 'values' holding the converted
    content.  Any other key is left alone.

    NOTE: the translation happens in place -- the API keys are popped
    from ``definition`` itself and the same dictionary is returned.
    Callers that still need the original API response must pass in a
    copy.

    :param definition: API response dictionary; modified in place.
    :returns: The same ``definition`` object after translation.
    """
    if 'pipelineObjects' in definition:
        api_objects = definition.pop('pipelineObjects')
        definition['objects'] = _api_to_objects_definition(api_objects)
    if 'parameterObjects' in definition:
        api_parameters = definition.pop('parameterObjects')
        definition['parameters'] = _api_to_parameters_definition(
            api_parameters)
    if 'parameterValues' in definition:
        api_values = definition.pop('parameterValues')
        definition['values'] = _api_to_values_definition(api_values)
    return definition
64
65
66
def definition_to_api_objects(definition):
    """Convert the "objects" of a pipeline definition to API objects.

    To match the structure expected by the service, every element of
    ``definition['objects']`` is turned into a dictionary with an 'id',
    a 'name' and a 'fields' key.  The input definition and its elements
    are not modified, so the same definition can be translated more
    than once.

    :param definition: Pipeline definition holding an "objects" list.
    :returns: List of API object dictionaries.
    :raises PipelineDefinitionError: If the "objects" key is missing or
        an element has no "id" key.
    """
    if 'objects' not in definition:
        raise PipelineDefinitionError('Missing "objects" key', definition)
    api_elements = []
    for element in definition['objects']:
        # Work on a shallow copy: popping straight from the caller's
        # element would strip its 'id'/'name' and make a second
        # translation of the same definition fail.
        remaining = dict(element)
        try:
            element_id = remaining.pop('id')
        except KeyError:
            raise PipelineDefinitionError('Missing "id" key of element: %s' %
                                          json.dumps(element), definition)
        api_object = {'id': element_id}
        # If a name is provided, then we use that for the name,
        # otherwise the id is used for the name.
        api_object['name'] = remaining.pop('name', element_id)
        # Everything that is left becomes the field list.  Each entry is
        # a dict with a 'key' and either a 'stringValue' or a 'refValue'.
        fields = []
        for key, value in sorted(remaining.items()):
            fields.extend(_parse_each_field(key, value))
        api_object['fields'] = fields
        api_elements.append(api_object)
    return api_elements
92
93
94
def definition_to_api_parameters(definition):
    """Convert the "parameters" of a pipeline definition to API form.

    Every element of ``definition['parameters']`` is turned into a
    dictionary with an 'id' and an 'attributes' key.  The input
    definition and its elements are not modified, so the same
    definition can be translated more than once.

    :param definition: Pipeline definition, optionally holding a
        "parameters" list.
    :returns: List of API parameter objects, or ``None`` when the
        definition has no "parameters" key.
    :raises PipelineDefinitionError: If a parameter has no "id" key.
    """
    if 'parameters' not in definition:
        return None
    parameter_objects = []
    for element in definition['parameters']:
        # Work on a shallow copy: popping straight from the caller's
        # element would strip its 'id' and make a second translation of
        # the same definition fail.
        remaining = dict(element)
        try:
            parameter_id = remaining.pop('id')
        except KeyError:
            raise PipelineDefinitionError('Missing "id" key of parameter: %s' %
                                          json.dumps(element), definition)
        parameter_object = {'id': parameter_id}
        # Everything that is left becomes the attribute list.  Each
        # entry is a dict built by _parse_each_field from one key/value.
        attributes = []
        for key, value in sorted(remaining.items()):
            attributes.extend(_parse_each_field(key, value))
        parameter_object['attributes'] = attributes
        parameter_objects.append(parameter_object)
    return parameter_objects
113
114
115
def definition_to_parameter_values(definition):
    """Convert the "values" of a pipeline definition to API form.

    Each key of ``definition['values']`` is passed, together with its
    value, to ``_convert_single_parameter_value`` and the results are
    concatenated into one list.

    :param definition: Pipeline definition, optionally holding a
        "values" mapping.
    :returns: List of API parameter values, or ``None`` when the
        definition has no "values" key.
    """
    if 'values' in definition:
        raw_values = definition['values']
        converted = []
        for value_id in raw_values:
            converted.extend(
                _convert_single_parameter_value(value_id,
                                                raw_values[value_id]))
        return converted
    return None
124
125
126
def _parse_each_field(key, value):
127
values = []
128
if isinstance(value, list):
129
for item in value:
130
values.append(_convert_single_field(key, item))
131
else:
132
values.append(_convert_single_field(key, value))
133
return values
134
135
136
def _convert_single_field(key, value):
137
field = {'key': key}
138
if isinstance(value, dict) and list(value.keys()) == ['ref']:
139
field['refValue'] = value['ref']
140
else:
141
field['stringValue'] = value
142
return field
143
144
145
def _convert_single_parameter_value(key, values):
146
parameter_values = []
147
if isinstance(values, list):
148
for each_value in values:
149
parameter_value = {'id': key, 'stringValue': each_value}
150
parameter_values.append(parameter_value)
151
else:
152
parameter_value = {'id': key, 'stringValue': values}
153
parameter_values.append(parameter_value)
154
return parameter_values
155
156
157
def _api_to_objects_definition(api_response):
158
pipeline_objects = []
159
for element in api_response:
160
current = {
161
'id': element['id'],
162
'name': element['name']
163
}
164
for field in element['fields']:
165
key = field['key']
166
if 'stringValue' in field:
167
value = field['stringValue']
168
else:
169
value = {'ref': field['refValue']}
170
_add_value(key, value, current)
171
pipeline_objects.append(current)
172
return pipeline_objects
173
174
175
def _api_to_parameters_definition(api_response):
176
parameter_objects = []
177
for element in api_response:
178
current = {
179
'id': element['id']
180
}
181
for attribute in element['attributes']:
182
_add_value(attribute['key'], attribute['stringValue'], current)
183
parameter_objects.append(current)
184
return parameter_objects
185
186
187
def _api_to_values_definition(api_response):
188
pipeline_values = {}
189
for element in api_response:
190
_add_value(element['id'], element['stringValue'], pipeline_values)
191
return pipeline_values
192
193
194
def _add_value(key, value, current_map):
195
if key not in current_map:
196
current_map[key] = value
197
elif isinstance(current_map[key], list):
198
# Dupe keys result in values aggregating
199
# into a list.
200
current_map[key].append(value)
201
else:
202
converted_list = [current_map[key], value]
203
current_map[key] = converted_list
204
205