Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/emr/emrutils.py
1567 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import json
15
import logging
16
import os
17
18
19
from awscli.clidriver import CLIOperationCaller
20
from awscli.customizations.emr import constants
21
from awscli.customizations.emr import exceptions
22
from botocore.exceptions import WaiterError, NoCredentialsError
23
from botocore import xform_name
24
25
LOG = logging.getLogger(__name__)
26
27
28
def parse_tags(raw_tags_list):
    """
    Convert a list of 'key=value' strings into a list of tag dicts.

    Each entry is split on its first '='. An entry without '=' becomes a
    key with an empty-string value. A falsy input (None or an empty list)
    yields an empty list.

    Returns a list of {'Key': ..., 'Value': ...} dicts in input order.
    """
    parsed_tags = []
    for raw_tag in raw_tags_list or ():
        # partition() splits on the first '=' only and leaves the value
        # empty when the tag contains no '='.
        tag_key, _, tag_value = raw_tag.partition('=')
        parsed_tags.append({'Key': tag_key, 'Value': tag_value})
    return parsed_tags
39
40
41
def parse_key_value_string(key_value_string):
    """
    Parse a comma-separated string of key/value pairs into a list of dicts.

    key_value_string looks like "k1=v1,k2='v 2',k3,k4". Each comma-separated
    item is split on its first '='; an item without '=' gets an empty-string
    value. Quotes are not interpreted and stay part of the value.

    Returns a list of {'Key': ..., 'Value': ...} dicts, or None when
    key_value_string is None.
    """
    if key_value_string is None:
        return None

    pairs = []
    for item in key_value_string.split(','):
        item_key, _, item_value = item.partition('=')
        pairs.append({'Key': item_key, 'Value': item_value})
    return pairs
56
57
58
def apply_boolean_options(
        true_option, true_option_name, false_option, false_option_name):
    """
    Resolve a pair of mutually exclusive on/off options to a single boolean.

    Returns True when true_option is truthy, False otherwise.

    Raises ValueError when both true_option and false_option are truthy;
    the message names both options via true_option_name and
    false_option_name.
    """
    if true_option and false_option:
        raise ValueError(
            'aws: error: cannot use both ' + true_option_name +
            ' and ' + false_option_name + ' options together.')
    return bool(true_option)
69
70
71
# Deprecated: prefer apply_dict, which behaves identically.
72
def apply(params, key, value):
    """
    Store value under key in params when value is truthy.

    Falsy values (None, '', 0, empty containers) are skipped, leaving
    params untouched. params is mutated in place and also returned.
    """
    if not value:
        return params
    params[key] = value
    return params
77
78
79
def apply_dict(params, key, value):
    """
    Set params[key] = value only if value is truthy, then return params.

    The dict is modified in place; the same object is returned so calls
    can be used inline. Falsy values are ignored.
    """
    should_set = bool(value)
    if should_set:
        params.update({key: value})
    return params
84
85
86
def apply_params(src_params, src_key, dest_params, dest_key):
    """
    Copy src_params[src_key] to dest_params[dest_key] when it is set.

    Nothing is copied if src_key is absent from src_params or its value is
    falsy. dest_params is mutated in place and returned.
    """
    if src_key in src_params:
        candidate = src_params[src_key]
        if candidate:
            dest_params[dest_key] = candidate
    return dest_params
91
92
93
def build_step(
        jar, name='Step',
        action_on_failure=constants.DEFAULT_FAILURE_ACTION,
        args=None,
        main_class=None,
        properties=None):
    """
    Build a step configuration dict wrapping a HadoopJarStep.

    jar is required and is validated through check_required_field. The
    remaining values are included only when truthy: name and
    action_on_failure on the step itself, and args, main_class and
    properties inside the nested 'HadoopJarStep' dict.

    Returns the step dict with keys 'Name', 'ActionOnFailure' (when set)
    and 'HadoopJarStep'.
    """
    check_required_field(
        structure='HadoopJarStep', name='Jar', value=jar)

    hadoop_jar_step = {'Jar': jar}
    optional_jar_fields = (
        ('Args', args),
        ('MainClass', main_class),
        ('Properties', properties),
    )
    for field_name, field_value in optional_jar_fields:
        apply_dict(hadoop_jar_step, field_name, field_value)

    step_config = {}
    apply_dict(step_config, 'Name', name)
    apply_dict(step_config, 'ActionOnFailure', action_on_failure)
    step_config['HadoopJarStep'] = hadoop_jar_step
    return step_config
113
114
115
def build_bootstrap_action(
        path,
        name='Bootstrap Action',
        args=None):
    """
    Build a bootstrap action configuration dict.

    path is stored under 'ScriptBootstrapAction' -> 'Path'; args is added
    alongside it as 'Args' only when truthy, and name is added as 'Name'
    only when truthy.

    Raises exceptions.MissingParametersError when path is None.
    """
    if path is None:
        raise exceptions.MissingParametersError(
            object_name='ScriptBootstrapActionConfig', missing='Path')

    script_bootstrap_action = {}
    apply_dict(script_bootstrap_action, 'Args', args)
    script_bootstrap_action['Path'] = path

    bootstrap_action = {}
    apply_dict(bootstrap_action, 'Name', name)
    apply_dict(
        bootstrap_action, 'ScriptBootstrapAction', script_bootstrap_action)
    return bootstrap_action
130
131
132
def build_s3_link(relative_path='', region='us-east-1'):
    """
    Build an 's3://<region>.elasticmapreduce<relative_path>' URL.

    A region of None is treated as 'us-east-1'. relative_path is appended
    verbatim, so it should carry its own leading '/'.
    """
    effective_region = 'us-east-1' if region is None else region
    return 's3://{0}.elasticmapreduce{1}'.format(
        effective_region, relative_path)
136
137
138
def get_script_runner(region='us-east-1'):
    """
    Return the S3 link built from constants.SCRIPT_RUNNER_PATH for region.

    A region of None is treated as 'us-east-1'.
    """
    target_region = 'us-east-1' if region is None else region
    return build_s3_link(
        relative_path=constants.SCRIPT_RUNNER_PATH, region=target_region)
143
144
145
def check_required_field(structure, name, value):
    """
    Ensure a required field has a truthy value.

    Raises exceptions.MissingParametersError, naming the structure and the
    missing field, when value is falsy. Returns None otherwise.
    """
    if value:
        return
    raise exceptions.MissingParametersError(
        object_name=structure, missing=name)
149
150
151
def check_empty_string_list(name, value):
    """
    Reject a list that is empty or holds only a single blank string.

    Raises exceptions.EmptyListError (with param=name) when value is falsy
    or is a one-element sequence whose element is empty or whitespace-only.
    Returns None otherwise.
    """
    if value:
        # Lists with several entries are accepted as-is; only a lone
        # entry is inspected for being blank.
        has_content = len(value) != 1 or value[0].strip() != ""
        if has_content:
            return
    raise exceptions.EmptyListError(param=name)
154
155
156
def call(session, operation_name, parameters, region_name=None,
         endpoint_url=None, verify=None):
    """
    Invoke an EMR client operation and return its response.

    An 'emr' client is created from session with the given region_name,
    endpoint_url and verify settings; the client method named
    operation_name is then called with parameters expanded as keyword
    arguments.

    Raises NoCredentialsError when the session has no credentials.
    """
    # Check for credentials up front: a later failure about a missing
    # region configuration would otherwise hide the real problem, and we
    # want to give a good error message.
    credentials = session.get_credentials()
    if credentials is None:
        raise NoCredentialsError()

    emr_client = session.create_client(
        'emr', region_name=region_name, endpoint_url=endpoint_url,
        verify=verify)
    LOG.debug('Calling ' + str(operation_name))
    operation = getattr(emr_client, operation_name)
    return operation(**parameters)
169
170
171
def get_example_file(command):
    """
    Open the .rst example file for the given EMR command name.

    The path is relative ('awscli/examples/emr/<command>.rst'), so it is
    resolved against the current working directory. The open file object
    is returned; closing it is left to the caller.
    """
    example_path = 'awscli/examples/emr/' + command + '.rst'
    return open(example_path)
173
174
175
def dict_to_string(dict, indent=2):
    """
    Serialize dict to a JSON string, pretty-printed with indent spaces.
    """
    # NOTE: the parameter name shadows the builtin `dict`; it is kept
    # because callers may pass it by keyword.
    serialized = json.dumps(dict, indent=indent)
    return serialized
177
178
179
def get_client(session, parsed_globals):
    """
    Create an 'emr' client from session using the global CLI options.

    The region comes from get_region(); the endpoint URL and SSL
    verification setting are read from parsed_globals.endpoint_url and
    parsed_globals.verify_ssl.
    """
    client_options = {
        'region_name': get_region(session, parsed_globals),
        'endpoint_url': parsed_globals.endpoint_url,
        'verify': parsed_globals.verify_ssl,
    }
    return session.create_client('emr', **client_options)
185
186
187
def get_cluster_state(session, parsed_globals, cluster_id):
    """
    Return the 'State' value from the cluster's describe_cluster status.

    Calls describe_cluster for cluster_id and reads
    response['Cluster']['Status']['State'].
    """
    emr_client = get_client(session, parsed_globals)
    cluster = emr_client.describe_cluster(ClusterId=cluster_id)['Cluster']
    return cluster['Status']['State']
191
192
193
def find_master_dns(session, parsed_globals, cluster_id):
    """
    Return the cluster's 'MasterPublicDnsName'.

    Calls describe_cluster for cluster_id and reads
    response['Cluster']['MasterPublicDnsName'].
    """
    emr_client = get_client(session, parsed_globals)
    cluster = emr_client.describe_cluster(ClusterId=cluster_id)['Cluster']
    return cluster['MasterPublicDnsName']
200
201
202
def which(program):
    """
    Locate an executable named program on the PATH search path.

    Every PATH entry (with surrounding double quotes stripped) is joined
    with program; the first result that is a regular file with execute
    permission is returned.

    When the PATH environment variable is not set, os.defpath is searched
    instead of raising KeyError.

    Returns the path of the first match, or None when nothing is found.
    """
    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        # Entries may be wrapped in double quotes; drop them before
        # joining.
        directory = directory.strip('"')
        candidate = os.path.join(directory, program)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate

    return None
210
211
212
def call_and_display_response(session, operation_name, parameters,
                              parsed_globals):
    """
    Invoke an 'emr' operation through a CLIOperationCaller.

    The caller is built from session and invoked with operation_name,
    parameters and parsed_globals. Nothing is returned.
    """
    CLIOperationCaller(session).invoke(
        'emr', operation_name, parameters, parsed_globals)
218
219
220
def display_response(session, operation_name, result, parsed_globals):
    """
    Display an already-obtained operation result via CLIOperationCaller.

    Nothing is returned.
    """
    # This reaches into a private method of CLIOperationCaller. It should
    # be changed once that functionality is moved outside the class.
    caller = CLIOperationCaller(session)
    caller._display_response(operation_name, result, parsed_globals)
226
227
228
def get_region(session, parsed_globals):
    """
    Determine the region to use.

    parsed_globals.region wins when it is not None; otherwise the
    session's 'region' config variable is returned (which may itself be
    None).
    """
    explicit_region = parsed_globals.region
    if explicit_region is not None:
        return explicit_region
    return session.get_config_variable('region')
233
234
235
def join(values, separator=',', lastSeparator='and'):
    """
    Format values as a human-readable list.

    Every value is converted with str(). All but the last are joined with
    separator followed by a space; the last is attached with
    lastSeparator surrounded by spaces:
    [1,2,3] -> '1, 2 and 3'

    An empty input gives '' and a single value is returned on its own.
    """
    items = [str(value) for value in values]
    if not items:
        return ""
    if len(items) == 1:
        return items[0]

    final_item = items.pop()
    spaced_separator = '%s ' % separator
    leading_items = spaced_separator.join(items)
    return ' '.join([leading_items, lastSeparator, final_item])
249
250
251
def split_to_key_value(string):
    """
    Split string into a key and a value on its first '='.

    Returns the two-element list from str.split when '=' is present, and
    the tuple (string, '') when it is not.
    """
    if '=' in string:
        return string.split('=', 1)
    return string, ''
256
257
258
def get_cluster(cluster_id, session, region,
                endpoint_url, verify_ssl):
    """
    Fetch the 'Cluster' entry of a describe_cluster response.

    Issues describe_cluster for cluster_id through call(). Returns
    response.get('Cluster'), or None when the response itself is None.
    """
    response = call(
        session, 'describe_cluster', {'ClusterId': cluster_id},
        region, endpoint_url, verify_ssl)

    if response is None:
        return None
    return response.get('Cluster')
268
269
270
def get_release_label(cluster_id, session, region,
                      endpoint_url, verify_ssl):
    """
    Return the 'ReleaseLabel' of the cluster identified by cluster_id.

    The cluster description is obtained through get_cluster(). Returns
    None when no cluster description is available or it carries no
    'ReleaseLabel' key.
    """
    cluster = get_cluster(
        cluster_id, session, region, endpoint_url, verify_ssl)
    return None if cluster is None else cluster.get('ReleaseLabel')
276
277