Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/emr/steputils.py
2624 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
from awscli.customizations.emr import constants, emrutils, exceptions
15
16
17
def build_step_config_list(parsed_step_list, region, release_label):
    """Convert parsed step definitions into step configurations.

    Each entry of ``parsed_step_list`` is routed to a type-specific builder
    based on its ``Type`` key. A missing ``Type`` is treated as a custom JAR
    step, and the type is matched case-insensitively (it is lower-cased
    before lookup).

    :param parsed_step_list: iterable of parsed step mappings (read via
        ``.get``).
    :param region: region forwarded to the hive, pig, impala and spark
        builders.
    :param release_label: release label forwarded to every builder except
        the custom JAR builder.
    :returns: list with one step configuration per input step, in input
        order.
    :raises exceptions.UnknownStepTypeError: if a step's type matches none
        of the known step types.
    """
    # Dispatch table: lower-cased step type -> builder taking the raw step.
    builders = {
        constants.CUSTOM_JAR: lambda step: build_custom_jar_step(
            parsed_step=step
        ),
        constants.STREAMING: lambda step: build_streaming_step(
            parsed_step=step, release_label=release_label
        ),
        constants.HIVE: lambda step: build_hive_step(
            parsed_step=step, region=region, release_label=release_label
        ),
        constants.PIG: lambda step: build_pig_step(
            parsed_step=step, region=region, release_label=release_label
        ),
        constants.IMPALA: lambda step: build_impala_step(
            parsed_step=step, region=region, release_label=release_label
        ),
        constants.SPARK: lambda step: build_spark_step(
            parsed_step=step, region=region, release_label=release_label
        ),
    }

    configs = []
    for step in parsed_step_list:
        declared_type = step.get('Type')
        if declared_type is None:
            declared_type = constants.CUSTOM_JAR
        normalized_type = declared_type.lower()

        builder = builders.get(normalized_type)
        if builder is None:
            raise exceptions.UnknownStepTypeError(step_type=normalized_type)
        configs.append(builder(step))

    return configs
54
55
56
def build_custom_jar_step(parsed_step):
    """Build the step configuration for a custom JAR step.

    ``Name`` and ``ActionOnFailure`` fall back to the module defaults when
    absent (``None``). ``Jar`` is required. ``Properties`` is passed through
    ``emrutils.parse_key_value_string`` before being handed to
    ``emrutils.build_step``.

    :param parsed_step: parsed step mapping (read via ``.get``).
    :returns: the value produced by ``emrutils.build_step``.
    """
    field = parsed_step.get

    step_name = _apply_default_value(
        arg=field('Name'),
        value=constants.DEFAULT_CUSTOM_JAR_STEP_NAME,
    )
    failure_action = _apply_default_value(
        arg=field('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    jar_location = field('Jar')
    emrutils.check_required_field(
        structure=constants.CUSTOM_JAR_STEP_CONFIG,
        name='Jar',
        value=jar_location,
    )

    step_properties = emrutils.parse_key_value_string(field('Properties'))

    return emrutils.build_step(
        jar=jar_location,
        args=field('Args'),
        name=step_name,
        action_on_failure=failure_action,
        main_class=field('MainClass'),
        properties=step_properties,
        log_uri=field('LogUri'),
        encryption_key_arn=field('EncryptionKeyArn'),
    )
82
83
84
def build_streaming_step(parsed_step, release_label):
    """Build the step configuration for a Hadoop streaming step.

    ``Args`` is required and validated with ``check_required_field`` and
    ``check_empty_string_list``. When ``release_label`` is truthy, the step
    runs ``constants.COMMAND_RUNNER`` with ``HADOOP_STREAMING_COMMAND``
    prepended to the user args. Otherwise the step runs
    ``HADOOP_STREAMING_PATH`` with the user args unchanged.

    :param parsed_step: parsed step mapping (read via ``.get``).
    :param release_label: selects which jar/argument layout is used.
    :returns: the value produced by ``emrutils.build_step``.
    """
    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_STREAMING_STEP_NAME,
    )
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.STREAMING_STEP_CONFIG, name='Args', value=user_args
    )
    emrutils.check_empty_string_list(name='Args', value=user_args)

    if release_label:
        jar, leading_args = (
            constants.COMMAND_RUNNER,
            [constants.HADOOP_STREAMING_COMMAND],
        )
    else:
        jar, leading_args = constants.HADOOP_STREAMING_PATH, []

    return emrutils.build_step(
        jar=jar,
        args=[*leading_args, *user_args],
        name=step_name,
        action_on_failure=failure_action,
        log_uri=parsed_step.get('LogUri'),
        encryption_key_arn=parsed_step.get('EncryptionKeyArn'),
    )
117
118
119
def build_hive_step(parsed_step, release_label, region=None):
    """Build the step configuration for a Hive script step.

    ``Args`` is required and must pass ``check_empty_string_list``. The jar
    comes from ``_get_runner_jar`` and the argument list from
    ``_build_hive_args``; both depend on ``release_label`` and ``region``.

    :param parsed_step: parsed step mapping (read via ``.get``).
    :param release_label: forwarded to the jar/argument helpers.
    :param region: forwarded to the jar/argument helpers.
    :returns: the value produced by ``emrutils.build_step``.
    """
    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.HIVE_STEP_CONFIG, name='Args', value=user_args
    )
    emrutils.check_empty_string_list(name='Args', value=user_args)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'), value=constants.DEFAULT_HIVE_STEP_NAME
    )
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    # Jar first, then args: same evaluation order as the keyword arguments
    # of the original single build_step call.
    runner_jar = _get_runner_jar(release_label, region)
    hive_args = _build_hive_args(user_args, release_label, region)

    return emrutils.build_step(
        jar=runner_jar,
        args=hive_args,
        name=step_name,
        action_on_failure=failure_action,
        log_uri=parsed_step.get('LogUri'),
        encryption_key_arn=parsed_step.get('EncryptionKeyArn'),
    )
141
142
143
def _build_hive_args(args, release_label, region):
    """Assemble the argument list for a Hive step.

    Layout: ``[script, RUN_HIVE_SCRIPT, <version flags>, ARGS, *args]``.
    When ``release_label`` is truthy, the script is ``HIVE_SCRIPT_COMMAND``
    and no version flags are added. Otherwise the script is an S3 link to
    ``HIVE_SCRIPT_PATH`` for ``region``, and the version flags are
    ``HIVE_VERSIONS, LATEST``.
    """
    if release_label:
        script = constants.HIVE_SCRIPT_COMMAND
        version_flags = []
    else:
        script = emrutils.build_s3_link(
            relative_path=constants.HIVE_SCRIPT_PATH, region=region
        )
        version_flags = [constants.HIVE_VERSIONS, constants.LATEST]

    return [
        script,
        constants.RUN_HIVE_SCRIPT,
        *version_flags,
        constants.ARGS,
        *args,
    ]
164
165
166
def build_pig_step(parsed_step, release_label, region=None):
    """Build the step configuration for a Pig script step.

    ``Args`` is required and must pass ``check_empty_string_list``. The jar
    comes from ``_get_runner_jar`` and the argument list from
    ``_build_pig_args``; both depend on ``release_label`` and ``region``.

    :param parsed_step: parsed step mapping (read via ``.get``).
    :param release_label: forwarded to the jar/argument helpers.
    :param region: forwarded to the jar/argument helpers.
    :returns: the value produced by ``emrutils.build_step``.
    """
    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.PIG_STEP_CONFIG, name='Args', value=user_args
    )
    emrutils.check_empty_string_list(name='Args', value=user_args)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'), value=constants.DEFAULT_PIG_STEP_NAME
    )
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    # Jar first, then args: same evaluation order as the keyword arguments
    # of the original single build_step call.
    runner_jar = _get_runner_jar(release_label, region)
    pig_args = _build_pig_args(user_args, release_label, region)

    return emrutils.build_step(
        jar=runner_jar,
        args=pig_args,
        name=step_name,
        action_on_failure=failure_action,
        log_uri=parsed_step.get('LogUri'),
        encryption_key_arn=parsed_step.get('EncryptionKeyArn'),
    )
188
189
190
def _build_pig_args(args, release_label, region):
    """Assemble the argument list for a Pig step.

    Layout: ``[script, RUN_PIG_SCRIPT, <version flags>, ARGS, *args]``.
    When ``release_label`` is truthy, the script is ``PIG_SCRIPT_COMMAND``
    and no version flags are added. Otherwise the script is an S3 link to
    ``PIG_SCRIPT_PATH`` for ``region``, and the version flags are
    ``PIG_VERSIONS, LATEST``.
    """
    if release_label:
        script = constants.PIG_SCRIPT_COMMAND
        version_flags = []
    else:
        script = emrutils.build_s3_link(
            relative_path=constants.PIG_SCRIPT_PATH, region=region
        )
        version_flags = [constants.PIG_VERSIONS, constants.LATEST]

    return [
        script,
        constants.RUN_PIG_SCRIPT,
        *version_flags,
        constants.ARGS,
        *args,
    ]
211
212
213
def build_impala_step(parsed_step, release_label, region=None):
    """Build the step configuration for an Impala step.

    Impala steps are rejected when ``release_label`` is truthy. Otherwise
    the step runs the script runner for ``region``. Its arguments are an S3
    link to ``IMPALA_INSTALL_PATH``, then ``RUN_IMPALA_SCRIPT``, then the
    user-supplied ``Args`` (required).

    :param parsed_step: parsed step mapping (read via ``.get``).
    :param release_label: if truthy, the step type is rejected.
    :param region: used for the S3 link and the script runner.
    :returns: the value produced by ``emrutils.build_step``.
    :raises exceptions.UnknownStepTypeError: when ``release_label`` is
        truthy.
    """
    if release_label:
        raise exceptions.UnknownStepTypeError(step_type=constants.IMPALA)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'), value=constants.DEFAULT_IMPALA_STEP_NAME
    )
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    install_link = emrutils.build_s3_link(
        relative_path=constants.IMPALA_INSTALL_PATH, region=region
    )

    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.IMPALA_STEP_CONFIG, name='Args', value=user_args
    )
    impala_args = [install_link, constants.RUN_IMPALA_SCRIPT, *user_args]

    return emrutils.build_step(
        jar=emrutils.get_script_runner(region),
        args=impala_args,
        name=step_name,
        action_on_failure=failure_action,
        log_uri=parsed_step.get('LogUri'),
        encryption_key_arn=parsed_step.get('EncryptionKeyArn'),
    )
243
244
245
def build_spark_step(parsed_step, release_label, region=None):
    """Build the step configuration for a Spark submit step.

    ``Args`` is required (checked with ``check_required_field`` only). The
    jar comes from ``_get_runner_jar`` and the argument list from
    ``_build_spark_args``.

    :param parsed_step: parsed step mapping (read via ``.get``).
    :param release_label: forwarded to the jar/argument helpers.
    :param region: forwarded to the jar/argument helpers.
    :returns: the value produced by ``emrutils.build_step``.
    """
    step_name = _apply_default_value(
        arg=parsed_step.get('Name'), value=constants.DEFAULT_SPARK_STEP_NAME
    )
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION,
    )

    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.SPARK_STEP_CONFIG, name='Args', value=user_args
    )

    # Jar first, then args: same evaluation order as the keyword arguments
    # of the original single build_step call.
    runner_jar = _get_runner_jar(release_label, region)
    spark_args = _build_spark_args(user_args, release_label, region)

    return emrutils.build_step(
        jar=runner_jar,
        args=spark_args,
        name=step_name,
        action_on_failure=failure_action,
        log_uri=parsed_step.get('LogUri'),
        encryption_key_arn=parsed_step.get('EncryptionKeyArn'),
    )
266
267
268
def _build_spark_args(args, release_label, region):
    """Return ``[spark-submit entry, *args]`` for a Spark step.

    The entry is ``SPARK_SUBMIT_COMMAND`` when ``release_label`` is truthy,
    otherwise ``SPARK_SUBMIT_PATH``. ``region`` is accepted for signature
    parity with the other argument builders but is not used.
    """
    submit_entry = (
        constants.SPARK_SUBMIT_COMMAND
        if release_label
        else constants.SPARK_SUBMIT_PATH
    )
    return [submit_entry, *args]
277
278
279
def _apply_default_value(arg, value):
280
if arg is None:
281
arg = value
282
283
return arg
284
285
286
def _get_runner_jar(release_label, region):
    """Pick the jar that runs script-based steps.

    Returns ``constants.COMMAND_RUNNER`` when ``release_label`` is truthy;
    otherwise returns ``emrutils.get_script_runner(region)``.
    """
    if release_label:
        return constants.COMMAND_RUNNER
    return emrutils.get_script_runner(region)
292
293