GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/emr/test_add_steps.py
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
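"""Unit tests for the ``aws emr add-steps`` customization.

Each test builds an ``add-steps`` invocation such as::

    aws emr add-steps --cluster-id j-ABC --steps Type=Hive,Args=...

and asserts either the request parameters or the error message produced,
for both AMI-versioned clusters and release-label (emr-4.x+) clusters.
"""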
import copy
import os
import unittest

from awscli.testutils import mock
from tests.unit.customizations.emr import (
    EMRBaseAWSCommandParamsTest as BaseAWSCommandParamsTest,
)


class TestAddSteps(BaseAWSCommandParamsTest):
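    """Tests for the ``aws emr add-steps`` command.

    The ``*_SCRIPT_RUNNER_STEP`` constants capture the HadoopJarStep shape
    expected on AMI-versioned clusters, where steps are driven by
    script-runner.jar (or the Hadoop streaming jar on the master node);
    the ``*_COMMAND_RUNNER_STEP`` constants capture the equivalent shape
    on release-label (emr-4.x and later) clusters, which route steps
    through command-runner.jar.
    """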
    prefix = 'emr add-steps --cluster-id j-ABC --steps '

    STREAMING_ARGS = (
        'Args=-files,'
        + 's3://elasticmapreduce/samples/wordcount/wordSplitter.py,'
        + '-mapper,wordSplitter.py,'
        + '-reducer,aggregate,'
        + '-input,s3://elasticmapreduce/samples/wordcount/input,'
        + '-output,s3://mybucket/wordcount/output/2014-04-18/12-15-24'
    )
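    # HadoopJarStep expected on AMI-versioned clusters, where the Hadoop
    # streaming jar is invoked directly from the master node.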
    STREAMING_HADOOP_SCRIPT_RUNNER_STEP = {
        'Jar': '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
        'Args': [
            '-files',
            's3://elasticmapreduce/samples/wordcount/wordSplitter.py',
            '-mapper',
            'wordSplitter.py',
            '-reducer',
            'aggregate',
            '-input',
            's3://elasticmapreduce/samples/wordcount/input',
            '-output',
            's3://mybucket/wordcount/output/2014-04-18/12-15-24',
        ],
    }
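    # HadoopJarStep expected on release-label (emr-4.x+) clusters, where
    # the same step is routed through command-runner.jar.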
    STREAMING_HADOOP_COMMAND_RUNNER_STEP = {
        'Jar': 'command-runner.jar',
        'Args': [
            'hadoop-streaming',
            '-files',
            's3://elasticmapreduce/samples/wordcount/wordSplitter.py',
            '-mapper',
            'wordSplitter.py',
            '-reducer',
            'aggregate',
            '-input',
            's3://elasticmapreduce/samples/wordcount/input',
            '-output',
            's3://mybucket/wordcount/output/2014-04-18/12-15-24',
        ],
    }

    HIVE_BASIC_ARGS = (
        'Args=-f,'
        + 's3://elasticmapreduce/samples/hive-ads/libs/model-build.q'
    )

    HIVE_DEFAULT_SCRIPT_RUNNER_STEP = {
        'Jar': (
            's3://us-east-1.elasticmapreduce/'
            'libs/script-runner/script-runner.jar'
        ),
        'Args': [
            's3://us-east-1.elasticmapreduce/libs/hive/hive-script',
            '--run-hive-script',
            '--hive-versions',
            'latest',
            '--args',
            '-f',
            's3://elasticmapreduce/samples/hive-ads/libs/model-build.q',
        ],
    }

    HIVE_DEFAULT_COMMAND_RUNNER_STEP = {
        'Jar': 'command-runner.jar',
        'Args': [
            'hive-script',
            '--run-hive-script',
            '--args',
            '-f',
            's3://elasticmapreduce/samples/hive-ads/libs/model-build.q',
        ],
    }

    PIG_BASIC_ARGS = (
        'Args=-f,' + 's3://elasticmapreduce/samples/pig-apache/do-reports2.pig'
    )

    PIG_DEFAULT_SCRIPT_RUNNER_STEP = {
        'Jar': (
            's3://us-east-1.elasticmapreduce/libs/'
            'script-runner/script-runner.jar'
        ),
        'Args': [
            's3://us-east-1.elasticmapreduce/libs/pig/pig-script',
            '--run-pig-script',
            '--pig-versions',
            'latest',
            '--args',
            '-f',
            's3://elasticmapreduce/samples/pig-apache/do-reports2.pig',
        ],
    }

    PIG_DEFAULT_COMMAND_RUNNER_STEP = {
        'Jar': 'command-runner.jar',
        'Args': [
            'pig-script',
            '--run-pig-script',
            '--args',
            '-f',
            's3://elasticmapreduce/samples/pig-apache/do-reports2.pig',
        ],
    }

    IMPALA_BASIC_ARGS = (
        'Args='
        + '--impala-script,s3://myimpala/input,'
        + '--console-output-path,s3://myimpala/output'
    )

    IMPALA_BASIC_SCRIPT_RUNNER_STEP = {
        'Jar': (
            's3://us-east-1.elasticmapreduce/libs/'
            'script-runner/script-runner.jar'
        ),
        'Args': [
            's3://us-east-1.elasticmapreduce/libs/impala/setup-impala',
            '--run-impala-script',
            '--impala-script',
            's3://myimpala/input',
            '--console-output-path',
            's3://myimpala/output',
        ],
    }

    SPARK_SUBMIT_BASIC_ARGS = (
        'Args='
        + '[--deploy-mode,'
        + 'cluster,'
        + '--conf,'
        + 'k1=v1,'
        + 's3://mybucket/myfolder/app.jar,'
        + 'k2=v2]'
    )

    SPARK_SUBMIT_SCRIPT_RUNNER_STEP = {
        'Jar': (
            's3://us-east-1.elasticmapreduce/libs/'
            'script-runner/script-runner.jar'
        ),
        'Args': [
            '/home/hadoop/spark/bin/spark-submit',
            '--deploy-mode',
            'cluster',
            '--conf',
            'k1=v1',
            's3://mybucket/myfolder/app.jar',
            'k2=v2',
        ],
    }

    SPARK_SUBMIT_COMMAND_RUNNER_STEP = {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            '--deploy-mode',
            'cluster',
            '--conf',
            'k1=v1',
            's3://mybucket/myfolder/app.jar',
            'k2=v2',
        ],
    }

    def test_unknown_step_type(self):
        cmd = self.prefix + 'Type=unknown'
        expected_error_msg = (
            '\naws: error: ' + 'The step type unknown is not supported.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_default_step_type_name_action_on_failure(self):
        cmd = self.prefix + 'Jar=s3://mybucket/mytest.jar'
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom JAR',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {'Jar': 's3://mybucket/mytest.jar'},
                }
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result,
        )

    def test_custom_jar_step_missing_jar(self):
        cmd = self.prefix + 'Name=CustomJarMissingJar'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for CustomJARStepConfig: Jar.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_custom_jar_step_with_all_fields(self):
        cmd = self.prefix + (
            'Name=Custom,Type=Custom_jar,'
            'Jar=s3://mybucket/mytest.jar,'
            'Args=arg1,arg2,MainClass=mymainclass,'
            'ActionOnFailure=TERMINATE_CLUSTER,'
            'LogUri="TestLogUri",'
            'EncryptionKeyArn="TestEncryptionKeyArn",'
            'Properties=k1=v1\\,k2=v2\\,k3'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                        'Jar': 's3://mybucket/mytest.jar',
                        'Args': ['arg1', 'arg2'],
                        'MainClass': 'mymainclass',
                        'Properties': [
                            {'Key': 'k1', 'Value': 'v1'},
                            {'Key': 'k2', 'Value': 'v2'},
                            {'Key': 'k3', 'Value': ''},
                        ],
                    },
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result,
        )

    def test_custom_jar_step_with_step_monitoring_configuration_log_uri_only(
        self,
    ):
        cmd = self.prefix + (
            'Name=Custom,Type=Custom_jar,'
            'Jar=s3://mybucket/mytest.jar,'
            'Args=arg1,arg2,MainClass=mymainclass,'
            'ActionOnFailure=TERMINATE_CLUSTER,'
            'LogUri="TestLogUri"'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                        'Jar': 's3://mybucket/mytest.jar',
                        'Args': ['arg1', 'arg2'],
                        'MainClass': 'mymainclass',
                    },
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {'LogUri': "TestLogUri"}
                    },
                }
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result,
        )

    def test_custom_jar_step_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        cmd = self.prefix + (
            'Name=Custom,Type=Custom_jar,'
            'Jar=s3://mybucket/mytest.jar,'
            'Args=arg1,arg2,MainClass=mymainclass,'
            'ActionOnFailure=TERMINATE_CLUSTER,'
            'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                        'Jar': 's3://mybucket/mytest.jar',
                        'Args': ['arg1', 'arg2'],
                        'MainClass': 'mymainclass',
                    },
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result,
        )

    def test_custom_jar_step_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        cmd = self.prefix + (
            'Name=Custom,Type=Custom_jar,'
            'Jar=s3://mybucket/mytest.jar,'
            'Args=arg1,arg2,MainClass=mymainclass,'
            'ActionOnFailure=TERMINATE_CLUSTER'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                        'Jar': 's3://mybucket/mytest.jar',
                        'Args': ['arg1', 'arg2'],
                        'MainClass': 'mymainclass',
                    },
                }
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result,
        )

    def test_streaming_step_with_default_fields(self):
        cmd = self.prefix + 'Type=Streaming,' + self.STREAMING_ARGS
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Streaming program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_step_with_execution_role_arn(self):
        cmd = self.prefix + 'Type=Streaming,' + self.STREAMING_ARGS
        cmd += ' --execution-role-arn arn:aws:iam::123456789010:role/sample '
        expected_result = {
            'ExecutionRoleArn': 'arn:aws:iam::123456789010:role/sample',
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Streaming program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_streaming_step_missing_args(self):
        cmd = self.prefix + 'Type=Streaming'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for StreamingStepConfig: Args.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_streaming_jar_with_all_fields(self):
        test_step_config = (
            'Type=Streaming,'
            + 'Name=StreamingStepAllFields,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + self.STREAMING_ARGS
            + ','
            + 'LogUri="TestLogUri",'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'StreamingStepAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }

        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_streaming_jar_with_step_monitoring_configuration_log_uri_only(
        self,
    ):
        test_step_config = (
            'Type=Streaming,'
            + 'Name=StreamingStepAllFields,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + self.STREAMING_ARGS
            + ','
            + 'LogUri="TestLogUri"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'StreamingStepAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {'LogUri': "TestLogUri"}
                    },
                }
            ],
        }

        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_streaming_jar_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        test_step_config = (
            'Type=Streaming,'
            + 'Name=StreamingStepAllFields,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + self.STREAMING_ARGS
            + ','
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'StreamingStepAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }

        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_streaming_jar_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        test_step_config = (
            'Type=Streaming,'
            + 'Name=StreamingStepAllFields,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + self.STREAMING_ARGS
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'StreamingStepAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                }
            ],
        }

        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.STREAMING_HADOOP_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_hive_step_with_default_fields(self):
        cmd = self.prefix + 'Type=Hive,' + self.HIVE_BASIC_ARGS
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Hive program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.HIVE_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_hive_step_missing_args(self):
        cmd = self.prefix + 'Type=Hive'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for HiveStepConfig: Args.\n'
        )

        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_hive_step_with_all_fields(self):
        test_step_config = (
            'Type=Hive,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'Name=HiveWithAllFields,'
            + self.HIVE_BASIC_ARGS
            + ','
            + 'LogUri="TestLogUri",'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'HiveWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.HIVE_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_hive_step_with_step_monitoring_configuration_log_uri_only(self):
        test_step_config = (
            'Type=Hive,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'Name=HiveWithAllFields,'
            + self.HIVE_BASIC_ARGS
            + ','
            + 'LogUri="TestLogUri"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'HiveWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.HIVE_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_hive_step_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        test_step_config = (
            'Type=Hive,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'Name=HiveWithAllFields,'
            + self.HIVE_BASIC_ARGS
            + ','
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'HiveWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.HIVE_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_hive_step_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        test_step_config = (
            'Type=Hive,'
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'Name=HiveWithAllFields,'
            + self.HIVE_BASIC_ARGS
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'HiveWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.HIVE_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_pig_step_with_default_fields(self):
        cmd = self.prefix + 'Type=Pig,' + self.PIG_BASIC_ARGS
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Pig program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.PIG_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_pig_missing_args(self):
        cmd = self.prefix + 'Type=Pig'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for PigStepConfig: Args.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_pig_step_with_all_fields(self):
        test_step_config = (
            'Name=PigWithAllFields,'
            + 'Type=Pig,'
            + self.PIG_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'LogUri="TestLogUri",'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'PigWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.PIG_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_pig_step_with_step_monitoring_configuration_log_uri_only(self):
        test_step_config = (
            'Name=PigWithAllFields,'
            + 'Type=Pig,'
            + self.PIG_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'LogUri="TestLogUri"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'PigWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {'LogUri': "TestLogUri"}
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.PIG_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_pig_step_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        test_step_config = (
            'Name=PigWithAllFields,'
            + 'Type=Pig,'
            + self.PIG_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'PigWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.PIG_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_pig_step_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        test_step_config = (
            'Name=PigWithAllFields,'
            + 'Type=Pig,'
            + self.PIG_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'PigWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.PIG_DEFAULT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_impala_step_with_default_fields(self):
        test_step_config = 'Type=Impala,' + self.IMPALA_BASIC_ARGS
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Impala program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                }
            ],
        }
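        # Impala steps are rejected on release-label clusters (see
        # test_impala_step_with_release), so only the AMI path is asserted.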
        self.assert_params_for_cmd(cmd, expected_result)

    def test_spark_step_with_default_fields(self):
        cmd = self.prefix + 'Type=SPARK,' + self.SPARK_SUBMIT_BASIC_ARGS
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Spark application',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.SPARK_SUBMIT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.SPARK_SUBMIT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_spark_missing_arg(self):
        cmd = self.prefix + 'Type=SPARK'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for SparkStepConfig: Args.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_spark_step_with_step_monitoring_configuration(self):
        cmd = (
            self.prefix
            + 'Type=SPARK,'
            + self.SPARK_SUBMIT_BASIC_ARGS
            + ','
            + 'LogUri="TestLogUri",'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Spark application',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.SPARK_SUBMIT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.SPARK_SUBMIT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_spark_step_with_step_monitoring_configuration_log_uri_only(self):
        cmd = (
            self.prefix
            + 'Type=SPARK,'
            + self.SPARK_SUBMIT_BASIC_ARGS
            + ','
            + 'LogUri="TestLogUri"'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Spark application',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.SPARK_SUBMIT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {'LogUri': "TestLogUri"}
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.SPARK_SUBMIT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_spark_step_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        cmd = (
            self.prefix
            + 'Type=SPARK,'
            + self.SPARK_SUBMIT_BASIC_ARGS
            + ','
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Spark application',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.SPARK_SUBMIT_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.SPARK_SUBMIT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_spark_step_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        cmd = self.prefix + 'Type=SPARK,' + self.SPARK_SUBMIT_BASIC_ARGS
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Spark application',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.SPARK_SUBMIT_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        expected_result_release = copy.deepcopy(expected_result)
        expected_result_release['Steps'][0]['HadoopJarStep'] = (
            self.SPARK_SUBMIT_COMMAND_RUNNER_STEP
        )

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=expected_result_release,
        )

    def test_impala_missing_args(self):
        cmd = self.prefix + 'Type=Impala'
        expected_error_msg = (
            '\naws: error: The following '
            + 'required parameters are missing for ImpalaStepConfig: Args.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=None,
        )

    def test_impala_step_with_all_fields(self):
        test_step_config = (
            'Name=ImpalaWithAllFields,'
            + 'Type=Impala,'
            + self.IMPALA_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'LogUri="TestLogUri",'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'ImpalaWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'LogUri': "TestLogUri",
                            'EncryptionKeyArn': "TestEncryptionKeyArn",
                        }
                    },
                }
            ],
        }
        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=None,
        )

    def test_impala_step_with_step_monitoring_configuration_log_uri_only(self):
        test_step_config = (
            'Name=ImpalaWithAllFields,'
            + 'Type=Impala,'
            + self.IMPALA_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'LogUri="TestLogUri"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'ImpalaWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {'LogUri': "TestLogUri"}
                    },
                }
            ],
        }
        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=None,
        )

    def test_impala_step_with_step_monitoring_configuration_encryption_key_arn_only(
        self,
    ):
        test_step_config = (
            'Name=ImpalaWithAllFields,'
            + 'Type=Impala,'
            + self.IMPALA_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT,'
            + 'EncryptionKeyArn="TestEncryptionKeyArn"'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'ImpalaWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                    'StepMonitoringConfiguration': {
                        'S3MonitoringConfiguration': {
                            'EncryptionKeyArn': "TestEncryptionKeyArn"
                        }
                    },
                }
            ],
        }
        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=None,
        )

    def test_impala_step_with_step_monitoring_configuration_no_log_uri_or_encryption_key_arn(
        self,
    ):
        test_step_config = (
            'Name=ImpalaWithAllFields,'
            + 'Type=Impala,'
            + self.IMPALA_BASIC_ARGS
            + ','
            + 'ActionOnFailure=CANCEL_AND_WAIT'
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'ImpalaWithAllFields',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                }
            ],
        }
        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=None,
        )

    def test_impala_step_with_release(self):
        test_step_config = 'Type=Impala,' + self.IMPALA_BASIC_ARGS
        cmd = self.prefix + test_step_config
        expected_result_release = (
            '\naws: error: The step type impala ' + 'is not supported.\n'
        )

        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=None,
            expected_result_release=expected_result_release,
        )

    def test_empty_step_args(self):
        cmd = self.prefix + 'Type=Streaming,Args='
        expected_error_msg = (
            '\naws: error: The prameter Args cannot be an empty list.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

        cmd = self.prefix + 'Type=Pig,Args='
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

        cmd = self.prefix + 'Type=Hive,Args='
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

        cmd = self.prefix + 'Args='
        expected_error_msg = (
            '\naws: error: The following required parameters'
            ' are missing for CustomJARStepConfig: Jar.\n'
        )
        self.assert_error_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_error_msg=expected_error_msg,
            expected_result_release=expected_error_msg,
        )

    def test_all_step_types(self):
        test_step_config = (
            'Jar=s3://mybucket/mytest.jar '
            + ' Type=Streaming,'
            + self.STREAMING_ARGS
            + ' Type=Hive,'
            + self.HIVE_BASIC_ARGS
            + ' Type=Pig,'
            + self.PIG_BASIC_ARGS
            + ' Type=Impala,'
            + self.IMPALA_BASIC_ARGS
        )
        cmd = self.prefix + test_step_config
        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom JAR',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {'Jar': 's3://mybucket/mytest.jar'},
                },
                {
                    'Name': 'Streaming program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                },
                {
                    'Name': 'Hive program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP,
                },
                {
                    'Name': 'Pig program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.PIG_DEFAULT_SCRIPT_RUNNER_STEP,
                },
                {
                    'Name': 'Impala program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                },
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=expected_result,
            expected_result_release=None,
        )

    def test_all_step_types_release(self):
        test_step_config = (
            'Jar=s3://mybucket/mytest.jar '
            + ' Type=Streaming,'
            + self.STREAMING_ARGS
            + ' Type=Hive,'
            + self.HIVE_BASIC_ARGS
            + ' Type=Pig,'
            + self.PIG_BASIC_ARGS
        )

        cmd = self.prefix + test_step_config
        expected_result_release = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom JAR',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {'Jar': 's3://mybucket/mytest.jar'},
                },
                {
                    'Name': 'Streaming program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.STREAMING_HADOOP_COMMAND_RUNNER_STEP,
                },
                {
                    'Name': 'Hive program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.HIVE_DEFAULT_COMMAND_RUNNER_STEP,
                },
                {
                    'Name': 'Pig program',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': self.PIG_DEFAULT_COMMAND_RUNNER_STEP,
                },
            ],
        }

        self.assert_params_for_ami_and_release_based_clusters(
            cmd=cmd,
            expected_result=None,
            expected_result_release=expected_result_release,
        )

    def test_all_step_types_from_json(self):
        data_path = os.path.join(os.path.dirname(__file__), 'input_steps.json')
        cmd = self.prefix + 'file://' + data_path
        hive_script_runner_step = copy.deepcopy(
            self.HIVE_DEFAULT_SCRIPT_RUNNER_STEP
        )
        hive_script_runner_step['Args'] += [
            '-d',
            'INPUT=s3://elasticmapreduce/samples/hive-ads/tables',
            '-d',
            'OUTPUT=s3://mybucket/hive-ads/output/2014-04-18/11-07-32',
            '-d',
            'LIBS=s3://elasticmapreduce/samples/hive-ads/libs',
        ]
        pig_script_runner_step = copy.deepcopy(
            self.PIG_DEFAULT_SCRIPT_RUNNER_STEP
        )
        pig_script_runner_step['Args'] += [
            '-p',
            'INPUT=s3://elasticmapreduce/samples/pig-apache/input',
            '-p',
            'OUTPUT=s3://mybucket/pig-apache/output/2014-04-21/20-09-28',
        ]

        expected_result = {
            'JobFlowId': 'j-ABC',
            'Steps': [
                {
                    'Name': 'Custom JAR step',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {'Jar': 's3://mybucket/mytest.jar'},
                },
                {
                    'Name': 'Streaming step',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.STREAMING_HADOOP_SCRIPT_RUNNER_STEP,
                },
                {
                    'Name': 'Hive step',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': hive_script_runner_step,
                },
                {
                    'Name': 'Pig step',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': pig_script_runner_step,
                },
                {
                    'Name': 'Impala step',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': self.IMPALA_BASIC_SCRIPT_RUNNER_STEP,
                },
            ],
        }
        self.assert_params_for_cmd(cmd, expected_result)

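    # The two helpers below patch emrutils.get_release_label so each test
    # can exercise both cluster flavours: a return value of None simulates
    # an AMI-versioned cluster, while 'emr-4.0' simulates a release-label
    # cluster.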
    @mock.patch('awscli.customizations.emr.emrutils.get_release_label')
    def assert_params_for_ami_and_release_based_clusters(
        self, grl_patch, cmd, expected_result, expected_result_release
    ):
        if expected_result:
            grl_patch.return_value = None
            self.assert_params_for_cmd(cmd, expected_result)
        if expected_result_release:
            grl_patch.return_value = 'emr-4.0'
            self.assert_params_for_cmd(cmd, expected_result_release)

    @mock.patch('awscli.customizations.emr.emrutils.get_release_label')
    def assert_error_for_ami_and_release_based_clusters(
        self, grl_patch, cmd, expected_error_msg, expected_result_release
    ):
        if expected_error_msg:
            grl_patch.return_value = None
            result = self.run_cmd(cmd, 255)
            self.assertEqual(expected_error_msg, result[1])
        if expected_result_release:
            grl_patch.return_value = 'emr-4.0'
            result = self.run_cmd(cmd, 255)
            self.assertEqual(expected_result_release, result[1])


if __name__ == "__main__":
    unittest.main()