Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/emr/createcluster.py
1567 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import re
15
16
from botocore.compat import json
17
18
from awscli.customizations.commands import BasicCommand
19
from awscli.customizations.emr import (
20
applicationutils,
21
argumentschema,
22
constants,
23
emrfsutils,
24
emrutils,
25
exceptions,
26
hbaseutils,
27
helptext,
28
instancefleetsutils,
29
instancegroupsutils,
30
steputils,
31
)
32
from awscli.customizations.emr.command import Command
33
from awscli.customizations.emr.constants import EC2_ROLE_NAME, EMR_ROLE_NAME
34
35
36
class CreateCluster(Command):
    """Implements ``aws emr create-cluster``.

    Translates the options declared in ``ARG_TABLE`` into the request
    parameters of the EMR ``run_job_flow`` operation, rejects option
    combinations that are not allowed together, sends the request and
    displays the resulting cluster id / ARN.
    """

    NAME = 'create-cluster'
    DESCRIPTION = helptext.CREATE_CLUSTER_DESCRIPTION
    # One entry per CLI option.  Paired ``--foo`` / ``--no-foo`` flags share
    # a ``group_name``; most pairs are resolved in ``_run_main_command``
    # with ``emrutils.apply_boolean_options``.
    ARG_TABLE = [
        {'name': 'release-label', 'help_text': helptext.RELEASE_LABEL},
        {'name': 'os-release-label', 'help_text': helptext.OS_RELEASE_LABEL},
        {'name': 'ami-version', 'help_text': helptext.AMI_VERSION},
        {
            'name': 'instance-groups',
            'schema': argumentschema.INSTANCE_GROUPS_SCHEMA,
            'help_text': helptext.INSTANCE_GROUPS,
        },
        {'name': 'instance-type', 'help_text': helptext.INSTANCE_TYPE},
        {'name': 'instance-count', 'help_text': helptext.INSTANCE_COUNT},
        {
            'name': 'auto-terminate',
            'action': 'store_true',
            'group_name': 'auto_terminate',
            'help_text': helptext.AUTO_TERMINATE,
        },
        {
            'name': 'no-auto-terminate',
            'action': 'store_true',
            'group_name': 'auto_terminate',
        },
        {
            'name': 'instance-fleets',
            'schema': argumentschema.INSTANCE_FLEETS_SCHEMA,
            'help_text': helptext.INSTANCE_FLEETS,
        },
        {
            'name': 'name',
            'default': 'Development Cluster',
            'help_text': helptext.CLUSTER_NAME,
        },
        {'name': 'log-uri', 'help_text': helptext.LOG_URI},
        {
            'name': 'log-encryption-kms-key-id',
            'help_text': helptext.LOG_ENCRYPTION_KMS_KEY_ID,
        },
        {'name': 'service-role', 'help_text': helptext.SERVICE_ROLE},
        {'name': 'auto-scaling-role', 'help_text': helptext.AUTOSCALING_ROLE},
        {
            'name': 'use-default-roles',
            'action': 'store_true',
            'help_text': helptext.USE_DEFAULT_ROLES,
        },
        {'name': 'configurations', 'help_text': helptext.CONFIGURATIONS},
        {
            'name': 'ec2-attributes',
            'help_text': helptext.EC2_ATTRIBUTES,
            'schema': argumentschema.EC2_ATTRIBUTES_SCHEMA,
        },
        {
            'name': 'termination-protected',
            'action': 'store_true',
            'group_name': 'termination_protected',
            'help_text': helptext.TERMINATION_PROTECTED,
        },
        {
            'name': 'no-termination-protected',
            'action': 'store_true',
            'group_name': 'termination_protected',
        },
        {
            'name': 'unhealthy-node-replacement',
            'action': 'store_true',
            'group_name': 'unhealthy_node_replacement',
            'help_text': helptext.UNHEALTHY_NODE_REPLACEMENT,
        },
        {
            'name': 'no-unhealthy-node-replacement',
            'action': 'store_true',
            'group_name': 'unhealthy_node_replacement',
        },
        {
            'name': 'scale-down-behavior',
            'help_text': helptext.SCALE_DOWN_BEHAVIOR,
        },
        {
            'name': 'visible-to-all-users',
            'action': 'store_true',
            'group_name': 'visibility',
            'help_text': helptext.VISIBILITY,
        },
        {
            'name': 'no-visible-to-all-users',
            'action': 'store_true',
            'group_name': 'visibility',
        },
        {
            'name': 'enable-debugging',
            'action': 'store_true',
            'group_name': 'debug',
            'help_text': helptext.DEBUGGING,
        },
        {
            'name': 'no-enable-debugging',
            'action': 'store_true',
            'group_name': 'debug',
        },
        {
            'name': 'tags',
            'nargs': '+',
            'help_text': helptext.TAGS,
            'schema': argumentschema.TAGS_SCHEMA,
        },
        {
            'name': 'bootstrap-actions',
            'help_text': helptext.BOOTSTRAP_ACTIONS,
            'schema': argumentschema.BOOTSTRAP_ACTIONS_SCHEMA,
        },
        {
            'name': 'applications',
            'help_text': helptext.APPLICATIONS,
            'schema': argumentschema.APPLICATIONS_SCHEMA,
        },
        {
            'name': 'emrfs',
            'help_text': helptext.EMR_FS,
            'schema': argumentschema.EMR_FS_SCHEMA,
        },
        {
            'name': 'steps',
            'schema': argumentschema.STEPS_SCHEMA,
            'help_text': helptext.STEPS,
        },
        {'name': 'additional-info', 'help_text': helptext.ADDITIONAL_INFO},
        {
            'name': 'restore-from-hbase-backup',
            'schema': argumentschema.HBASE_RESTORE_FROM_BACKUP_SCHEMA,
            'help_text': helptext.RESTORE_FROM_HBASE,
        },
        {
            'name': 'security-configuration',
            'help_text': helptext.SECURITY_CONFIG,
        },
        {'name': 'custom-ami-id', 'help_text': helptext.CUSTOM_AMI_ID},
        {
            'name': 'ebs-root-volume-size',
            'help_text': helptext.EBS_ROOT_VOLUME_SIZE,
        },
        {
            'name': 'ebs-root-volume-iops',
            'help_text': helptext.EBS_ROOT_VOLUME_IOPS,
        },
        {
            'name': 'ebs-root-volume-throughput',
            'help_text': helptext.EBS_ROOT_VOLUME_THROUGHPUT,
        },
        {
            'name': 'repo-upgrade-on-boot',
            'help_text': helptext.REPO_UPGRADE_ON_BOOT,
        },
        {
            'name': 'kerberos-attributes',
            'schema': argumentschema.KERBEROS_ATTRIBUTES_SCHEMA,
            'help_text': helptext.KERBEROS_ATTRIBUTES,
        },
        {
            'name': 'step-concurrency-level',
            'cli_type_name': 'integer',
            'help_text': helptext.STEP_CONCURRENCY_LEVEL,
        },
        {
            'name': 'managed-scaling-policy',
            'schema': argumentschema.MANAGED_SCALING_POLICY_SCHEMA,
            'help_text': helptext.MANAGED_SCALING_POLICY,
        },
        {
            'name': 'placement-group-configs',
            'schema': argumentschema.PLACEMENT_GROUP_CONFIGS_SCHEMA,
            'help_text': helptext.PLACEMENT_GROUP_CONFIGS,
        },
        {
            'name': 'auto-termination-policy',
            'schema': argumentschema.AUTO_TERMINATION_POLICY_SCHEMA,
            'help_text': helptext.AUTO_TERMINATION_POLICY,
        },
        {
            'name': 'extended-support',
            'action': 'store_true',
            'group_name': 'extended-support',
            'help_text': helptext.EXTENDED_SUPPORT,
        },
        {
            'name': 'no-extended-support',
            'action': 'store_true',
            'group_name': 'extended-support',
        },
    ]
    SYNOPSIS = BasicCommand.FROM_FILE('emr', 'create-cluster-synopsis.txt')
    EXAMPLES = BasicCommand.FROM_FILE('emr', 'create-cluster-examples.rst')

    def _run_main_command(self, parsed_args, parsed_globals):
        """Build the ``run_job_flow`` request from the CLI options and send it.

        :param parsed_args: Parsed options declared in ``ARG_TABLE``.  When
            ``--use-default-roles`` is given, ``service_role`` and
            ``ec2_attributes`` on this object are modified in place.
        :param parsed_globals: Global CLI options; ``endpoint_url`` and
            ``verify_ssl`` are forwarded to the service call.
        :returns: ``0`` after the response has been displayed.
        :raises exceptions.MutualExclusiveOptionError: For conflicting
            options (default roles vs. explicit roles, instance groups vs.
            instance fleets, ami version vs. release label).
        :raises exceptions.InvalidAmiVersionError: If ``--ami-version`` does
            not match the accepted pattern.
        :raises ValueError: If ``--configurations`` is not valid JSON.
        """
        params = {}
        params['Name'] = parsed_args.name

        self._validate_release_label_ami_version(parsed_args)

        service_role_validation_message = (
            " Either choose --use-default-roles or use both --service-role "
            "<roleName> and --ec2-attributes InstanceProfile=<profileName>."
        )

        if (
            parsed_args.use_default_roles is True
            and parsed_args.service_role is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--use-default-roles",
                option2="--service-role",
                message=service_role_validation_message,
            )

        if (
            parsed_args.use_default_roles is True
            and parsed_args.ec2_attributes is not None
            and 'InstanceProfile' in parsed_args.ec2_attributes
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--use-default-roles",
                option2="--ec2-attributes InstanceProfile",
                message=service_role_validation_message,
            )

        if (
            parsed_args.instance_groups is not None
            and parsed_args.instance_fleets is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--instance-groups", option2="--instance-fleets"
            )

        instances_config = {}
        if parsed_args.instance_fleets is not None:
            instances_config['InstanceFleets'] = (
                instancefleetsutils.validate_and_build_instance_fleets(
                    parsed_args.instance_fleets
                )
            )
        else:
            # The helper receives --instance-groups together with
            # --instance-type / --instance-count and decides how to build
            # the groups from them.
            instances_config['InstanceGroups'] = (
                instancegroupsutils.validate_and_build_instance_groups(
                    instance_groups=parsed_args.instance_groups,
                    instance_type=parsed_args.instance_type,
                    instance_count=parsed_args.instance_count,
                )
            )

        if parsed_args.release_label is not None:
            params["ReleaseLabel"] = parsed_args.release_label
            # --configurations is only read together with --release-label.
            if parsed_args.configurations is not None:
                try:
                    params["Configurations"] = json.loads(
                        parsed_args.configurations
                    )
                except ValueError as err:
                    # Keep the decoder error as __cause__ for debugging.
                    raise ValueError(
                        'aws: error: invalid json argument for '
                        'option --configurations'
                    ) from err

        if (
            parsed_args.release_label is None
            and parsed_args.ami_version is not None
        ):
            # NOTE(review): `\d?` allows at most one leading digit
            # (e.g. "3.11.0" matches, "10.0" does not) and also accepts
            # ".5" -- confirm this is the intended validation.
            is_valid_ami_version = re.match(
                r'\d?\..*', parsed_args.ami_version
            )
            if is_valid_ami_version is None:
                raise exceptions.InvalidAmiVersionError(
                    ami_version=parsed_args.ami_version
                )
            params['AmiVersion'] = parsed_args.ami_version
        emrutils.apply_dict(
            params, 'AdditionalInfo', parsed_args.additional_info
        )
        emrutils.apply_dict(params, 'LogUri', parsed_args.log_uri)

        if parsed_args.os_release_label is not None:
            emrutils.apply_dict(
                params, 'OSReleaseLabel', parsed_args.os_release_label
            )

        if parsed_args.log_encryption_kms_key_id is not None:
            emrutils.apply_dict(
                params,
                'LogEncryptionKmsKeyId',
                parsed_args.log_encryption_kms_key_id,
            )

        # --use-default-roles fills in both roles; the conflict checks above
        # guarantee neither was supplied explicitly.
        if parsed_args.use_default_roles is True:
            parsed_args.service_role = EMR_ROLE_NAME
            if parsed_args.ec2_attributes is None:
                parsed_args.ec2_attributes = {}
            parsed_args.ec2_attributes['InstanceProfile'] = EC2_ROLE_NAME

        emrutils.apply_dict(params, 'ServiceRole', parsed_args.service_role)

        # An auto-scaling policy on any instance group requires a role.
        if parsed_args.instance_groups is not None:
            for instance_group in instances_config['InstanceGroups']:
                if 'AutoScalingPolicy' in instance_group:
                    if parsed_args.auto_scaling_role is None:
                        raise exceptions.MissingAutoScalingRoleError()

        emrutils.apply_dict(
            params, 'AutoScalingRole', parsed_args.auto_scaling_role
        )

        if parsed_args.scale_down_behavior is not None:
            emrutils.apply_dict(
                params, 'ScaleDownBehavior', parsed_args.scale_down_behavior
            )

        # With neither flag given, the cluster is kept alive by default.
        if (
            parsed_args.no_auto_terminate is False
            and parsed_args.auto_terminate is False
        ):
            parsed_args.no_auto_terminate = True

        instances_config['KeepJobFlowAliveWhenNoSteps'] = (
            emrutils.apply_boolean_options(
                parsed_args.no_auto_terminate,
                '--no-auto-terminate',
                parsed_args.auto_terminate,
                '--auto-terminate',
            )
        )

        instances_config['TerminationProtected'] = (
            emrutils.apply_boolean_options(
                parsed_args.termination_protected,
                '--termination-protected',
                parsed_args.no_termination_protected,
                '--no-termination-protected',
            )
        )

        # Only sent when one of the two flags was given explicitly.
        if (
            parsed_args.unhealthy_node_replacement
            or parsed_args.no_unhealthy_node_replacement
        ):
            instances_config['UnhealthyNodeReplacement'] = (
                emrutils.apply_boolean_options(
                    parsed_args.unhealthy_node_replacement,
                    '--unhealthy-node-replacement',
                    parsed_args.no_unhealthy_node_replacement,
                    '--no-unhealthy-node-replacement',
                )
            )

        # With neither flag given, the cluster is visible to all users.
        if (
            parsed_args.visible_to_all_users is False
            and parsed_args.no_visible_to_all_users is False
        ):
            parsed_args.visible_to_all_users = True

        params['VisibleToAllUsers'] = emrutils.apply_boolean_options(
            parsed_args.visible_to_all_users,
            '--visible-to-all-users',
            parsed_args.no_visible_to_all_users,
            '--no-visible-to-all-users',
        )

        params['Tags'] = emrutils.parse_tags(parsed_args.tags)
        params['Instances'] = instances_config

        if parsed_args.ec2_attributes is not None:
            self._build_ec2_attributes(
                cluster=params, parsed_attrs=parsed_args.ec2_attributes
            )

        debugging_enabled = emrutils.apply_boolean_options(
            parsed_args.enable_debugging,
            '--enable-debugging',
            parsed_args.no_enable_debugging,
            '--no-enable-debugging',
        )

        if parsed_args.log_uri is None and debugging_enabled is True:
            raise exceptions.LogUriError

        if debugging_enabled is True:
            self._update_cluster_dict(
                cluster=params,
                key='Steps',
                value=[
                    self._build_enable_debugging(parsed_args, parsed_globals)
                ],
            )

        if parsed_args.applications is not None:
            if parsed_args.release_label is None:
                # AMI-version clusters: applications are expanded into
                # products, bootstrap actions and steps by the helper.
                # 'AmiVersion' is set here because
                # _validate_release_label_ami_version requires one of the
                # two options.
                app_list, ba_list, step_list = (
                    applicationutils.build_applications(
                        region=self.region,
                        parsed_applications=parsed_args.applications,
                        ami_version=params['AmiVersion'],
                    )
                )
                self._update_cluster_dict(
                    params, 'NewSupportedProducts', app_list
                )
                self._update_cluster_dict(params, 'BootstrapActions', ba_list)
                self._update_cluster_dict(params, 'Steps', step_list)
            else:
                params["Applications"] = list(parsed_args.applications)

        hbase_restore_config = parsed_args.restore_from_hbase_backup
        if hbase_restore_config is not None:
            args = hbaseutils.build_hbase_restore_from_backup_args(
                dir=hbase_restore_config.get('Dir'),
                backup_version=hbase_restore_config.get('BackupVersion'),
            )
            step_config = emrutils.build_step(
                jar=constants.HBASE_JAR_PATH,
                name=constants.HBASE_RESTORE_STEP_NAME,
                action_on_failure=constants.CANCEL_AND_WAIT,
                args=args,
            )
            self._update_cluster_dict(params, 'Steps', [step_config])

        if parsed_args.bootstrap_actions is not None:
            self._build_bootstrap_actions(
                cluster=params,
                parsed_boostrap_actions=parsed_args.bootstrap_actions,
            )

        if parsed_args.emrfs is not None:
            self._handle_emrfs_parameters(
                cluster=params,
                emrfs_args=parsed_args.emrfs,
                release_label=parsed_args.release_label,
            )

        if parsed_args.steps is not None:
            steps_list = steputils.build_step_config_list(
                parsed_step_list=parsed_args.steps,
                region=self.region,
                release_label=parsed_args.release_label,
            )
            self._update_cluster_dict(
                cluster=params, key='Steps', value=steps_list
            )

        if parsed_args.security_configuration is not None:
            emrutils.apply_dict(
                params,
                'SecurityConfiguration',
                parsed_args.security_configuration,
            )

        if parsed_args.custom_ami_id is not None:
            emrutils.apply_dict(
                params, 'CustomAmiId', parsed_args.custom_ami_id
            )
        if parsed_args.ebs_root_volume_size is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeSize',
                int(parsed_args.ebs_root_volume_size),
            )
        if parsed_args.ebs_root_volume_iops is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeIops',
                int(parsed_args.ebs_root_volume_iops),
            )
        if parsed_args.ebs_root_volume_throughput is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeThroughput',
                int(parsed_args.ebs_root_volume_throughput),
            )

        if parsed_args.repo_upgrade_on_boot is not None:
            emrutils.apply_dict(
                params, 'RepoUpgradeOnBoot', parsed_args.repo_upgrade_on_boot
            )

        if parsed_args.kerberos_attributes is not None:
            emrutils.apply_dict(
                params, 'KerberosAttributes', parsed_args.kerberos_attributes
            )

        if parsed_args.step_concurrency_level is not None:
            params['StepConcurrencyLevel'] = parsed_args.step_concurrency_level

        # Only sent when one of the two flags was given explicitly.
        if parsed_args.extended_support or parsed_args.no_extended_support:
            params['ExtendedSupport'] = emrutils.apply_boolean_options(
                parsed_args.extended_support,
                '--extended-support',
                parsed_args.no_extended_support,
                '--no-extended-support',
            )

        if parsed_args.managed_scaling_policy is not None:
            emrutils.apply_dict(
                params,
                'ManagedScalingPolicy',
                parsed_args.managed_scaling_policy,
            )

        if parsed_args.placement_group_configs is not None:
            emrutils.apply_dict(
                params,
                'PlacementGroupConfigs',
                parsed_args.placement_group_configs,
            )

        if parsed_args.auto_termination_policy is not None:
            emrutils.apply_dict(
                params,
                'AutoTerminationPolicy',
                parsed_args.auto_termination_policy,
            )

        self._validate_required_applications(parsed_args)

        run_job_flow_response = emrutils.call(
            self._session,
            'run_job_flow',
            params,
            self.region,
            parsed_globals.endpoint_url,
            parsed_globals.verify_ssl,
        )

        constructed_result = self._construct_result(run_job_flow_response)
        emrutils.display_response(
            self._session, 'run_job_flow', constructed_result, parsed_globals
        )

        return 0

    def _construct_result(self, run_job_flow_result):
        """Map a ``run_job_flow`` response to the dict shown to the user.

        :returns: ``{'ClusterId': ..., 'ClusterArn': ...}`` when the response
            contains a ``JobFlowId``; otherwise an empty dict (also for a
            ``None`` response).
        """
        job_flow_id = None
        cluster_arn = None
        if run_job_flow_result is not None:
            job_flow_id = run_job_flow_result.get('JobFlowId')
            cluster_arn = run_job_flow_result.get('ClusterArn')

        if job_flow_id is not None:
            return {'ClusterId': job_flow_id, 'ClusterArn': cluster_arn}
        else:
            return {}

    def _build_ec2_attributes(self, cluster, parsed_attrs):
        """Copy ``--ec2-attributes`` values into the request dict.

        Most keys go under ``cluster['Instances']``; ``InstanceProfile``
        becomes the top-level ``JobFlowRole``.

        :param cluster: The request dict; must already contain 'Instances'.
        :param parsed_attrs: The parsed ``--ec2-attributes`` mapping.
        :returns: ``cluster`` (modified in place).
        :raises exceptions.MutualExclusiveOptionError: If both the singular
            and plural forms of SubnetId(s) or AvailabilityZone(s) are given.
        :raises exceptions.SubnetAndAzValidationError: If subnets and
            availability zones are both given.
        """
        keys = parsed_attrs.keys()
        instances = cluster['Instances']

        if 'SubnetId' in keys and 'SubnetIds' in keys:
            raise exceptions.MutualExclusiveOptionError(
                option1="SubnetId", option2="SubnetIds"
            )

        if 'AvailabilityZone' in keys and 'AvailabilityZones' in keys:
            raise exceptions.MutualExclusiveOptionError(
                option1="AvailabilityZone", option2="AvailabilityZones"
            )

        if ('SubnetId' in keys or 'SubnetIds' in keys) and (
            'AvailabilityZone' in keys or 'AvailabilityZones' in keys
        ):
            raise exceptions.SubnetAndAzValidationError

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='KeyName',
            dest_params=instances,
            dest_key='Ec2KeyName',
        )
        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='SubnetId',
            dest_params=instances,
            dest_key='Ec2SubnetId',
        )
        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='SubnetIds',
            dest_params=instances,
            dest_key='Ec2SubnetIds',
        )

        # The two AZ branches are mutually exclusive (checked above), so
        # resetting 'Placement' in each cannot drop data.
        if 'AvailabilityZone' in keys:
            instances['Placement'] = dict()
            emrutils.apply_params(
                src_params=parsed_attrs,
                src_key='AvailabilityZone',
                dest_params=instances['Placement'],
                dest_key='AvailabilityZone',
            )

        if 'AvailabilityZones' in keys:
            instances['Placement'] = dict()
            emrutils.apply_params(
                src_params=parsed_attrs,
                src_key='AvailabilityZones',
                dest_params=instances['Placement'],
                dest_key='AvailabilityZones',
            )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='InstanceProfile',
            dest_params=cluster,
            dest_key='JobFlowRole',
        )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='EmrManagedMasterSecurityGroup',
            dest_params=instances,
            dest_key='EmrManagedMasterSecurityGroup',
        )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='EmrManagedSlaveSecurityGroup',
            dest_params=instances,
            dest_key='EmrManagedSlaveSecurityGroup',
        )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='ServiceAccessSecurityGroup',
            dest_params=instances,
            dest_key='ServiceAccessSecurityGroup',
        )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='AdditionalMasterSecurityGroups',
            dest_params=instances,
            dest_key='AdditionalMasterSecurityGroups',
        )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='AdditionalSlaveSecurityGroups',
            dest_params=instances,
            dest_key='AdditionalSlaveSecurityGroups',
        )

        emrutils.apply(params=cluster, key='Instances', value=instances)

        return cluster

    def _build_bootstrap_actions(self, cluster, parsed_boostrap_actions):
        """Append user-supplied bootstrap actions to the request dict.

        Existing ``cluster['BootstrapActions']`` entries (e.g. those added
        for applications) stay first; the user's actions follow in order.

        :param cluster: The request dict, modified in place.
        :param parsed_boostrap_actions: Parsed ``--bootstrap-actions`` list.
        :returns: ``cluster``.
        :raises ValueError: If the combined count exceeds
            ``constants.MAX_BOOTSTRAP_ACTION_NUMBER``.
        """
        cluster_ba_list = cluster.get('BootstrapActions')
        if cluster_ba_list is None:
            cluster_ba_list = []

        bootstrap_actions = []
        if (
            len(cluster_ba_list) + len(parsed_boostrap_actions)
            > constants.MAX_BOOTSTRAP_ACTION_NUMBER
        ):
            raise ValueError(
                'aws: error: maximum number of '
                'bootstrap actions for a cluster exceeded.'
            )

        for ba in parsed_boostrap_actions:
            ba_config = {}
            if ba.get('Name') is not None:
                ba_config['Name'] = ba.get('Name')
            else:
                ba_config['Name'] = constants.BOOTSTRAP_ACTION_NAME
            script_arg_config = {}
            emrutils.apply_params(
                src_params=ba,
                src_key='Path',
                dest_params=script_arg_config,
                dest_key='Path',
            )
            emrutils.apply_params(
                src_params=ba,
                src_key='Args',
                dest_params=script_arg_config,
                dest_key='Args',
            )
            emrutils.apply(
                params=ba_config,
                key='ScriptBootstrapAction',
                value=script_arg_config,
            )
            bootstrap_actions.append(ba_config)

        result = cluster_ba_list + bootstrap_actions
        if result:
            cluster['BootstrapActions'] = result

        return cluster

    def _build_enable_debugging(self, parsed_args, parsed_globals):
        """Return the step config that enables EMR debugging.

        Release-label clusters use ``constants.COMMAND_RUNNER`` with
        ``constants.DEBUGGING_COMMAND``; AMI-version clusters use the
        region's script runner with an S3 link to the debugging script.
        The step's failure action is ``constants.TERMINATE_CLUSTER``.
        """
        if parsed_args.release_label:
            jar = constants.COMMAND_RUNNER
            args = [constants.DEBUGGING_COMMAND]
        else:
            jar = emrutils.get_script_runner(self.region)
            args = [
                emrutils.build_s3_link(
                    relative_path=constants.DEBUGGING_PATH, region=self.region
                )
            ]

        return emrutils.build_step(
            name=constants.DEBUGGING_NAME,
            action_on_failure=constants.TERMINATE_CLUSTER,
            jar=jar,
            args=args,
        )

    def _update_cluster_dict(self, cluster, key, value):
        """Extend ``cluster[key]`` with ``value``, creating it if needed.

        An existing entry is extended in place with ``+=`` (even when
        ``value`` is empty); a missing key is only created for a truthy
        ``value``, so empty lists never add a key.

        :returns: ``cluster``.
        """
        if key in cluster:
            cluster[key] += value
        elif value:
            cluster[key] = value
        return cluster

    def _validate_release_label_ami_version(self, parsed_args):
        """Require exactly one of ``--ami-version`` and ``--release-label``.

        :raises exceptions.MutualExclusiveOptionError: If both are given.
        :raises exceptions.RequiredOptionsError: If neither is given.
        """
        if (
            parsed_args.ami_version is not None
            and parsed_args.release_label is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--ami-version", option2="--release-label"
            )

        if (
            parsed_args.ami_version is None
            and parsed_args.release_label is None
        ):
            raise exceptions.RequiredOptionsError(
                option1="--ami-version", option2="--release-label"
            )

    def _validate_required_applications(self, parsed_args):
        """Check that applications needed by steps are in --applications.

        Steps of an application type, and ``--restore-from-hbase-backup``,
        need the matching application to be listed (case-insensitively).

        :raises exceptions.MissingApplicationsError: Listing the missing
            application names (title-cased).
        """
        specified_apps = set()
        if parsed_args.applications is not None:
            specified_apps = {
                app['Name'].lower() for app in parsed_args.applications
            }

        missing_apps = self._get_missing_applications_for_steps(
            specified_apps, parsed_args
        )
        # Check for HBase.
        if parsed_args.restore_from_hbase_backup is not None:
            if constants.HBASE not in specified_apps:
                missing_apps.add(constants.HBASE.title())

        if missing_apps:
            raise exceptions.MissingApplicationsError(
                applications=missing_apps
            )

    def _get_missing_applications_for_steps(self, specified_apps, parsed_args):
        """Return title-cased app names required by steps but not specified.

        Only steps whose lower-cased ``Type`` is one of ``constants.HIVE``,
        ``constants.PIG`` or ``constants.IMPALA`` are considered.

        :param specified_apps: Lower-cased names from ``--applications``.
        :returns: A new ``set`` of missing application names.
        """
        allowed_app_steps = {constants.HIVE, constants.PIG, constants.IMPALA}
        missing_apps = set()
        if parsed_args.steps is not None:
            for step in parsed_args.steps:
                # Every checkable app is already missing; nothing to add.
                if len(missing_apps) == len(allowed_app_steps):
                    break
                step_type = step.get('Type')

                if step_type is not None:
                    step_type = step_type.lower()
                    if (
                        step_type in allowed_app_steps
                        and step_type not in specified_apps
                    ):
                        missing_apps.add(step['Type'].title())
        return missing_apps

    def _filter_configurations_in_special_cases(
        self, configurations, parsed_args, parsed_configs
    ):
        """Drop role-related configuration entries under default roles.

        When ``--use-default-roles`` is set, entries whose ``name`` is
        ``'service_role'`` or ``'instance_profile'`` are removed.
        ``parsed_configs`` is accepted but unused.

        NOTE(review): not called within this class; presumably invoked by
        the ``Command`` base class -- confirm.
        """
        if parsed_args.use_default_roles:
            configurations = [
                x
                for x in configurations
                if x.name != 'service_role' and x.name != 'instance_profile'
            ]
        return configurations

    def _handle_emrfs_parameters(self, cluster, emrfs_args, release_label):
        """Apply ``--emrfs`` settings to the request dict.

        With a release label the settings become an entry in
        ``Configurations`` (rejecting a duplicate EMRFS classification);
        otherwise they are turned into bootstrap actions.
        """
        if release_label:
            self.validate_no_emrfs_configuration(cluster)
            emrfs_configuration = emrfsutils.build_emrfs_confiuration(
                emrfs_args
            )

            self._update_cluster_dict(
                cluster=cluster,
                key='Configurations',
                value=[emrfs_configuration],
            )
        else:
            emrfs_ba_config_list = emrfsutils.build_bootstrap_action_configs(
                self.region, emrfs_args
            )
            self._update_cluster_dict(
                cluster=cluster,
                key='BootstrapActions',
                value=emrfs_ba_config_list,
            )

    def validate_no_emrfs_configuration(self, cluster):
        """Reject ``--emrfs`` when ``--configurations`` already sets EMRFS.

        :raises exceptions.DuplicateEmrFsConfigurationError: If any entry in
            ``cluster['Configurations']`` has the ``constants.EMRFS_SITE``
            classification.
        """
        if 'Configurations' in cluster:
            for config in cluster['Configurations']:
                if (
                    config is not None
                    and config.get('Classification') == constants.EMRFS_SITE
                ):
                    raise exceptions.DuplicateEmrFsConfigurationError