Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/emr/steputils.py
1567 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
from awscli.customizations.emr import emrutils
15
from awscli.customizations.emr import constants
16
from awscli.customizations.emr import exceptions
17
18
19
def build_step_config_list(parsed_step_list, region, release_label):
    """Build one step config per entry in *parsed_step_list*.

    Each entry's ``Type`` (``constants.CUSTOM_JAR`` when missing or None)
    is lower-cased and used to select a type-specific builder. *region* and
    *release_label* are forwarded to the builders that accept them.

    :return: list of step configs, in the same order as the input.
    :raises exceptions.UnknownStepTypeError: for a type with no builder.
    """
    # Dispatch table: lower-cased step type -> callable taking the step dict.
    builders = {
        constants.CUSTOM_JAR: lambda step: build_custom_jar_step(
            parsed_step=step),
        constants.STREAMING: lambda step: build_streaming_step(
            parsed_step=step, release_label=release_label),
        constants.HIVE: lambda step: build_hive_step(
            parsed_step=step, region=region, release_label=release_label),
        constants.PIG: lambda step: build_pig_step(
            parsed_step=step, region=region, release_label=release_label),
        constants.IMPALA: lambda step: build_impala_step(
            parsed_step=step, region=region, release_label=release_label),
        constants.SPARK: lambda step: build_spark_step(
            parsed_step=step, region=region, release_label=release_label),
    }

    configs = []
    for step in parsed_step_list:
        kind = _apply_default_value(
            arg=step.get('Type'), value=constants.CUSTOM_JAR).lower()
        builder = builders.get(kind)
        if builder is None:
            raise exceptions.UnknownStepTypeError(step_type=kind)
        configs.append(builder(step))
    return configs
55
56
57
def build_custom_jar_step(parsed_step):
    """Build a custom JAR step from the parsed step dict.

    ``Jar`` is required (checked via ``emrutils.check_required_field``).
    ``Name`` and ``ActionOnFailure`` default to
    ``constants.DEFAULT_CUSTOM_JAR_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``. ``Args`` and ``MainClass`` are
    forwarded as given; ``Properties`` is first passed through
    ``emrutils.parse_key_value_string``.
    """
    jar = parsed_step.get('Jar')
    emrutils.check_required_field(
        structure=constants.CUSTOM_JAR_STEP_CONFIG, name='Jar', value=jar)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_CUSTOM_JAR_STEP_NAME)
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION)
    properties = emrutils.parse_key_value_string(
        parsed_step.get('Properties'))

    return emrutils.build_step(
        jar=jar,
        args=parsed_step.get('Args'),
        name=step_name,
        action_on_failure=failure_action,
        main_class=parsed_step.get('MainClass'),
        properties=properties)
76
77
78
def build_streaming_step(parsed_step, release_label):
    """Build a Hadoop streaming step.

    ``Args`` is required and is also checked by
    ``emrutils.check_empty_string_list``. When *release_label* is truthy the
    step runs ``constants.COMMAND_RUNNER`` with
    ``constants.HADOOP_STREAMING_COMMAND`` ahead of the user args; otherwise
    it runs ``constants.HADOOP_STREAMING_PATH`` with the user args alone.
    ``Name`` and ``ActionOnFailure`` fall back to
    ``constants.DEFAULT_STREAMING_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``.
    """
    user_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.STREAMING_STEP_CONFIG,
        name='Args',
        value=user_args)
    emrutils.check_empty_string_list(name='Args', value=user_args)

    if release_label:
        jar = constants.COMMAND_RUNNER
        step_args = [constants.HADOOP_STREAMING_COMMAND]
    else:
        jar = constants.HADOOP_STREAMING_PATH
        step_args = []
    step_args.extend(user_args)

    return emrutils.build_step(
        jar=jar,
        args=step_args,
        name=_apply_default_value(
            arg=parsed_step.get('Name'),
            value=constants.DEFAULT_STREAMING_STEP_NAME),
        action_on_failure=_apply_default_value(
            arg=parsed_step.get('ActionOnFailure'),
            value=constants.DEFAULT_FAILURE_ACTION))
107
108
109
def build_hive_step(parsed_step, release_label, region=None):
    """Build a Hive script step.

    ``Args`` is required and checked by ``emrutils.check_empty_string_list``.
    The runner JAR comes from ``_get_runner_jar`` and the full argument list
    from ``_build_hive_args``, both driven by *release_label* and *region*.
    ``Name`` and ``ActionOnFailure`` default to
    ``constants.DEFAULT_HIVE_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``.
    """
    hive_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.HIVE_STEP_CONFIG, name='Args', value=hive_args)
    emrutils.check_empty_string_list(name='Args', value=hive_args)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_HIVE_STEP_NAME)
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION)
    runner = _get_runner_jar(release_label, region)
    full_args = _build_hive_args(hive_args, release_label, region)

    return emrutils.build_step(
        jar=runner,
        args=full_args,
        name=step_name,
        action_on_failure=failure_action)
127
128
129
def _build_hive_args(args, release_label, region):
    """Return the argument list for a Hive step.

    With a truthy *release_label* the list is::

        HIVE_SCRIPT_COMMAND, RUN_HIVE_SCRIPT, ARGS, *args

    Otherwise it is::

        <HIVE_SCRIPT_PATH link>, RUN_HIVE_SCRIPT, HIVE_VERSIONS, LATEST,
        ARGS, *args

    where the first element comes from ``emrutils.build_s3_link`` with
    *region*. The other names are attributes of ``constants``.
    """
    if release_label:
        result = [constants.HIVE_SCRIPT_COMMAND, constants.RUN_HIVE_SCRIPT]
    else:
        script = emrutils.build_s3_link(
            relative_path=constants.HIVE_SCRIPT_PATH, region=region)
        result = [script,
                  constants.RUN_HIVE_SCRIPT,
                  constants.HIVE_VERSIONS,
                  constants.LATEST]
    result.append(constants.ARGS)
    result.extend(args)
    return result
147
148
149
def build_pig_step(parsed_step, release_label, region=None):
    """Build a Pig script step.

    ``Args`` is required and checked by ``emrutils.check_empty_string_list``.
    The runner JAR comes from ``_get_runner_jar`` and the full argument list
    from ``_build_pig_args``, both driven by *release_label* and *region*.
    ``Name`` and ``ActionOnFailure`` default to
    ``constants.DEFAULT_PIG_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``.
    """
    pig_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.PIG_STEP_CONFIG, name='Args', value=pig_args)
    emrutils.check_empty_string_list(name='Args', value=pig_args)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_PIG_STEP_NAME)
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION)
    runner = _get_runner_jar(release_label, region)
    full_args = _build_pig_args(pig_args, release_label, region)

    return emrutils.build_step(
        jar=runner,
        args=full_args,
        name=step_name,
        action_on_failure=failure_action)
166
167
168
def _build_pig_args(args, release_label, region):
    """Return the argument list for a Pig step.

    With a truthy *release_label* the list is::

        PIG_SCRIPT_COMMAND, RUN_PIG_SCRIPT, ARGS, *args

    Otherwise it is::

        <PIG_SCRIPT_PATH link>, RUN_PIG_SCRIPT, PIG_VERSIONS, LATEST,
        ARGS, *args

    where the first element comes from ``emrutils.build_s3_link`` with
    *region*. The other names are attributes of ``constants``.
    """
    if release_label:
        result = [constants.PIG_SCRIPT_COMMAND, constants.RUN_PIG_SCRIPT]
    else:
        script = emrutils.build_s3_link(
            relative_path=constants.PIG_SCRIPT_PATH, region=region)
        result = [script,
                  constants.RUN_PIG_SCRIPT,
                  constants.PIG_VERSIONS,
                  constants.LATEST]
    result.append(constants.ARGS)
    result.extend(args)
    return result
186
187
188
def build_impala_step(parsed_step, release_label, region=None):
    """Build an Impala step; only allowed when *release_label* is falsy.

    The step runs ``emrutils.get_script_runner(region)`` with the S3 link
    for ``constants.IMPALA_INSTALL_PATH``, then
    ``constants.RUN_IMPALA_SCRIPT``, then the required ``Args``.
    ``Name`` and ``ActionOnFailure`` default to
    ``constants.DEFAULT_IMPALA_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``.

    :raises exceptions.UnknownStepTypeError: if *release_label* is truthy.
    """
    if release_label:
        # Impala is not accepted together with a release label.
        raise exceptions.UnknownStepTypeError(step_type=constants.IMPALA)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_IMPALA_STEP_NAME)
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION)
    install_link = emrutils.build_s3_link(
        relative_path=constants.IMPALA_INSTALL_PATH, region=region)

    impala_args = parsed_step.get('Args')
    # NOTE(review): unlike the hive/pig/streaming builders, Args is not run
    # through emrutils.check_empty_string_list here -- confirm intentional.
    emrutils.check_required_field(
        structure=constants.IMPALA_STEP_CONFIG, name='Args', value=impala_args)

    full_args = [install_link, constants.RUN_IMPALA_SCRIPT]
    full_args.extend(impala_args)

    return emrutils.build_step(
        jar=emrutils.get_script_runner(region),
        args=full_args,
        name=step_name,
        action_on_failure=failure_action)
211
212
213
def build_spark_step(parsed_step, release_label, region=None):
    """Build a Spark step.

    ``Args`` is required (checked via ``emrutils.check_required_field``).
    The runner JAR comes from ``_get_runner_jar`` and the argument list from
    ``_build_spark_args``. ``Name`` and ``ActionOnFailure`` default to
    ``constants.DEFAULT_SPARK_STEP_NAME`` and
    ``constants.DEFAULT_FAILURE_ACTION``.
    """
    spark_args = parsed_step.get('Args')
    emrutils.check_required_field(
        structure=constants.SPARK_STEP_CONFIG, name='Args', value=spark_args)

    step_name = _apply_default_value(
        arg=parsed_step.get('Name'),
        value=constants.DEFAULT_SPARK_STEP_NAME)
    failure_action = _apply_default_value(
        arg=parsed_step.get('ActionOnFailure'),
        value=constants.DEFAULT_FAILURE_ACTION)
    runner = _get_runner_jar(release_label, region)
    full_args = _build_spark_args(spark_args, release_label, region)

    return emrutils.build_step(
        jar=runner,
        args=full_args,
        name=step_name,
        action_on_failure=failure_action)
229
230
231
def _build_spark_args(args, release_label, region):
    """Return the spark-submit entry point followed by *args*.

    The first element is ``constants.SPARK_SUBMIT_COMMAND`` when
    *release_label* is truthy, else ``constants.SPARK_SUBMIT_PATH``.
    *region* is not used here.
    """
    submit = (constants.SPARK_SUBMIT_COMMAND if release_label
              else constants.SPARK_SUBMIT_PATH)
    result = [submit]
    result.extend(args)
    return result
240
241
242
def _apply_default_value(arg, value):
243
if arg is None:
244
arg = value
245
246
return arg
247
248
249
def _get_runner_jar(release_label, region):
    """Return the JAR used to launch script-based steps.

    Without a (truthy) *release_label* this is
    ``emrutils.get_script_runner(region)``; with one it is
    ``constants.COMMAND_RUNNER``.
    """
    if not release_label:
        return emrutils.get_script_runner(region)
    return constants.COMMAND_RUNNER
252
253