Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/emr/createcluster.py
2639 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import re
15
16
from botocore.compat import json
17
18
from awscli.customizations.commands import BasicCommand
19
from awscli.customizations.emr import (
20
applicationutils,
21
argumentschema,
22
constants,
23
emrfsutils,
24
emrutils,
25
exceptions,
26
hbaseutils,
27
helptext,
28
instancefleetsutils,
29
instancegroupsutils,
30
steputils,
31
)
32
from awscli.customizations.emr.command import Command
33
from awscli.customizations.emr.constants import EC2_ROLE_NAME, EMR_ROLE_NAME
34
35
36
class CreateCluster(Command):
    """``aws emr create-cluster``: build and submit a RunJobFlow request."""

    NAME = 'create-cluster'
    DESCRIPTION = helptext.CREATE_CLUSTER_DESCRIPTION
    ARG_TABLE = [
        {'name': 'release-label', 'help_text': helptext.RELEASE_LABEL},
        {'name': 'os-release-label', 'help_text': helptext.OS_RELEASE_LABEL},
        {'name': 'ami-version', 'help_text': helptext.AMI_VERSION},
        {
            'name': 'instance-groups',
            'schema': argumentschema.INSTANCE_GROUPS_SCHEMA,
            'help_text': helptext.INSTANCE_GROUPS,
        },
        {'name': 'instance-type', 'help_text': helptext.INSTANCE_TYPE},
        {'name': 'instance-count', 'help_text': helptext.INSTANCE_COUNT},
        {
            'name': 'auto-terminate',
            'action': 'store_true',
            'group_name': 'auto_terminate',
            'help_text': helptext.AUTO_TERMINATE,
        },
        {
            'name': 'no-auto-terminate',
            'action': 'store_true',
            'group_name': 'auto_terminate',
        },
        {
            'name': 'instance-fleets',
            'schema': argumentschema.INSTANCE_FLEETS_SCHEMA,
            'help_text': helptext.INSTANCE_FLEETS,
        },
        {
            'name': 'name',
            'default': 'Development Cluster',
            'help_text': helptext.CLUSTER_NAME,
        },
        {'name': 'log-uri', 'help_text': helptext.LOG_URI},
        {
            'name': 'log-encryption-kms-key-id',
            'help_text': helptext.LOG_ENCRYPTION_KMS_KEY_ID,
        },
        {'name': 'service-role', 'help_text': helptext.SERVICE_ROLE},
        {'name': 'auto-scaling-role', 'help_text': helptext.AUTOSCALING_ROLE},
        {
            'name': 'use-default-roles',
            'action': 'store_true',
            'help_text': helptext.USE_DEFAULT_ROLES,
        },
        {'name': 'configurations', 'help_text': helptext.CONFIGURATIONS},
        {
            'name': 'ec2-attributes',
            'help_text': helptext.EC2_ATTRIBUTES,
            'schema': argumentschema.EC2_ATTRIBUTES_SCHEMA,
        },
        {
            'name': 'termination-protected',
            'action': 'store_true',
            'group_name': 'termination_protected',
            'help_text': helptext.TERMINATION_PROTECTED,
        },
        {
            'name': 'no-termination-protected',
            'action': 'store_true',
            'group_name': 'termination_protected',
        },
        {
            'name': 'unhealthy-node-replacement',
            'action': 'store_true',
            'group_name': 'unhealthy_node_replacement',
            'help_text': helptext.UNHEALTHY_NODE_REPLACEMENT,
        },
        {
            'name': 'no-unhealthy-node-replacement',
            'action': 'store_true',
            'group_name': 'unhealthy_node_replacement',
        },
        {
            'name': 'scale-down-behavior',
            'help_text': helptext.SCALE_DOWN_BEHAVIOR,
        },
        {
            'name': 'visible-to-all-users',
            'action': 'store_true',
            'group_name': 'visibility',
            'help_text': helptext.VISIBILITY,
        },
        {
            'name': 'no-visible-to-all-users',
            'action': 'store_true',
            'group_name': 'visibility',
        },
        {
            'name': 'enable-debugging',
            'action': 'store_true',
            'group_name': 'debug',
            'help_text': helptext.DEBUGGING,
        },
        {
            'name': 'no-enable-debugging',
            'action': 'store_true',
            'group_name': 'debug',
        },
        {
            'name': 'tags',
            'nargs': '+',
            'help_text': helptext.TAGS,
            'schema': argumentschema.TAGS_SCHEMA,
        },
        {
            'name': 'bootstrap-actions',
            'help_text': helptext.BOOTSTRAP_ACTIONS,
            'schema': argumentschema.BOOTSTRAP_ACTIONS_SCHEMA,
        },
        {
            'name': 'applications',
            'help_text': helptext.APPLICATIONS,
            'schema': argumentschema.APPLICATIONS_SCHEMA,
        },
        {
            'name': 'emrfs',
            'help_text': helptext.EMR_FS,
            'schema': argumentschema.EMR_FS_SCHEMA,
        },
        {
            'name': 'steps',
            'schema': argumentschema.STEPS_SCHEMA,
            'help_text': helptext.STEPS,
        },
        {'name': 'additional-info', 'help_text': helptext.ADDITIONAL_INFO},
        {
            'name': 'restore-from-hbase-backup',
            'schema': argumentschema.HBASE_RESTORE_FROM_BACKUP_SCHEMA,
            'help_text': helptext.RESTORE_FROM_HBASE,
        },
        {
            'name': 'security-configuration',
            'help_text': helptext.SECURITY_CONFIG,
        },
        {'name': 'custom-ami-id', 'help_text': helptext.CUSTOM_AMI_ID},
        {
            'name': 'ebs-root-volume-size',
            'help_text': helptext.EBS_ROOT_VOLUME_SIZE,
        },
        {
            'name': 'ebs-root-volume-iops',
            'help_text': helptext.EBS_ROOT_VOLUME_IOPS,
        },
        {
            'name': 'ebs-root-volume-throughput',
            'help_text': helptext.EBS_ROOT_VOLUME_THROUGHPUT,
        },
        {
            'name': 'repo-upgrade-on-boot',
            'help_text': helptext.REPO_UPGRADE_ON_BOOT,
        },
        {
            'name': 'kerberos-attributes',
            'schema': argumentschema.KERBEROS_ATTRIBUTES_SCHEMA,
            'help_text': helptext.KERBEROS_ATTRIBUTES,
        },
        {
            'name': 'step-concurrency-level',
            'cli_type_name': 'integer',
            'help_text': helptext.STEP_CONCURRENCY_LEVEL,
        },
        {
            'name': 'managed-scaling-policy',
            'schema': argumentschema.MANAGED_SCALING_POLICY_SCHEMA,
            'help_text': helptext.MANAGED_SCALING_POLICY,
        },
        {
            'name': 'placement-group-configs',
            'schema': argumentschema.PLACEMENT_GROUP_CONFIGS_SCHEMA,
            'help_text': helptext.PLACEMENT_GROUP_CONFIGS,
        },
        {
            'name': 'auto-termination-policy',
            'schema': argumentschema.AUTO_TERMINATION_POLICY_SCHEMA,
            'help_text': helptext.AUTO_TERMINATION_POLICY,
        },
        {
            'name': 'monitoring-configuration',
            'schema': argumentschema.MONITORING_CONFIGURATION_SCHEMA,
            'help_text': helptext.MONITORING_CONFIGURATION,
        },
        {
            'name': 'extended-support',
            'action': 'store_true',
            'group_name': 'extended-support',
            'help_text': helptext.EXTENDED_SUPPORT,
        },
        {
            'name': 'no-extended-support',
            'action': 'store_true',
            'group_name': 'extended-support',
        },
    ]
    SYNOPSIS = BasicCommand.FROM_FILE('emr', 'create-cluster-synopsis.txt')
    EXAMPLES = BasicCommand.FROM_FILE('emr', 'create-cluster-examples.rst')

    def _run_main_command(self, parsed_args, parsed_globals):
        """Build the ``run_job_flow`` request from the CLI options and send it.

        Validates option combinations, maps each option onto its request
        parameter, calls ``run_job_flow`` and displays the reduced result
        produced by ``_construct_result``.

        :param parsed_args: parsed command-line options.
        :param parsed_globals: global CLI options (endpoint URL, SSL
            verification, output settings).
        :returns: ``0`` after the response has been displayed.
        :raises exceptions.MutualExclusiveOptionError: for conflicting
            options (see the checks below).
        :raises ValueError: if ``--configurations`` is not valid JSON.
        """
        params = {}
        params['Name'] = parsed_args.name

        self._validate_release_label_ami_version(parsed_args)

        service_role_validation_message = (
            " Either choose --use-default-roles or use both --service-role "
            "<roleName> and --ec2-attributes InstanceProfile=<profileName>."
        )

        if (
            parsed_args.use_default_roles is True
            and parsed_args.service_role is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--use-default-roles",
                option2="--service-role",
                message=service_role_validation_message,
            )

        if (
            parsed_args.use_default_roles is True
            and parsed_args.ec2_attributes is not None
            and 'InstanceProfile' in parsed_args.ec2_attributes
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--use-default-roles",
                option2="--ec2-attributes InstanceProfile",
                message=service_role_validation_message,
            )

        if (
            parsed_args.instance_groups is not None
            and parsed_args.instance_fleets is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--instance-groups", option2="--instance-fleets"
            )

        # Fleets and groups are exclusive (checked above); without fleets the
        # groups helper also receives --instance-type/--instance-count.
        instances_config = {}
        if parsed_args.instance_fleets is not None:
            instances_config['InstanceFleets'] = (
                instancefleetsutils.validate_and_build_instance_fleets(
                    parsed_args.instance_fleets
                )
            )
        else:
            instances_config['InstanceGroups'] = (
                instancegroupsutils.validate_and_build_instance_groups(
                    instance_groups=parsed_args.instance_groups,
                    instance_type=parsed_args.instance_type,
                    instance_count=parsed_args.instance_count,
                )
            )

        if parsed_args.release_label is not None:
            params["ReleaseLabel"] = parsed_args.release_label
            # --configurations is only forwarded together with
            # --release-label and must be a JSON document.
            if parsed_args.configurations is not None:
                try:
                    params["Configurations"] = json.loads(
                        parsed_args.configurations
                    )
                except ValueError as err:
                    raise ValueError(
                        'aws: error: invalid json argument for '
                        'option --configurations'
                    ) from err

        if (
            parsed_args.release_label is None
            and parsed_args.ami_version is not None
        ):
            # re.match anchors at the start: the value must begin with at
            # most one digit followed by a '.'.
            is_valid_ami_version = re.match(
                r'\d?\..*', parsed_args.ami_version
            )
            if is_valid_ami_version is None:
                raise exceptions.InvalidAmiVersionError(
                    ami_version=parsed_args.ami_version
                )
            params['AmiVersion'] = parsed_args.ami_version
        emrutils.apply_dict(
            params, 'AdditionalInfo', parsed_args.additional_info
        )
        emrutils.apply_dict(params, 'LogUri', parsed_args.log_uri)

        if parsed_args.os_release_label is not None:
            emrutils.apply_dict(
                params, 'OSReleaseLabel', parsed_args.os_release_label
            )

        if parsed_args.log_encryption_kms_key_id is not None:
            emrutils.apply_dict(
                params,
                'LogEncryptionKmsKeyId',
                parsed_args.log_encryption_kms_key_id,
            )

        # --use-default-roles fills in both the service role and the EC2
        # instance profile (the latter is later mapped to JobFlowRole).
        if parsed_args.use_default_roles is True:
            parsed_args.service_role = EMR_ROLE_NAME
            if parsed_args.ec2_attributes is None:
                parsed_args.ec2_attributes = {}
            parsed_args.ec2_attributes['InstanceProfile'] = EC2_ROLE_NAME

        emrutils.apply_dict(params, 'ServiceRole', parsed_args.service_role)

        # An AutoScalingPolicy on any instance group requires
        # --auto-scaling-role.
        if parsed_args.instance_groups is not None:
            for instance_group in instances_config['InstanceGroups']:
                if 'AutoScalingPolicy' in instance_group:
                    if parsed_args.auto_scaling_role is None:
                        raise exceptions.MissingAutoScalingRoleError()

        emrutils.apply_dict(
            params, 'AutoScalingRole', parsed_args.auto_scaling_role
        )

        if parsed_args.scale_down_behavior is not None:
            emrutils.apply_dict(
                params, 'ScaleDownBehavior', parsed_args.scale_down_behavior
            )

        # Default to --no-auto-terminate when neither flag was given.
        if (
            parsed_args.no_auto_terminate is False
            and parsed_args.auto_terminate is False
        ):
            parsed_args.no_auto_terminate = True

        instances_config['KeepJobFlowAliveWhenNoSteps'] = (
            emrutils.apply_boolean_options(
                parsed_args.no_auto_terminate,
                '--no-auto-terminate',
                parsed_args.auto_terminate,
                '--auto-terminate',
            )
        )

        instances_config['TerminationProtected'] = (
            emrutils.apply_boolean_options(
                parsed_args.termination_protected,
                '--termination-protected',
                parsed_args.no_termination_protected,
                '--no-termination-protected',
            )
        )

        # Only sent when the user explicitly chose one of the two flags.
        if (
            parsed_args.unhealthy_node_replacement
            or parsed_args.no_unhealthy_node_replacement
        ):
            instances_config['UnhealthyNodeReplacement'] = (
                emrutils.apply_boolean_options(
                    parsed_args.unhealthy_node_replacement,
                    '--unhealthy-node-replacement',
                    parsed_args.no_unhealthy_node_replacement,
                    '--no-unhealthy-node-replacement',
                )
            )

        # Default to --visible-to-all-users when neither flag was given.
        if (
            parsed_args.visible_to_all_users is False
            and parsed_args.no_visible_to_all_users is False
        ):
            parsed_args.visible_to_all_users = True

        params['VisibleToAllUsers'] = emrutils.apply_boolean_options(
            parsed_args.visible_to_all_users,
            '--visible-to-all-users',
            parsed_args.no_visible_to_all_users,
            '--no-visible-to-all-users',
        )

        params['Tags'] = emrutils.parse_tags(parsed_args.tags)
        params['Instances'] = instances_config

        if parsed_args.ec2_attributes is not None:
            self._build_ec2_attributes(
                cluster=params, parsed_attrs=parsed_args.ec2_attributes
            )

        debugging_enabled = emrutils.apply_boolean_options(
            parsed_args.enable_debugging,
            '--enable-debugging',
            parsed_args.no_enable_debugging,
            '--no-enable-debugging',
        )

        # Debugging requires somewhere to write logs.
        if parsed_args.log_uri is None and debugging_enabled is True:
            raise exceptions.LogUriError

        if debugging_enabled is True:
            self._update_cluster_dict(
                cluster=params,
                key='Steps',
                value=[
                    self._build_enable_debugging(parsed_args, parsed_globals)
                ],
            )

        if parsed_args.applications is not None:
            if parsed_args.release_label is None:
                # AMI-version clusters: applications expand into supported
                # products, bootstrap actions and steps. 'AmiVersion' is
                # guaranteed to be set here because exactly one of
                # --ami-version/--release-label was validated above.
                app_list, ba_list, step_list = (
                    applicationutils.build_applications(
                        region=self.region,
                        parsed_applications=parsed_args.applications,
                        ami_version=params['AmiVersion'],
                    )
                )
                self._update_cluster_dict(
                    params, 'NewSupportedProducts', app_list
                )
                self._update_cluster_dict(params, 'BootstrapActions', ba_list)
                self._update_cluster_dict(params, 'Steps', step_list)
            else:
                params["Applications"] = list(parsed_args.applications)

        hbase_restore_config = parsed_args.restore_from_hbase_backup
        if hbase_restore_config is not None:
            args = hbaseutils.build_hbase_restore_from_backup_args(
                dir=hbase_restore_config.get('Dir'),
                backup_version=hbase_restore_config.get('BackupVersion'),
            )
            step_config = emrutils.build_step(
                jar=constants.HBASE_JAR_PATH,
                name=constants.HBASE_RESTORE_STEP_NAME,
                action_on_failure=constants.CANCEL_AND_WAIT,
                args=args,
            )
            self._update_cluster_dict(params, 'Steps', [step_config])

        if parsed_args.bootstrap_actions is not None:
            self._build_bootstrap_actions(
                cluster=params,
                parsed_boostrap_actions=parsed_args.bootstrap_actions,
            )

        if parsed_args.emrfs is not None:
            self._handle_emrfs_parameters(
                cluster=params,
                emrfs_args=parsed_args.emrfs,
                release_label=parsed_args.release_label,
            )

        if parsed_args.steps is not None:
            steps_list = steputils.build_step_config_list(
                parsed_step_list=parsed_args.steps,
                region=self.region,
                release_label=parsed_args.release_label,
            )
            self._update_cluster_dict(
                cluster=params, key='Steps', value=steps_list
            )

        if parsed_args.security_configuration is not None:
            emrutils.apply_dict(
                params,
                'SecurityConfiguration',
                parsed_args.security_configuration,
            )

        if parsed_args.custom_ami_id is not None:
            emrutils.apply_dict(
                params, 'CustomAmiId', parsed_args.custom_ami_id
            )
        # The EBS root-volume options arrive as strings; the API takes ints.
        if parsed_args.ebs_root_volume_size is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeSize',
                int(parsed_args.ebs_root_volume_size),
            )
        if parsed_args.ebs_root_volume_iops is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeIops',
                int(parsed_args.ebs_root_volume_iops),
            )
        if parsed_args.ebs_root_volume_throughput is not None:
            emrutils.apply_dict(
                params,
                'EbsRootVolumeThroughput',
                int(parsed_args.ebs_root_volume_throughput),
            )

        if parsed_args.repo_upgrade_on_boot is not None:
            emrutils.apply_dict(
                params, 'RepoUpgradeOnBoot', parsed_args.repo_upgrade_on_boot
            )

        if parsed_args.kerberos_attributes is not None:
            emrutils.apply_dict(
                params, 'KerberosAttributes', parsed_args.kerberos_attributes
            )

        if parsed_args.step_concurrency_level is not None:
            params['StepConcurrencyLevel'] = parsed_args.step_concurrency_level

        # Only sent when the user explicitly chose one of the two flags.
        if parsed_args.extended_support or parsed_args.no_extended_support:
            params['ExtendedSupport'] = emrutils.apply_boolean_options(
                parsed_args.extended_support,
                '--extended-support',
                parsed_args.no_extended_support,
                '--no-extended-support',
            )

        if parsed_args.managed_scaling_policy is not None:
            emrutils.apply_dict(
                params,
                'ManagedScalingPolicy',
                parsed_args.managed_scaling_policy,
            )

        if parsed_args.placement_group_configs is not None:
            emrutils.apply_dict(
                params,
                'PlacementGroupConfigs',
                parsed_args.placement_group_configs,
            )

        if parsed_args.auto_termination_policy is not None:
            emrutils.apply_dict(
                params,
                'AutoTerminationPolicy',
                parsed_args.auto_termination_policy,
            )

        if parsed_args.monitoring_configuration is not None:
            emrutils.apply_dict(
                params,
                'MonitoringConfiguration',
                parsed_args.monitoring_configuration,
            )

        self._validate_required_applications(parsed_args)

        run_job_flow_response = emrutils.call(
            self._session,
            'run_job_flow',
            params,
            self.region,
            parsed_globals.endpoint_url,
            parsed_globals.verify_ssl,
        )

        constructed_result = self._construct_result(run_job_flow_response)
        emrutils.display_response(
            self._session, 'run_job_flow', constructed_result, parsed_globals
        )

        return 0

    def _construct_result(self, run_job_flow_result):
        """Reduce a ``run_job_flow`` response to the fields shown to users.

        :param run_job_flow_result: the service response, or ``None``.
        :returns: ``{'ClusterId': ..., 'ClusterArn': ...}`` when the response
            carries a ``JobFlowId`` (``ClusterArn`` may be ``None``),
            otherwise an empty dict.
        """
        job_flow_id = None
        cluster_arn = None
        if run_job_flow_result is not None:
            job_flow_id = run_job_flow_result.get('JobFlowId')
            cluster_arn = run_job_flow_result.get('ClusterArn')

        if job_flow_id is not None:
            return {'ClusterId': job_flow_id, 'ClusterArn': cluster_arn}
        return {}

    def _build_ec2_attributes(self, cluster, parsed_attrs):
        """Copy ``--ec2-attributes`` values into the request dict.

        Most values land in ``cluster['Instances']``. The availability zone
        values go under ``Instances['Placement']``. ``InstanceProfile``
        becomes the top-level ``JobFlowRole``.

        :param cluster: request dict; must already contain ``'Instances'``.
        :param parsed_attrs: mapping parsed from ``--ec2-attributes``.
        :returns: ``cluster`` (mutated in place).
        :raises exceptions.MutualExclusiveOptionError: if both the singular
            and plural form of SubnetId or AvailabilityZone are given.
        :raises exceptions.SubnetAndAzValidationError: if any subnet key is
            combined with any availability zone key.
        """
        keys = parsed_attrs.keys()
        instances = cluster['Instances']

        if 'SubnetId' in keys and 'SubnetIds' in keys:
            raise exceptions.MutualExclusiveOptionError(
                option1="SubnetId", option2="SubnetIds"
            )

        if 'AvailabilityZone' in keys and 'AvailabilityZones' in keys:
            raise exceptions.MutualExclusiveOptionError(
                option1="AvailabilityZone", option2="AvailabilityZones"
            )

        if ('SubnetId' in keys or 'SubnetIds' in keys) and (
            'AvailabilityZone' in keys or 'AvailabilityZones' in keys
        ):
            raise exceptions.SubnetAndAzValidationError

        # Key pair and subnet settings, renamed for the Instances structure.
        for src_key, dest_key in (
            ('KeyName', 'Ec2KeyName'),
            ('SubnetId', 'Ec2SubnetId'),
            ('SubnetIds', 'Ec2SubnetIds'),
        ):
            emrutils.apply_params(
                src_params=parsed_attrs,
                src_key=src_key,
                dest_params=instances,
                dest_key=dest_key,
            )

        # At most one of these is present (checked above), so Placement is
        # created at most once.
        for az_key in ('AvailabilityZone', 'AvailabilityZones'):
            if az_key in keys:
                instances['Placement'] = dict()
                emrutils.apply_params(
                    src_params=parsed_attrs,
                    src_key=az_key,
                    dest_params=instances['Placement'],
                    dest_key=az_key,
                )

        emrutils.apply_params(
            src_params=parsed_attrs,
            src_key='InstanceProfile',
            dest_params=cluster,
            dest_key='JobFlowRole',
        )

        # Security group settings keep their names in the Instances struct.
        for sg_key in (
            'EmrManagedMasterSecurityGroup',
            'EmrManagedSlaveSecurityGroup',
            'ServiceAccessSecurityGroup',
            'AdditionalMasterSecurityGroups',
            'AdditionalSlaveSecurityGroups',
        ):
            emrutils.apply_params(
                src_params=parsed_attrs,
                src_key=sg_key,
                dest_params=instances,
                dest_key=sg_key,
            )

        emrutils.apply(params=cluster, key='Instances', value=instances)

        return cluster

    def _build_bootstrap_actions(self, cluster, parsed_boostrap_actions):
        """Append user bootstrap actions to ``cluster['BootstrapActions']``.

        Each parsed action becomes ``{'Name': ..., 'ScriptBootstrapAction':
        {'Path': ..., 'Args': ...}}``. A missing name falls back to
        ``constants.BOOTSTRAP_ACTION_NAME``.

        :param cluster: request dict; may already hold bootstrap actions.
        :param parsed_boostrap_actions: list parsed from
            ``--bootstrap-actions``.
        :returns: ``cluster`` (mutated in place).
        :raises ValueError: if existing plus new actions would exceed
            ``constants.MAX_BOOTSTRAP_ACTION_NUMBER``.
        """
        cluster_ba_list = cluster.get('BootstrapActions')
        if cluster_ba_list is None:
            cluster_ba_list = []

        if (
            len(cluster_ba_list) + len(parsed_boostrap_actions)
            > constants.MAX_BOOTSTRAP_ACTION_NUMBER
        ):
            raise ValueError(
                'aws: error: maximum number of '
                'bootstrap actions for a cluster exceeded.'
            )

        bootstrap_actions = []
        for ba in parsed_boostrap_actions:
            name = ba.get('Name')
            ba_config = {
                'Name': (
                    name if name is not None
                    else constants.BOOTSTRAP_ACTION_NAME
                )
            }
            script_arg_config = {}
            emrutils.apply_params(
                src_params=ba,
                src_key='Path',
                dest_params=script_arg_config,
                dest_key='Path',
            )
            emrutils.apply_params(
                src_params=ba,
                src_key='Args',
                dest_params=script_arg_config,
                dest_key='Args',
            )
            emrutils.apply(
                params=ba_config,
                key='ScriptBootstrapAction',
                value=script_arg_config,
            )
            bootstrap_actions.append(ba_config)

        result = cluster_ba_list + bootstrap_actions
        if result:
            cluster['BootstrapActions'] = result

        return cluster

    def _build_enable_debugging(self, parsed_args, parsed_globals):
        """Build the step that turns on debugging.

        Release-label clusters run ``constants.COMMAND_RUNNER`` with
        ``constants.DEBUGGING_COMMAND``. AMI-version clusters run the
        region's script runner against the S3 link built from
        ``constants.DEBUGGING_PATH``. The step's ``action_on_failure`` is
        ``constants.TERMINATE_CLUSTER``.
        """
        if parsed_args.release_label:
            jar = constants.COMMAND_RUNNER
            args = [constants.DEBUGGING_COMMAND]
        else:
            jar = emrutils.get_script_runner(self.region)
            args = [
                emrutils.build_s3_link(
                    relative_path=constants.DEBUGGING_PATH, region=self.region
                )
            ]

        return emrutils.build_step(
            name=constants.DEBUGGING_NAME,
            action_on_failure=constants.TERMINATE_CLUSTER,
            jar=jar,
            args=args,
        )

    def _update_cluster_dict(self, cluster, key, value):
        """Extend ``cluster[key]`` with ``value``.

        An existing entry is extended in place with ``+=``. A missing entry
        is created only when ``value`` is non-empty, so empty lists never
        add a key.

        :returns: ``cluster`` (mutated in place).
        """
        if key in cluster:
            cluster[key] += value
        elif value:
            cluster[key] = value
        return cluster

    def _validate_release_label_ami_version(self, parsed_args):
        """Require exactly one of ``--ami-version`` / ``--release-label``.

        :raises exceptions.MutualExclusiveOptionError: if both are given.
        :raises exceptions.RequiredOptionsError: if neither is given.
        """
        if (
            parsed_args.ami_version is not None
            and parsed_args.release_label is not None
        ):
            raise exceptions.MutualExclusiveOptionError(
                option1="--ami-version", option2="--release-label"
            )

        if (
            parsed_args.ami_version is None
            and parsed_args.release_label is None
        ):
            raise exceptions.RequiredOptionsError(
                option1="--ami-version", option2="--release-label"
            )

    def _validate_required_applications(self, parsed_args):
        """Check that applications required by steps are in --applications.

        Application names are compared case-insensitively. An HBase restore
        (``--restore-from-hbase-backup``) additionally requires HBase.

        :raises exceptions.MissingApplicationsError: listing the title-cased
            names of any missing applications.
        """
        specified_apps = set()
        if parsed_args.applications is not None:
            specified_apps = {
                app['Name'].lower() for app in parsed_args.applications
            }

        missing_apps = self._get_missing_applications_for_steps(
            specified_apps, parsed_args
        )
        # Check for HBase.
        if parsed_args.restore_from_hbase_backup is not None:
            if constants.HBASE not in specified_apps:
                missing_apps.add(constants.HBASE.title())

        if missing_apps:
            raise exceptions.MissingApplicationsError(
                applications=missing_apps
            )

    def _get_missing_applications_for_steps(self, specified_apps, parsed_args):
        """Return title-cased app names required by steps but not specified.

        Only Hive, Pig and Impala step types are considered. Step types are
        lower-cased before comparison with ``specified_apps``.

        :param specified_apps: lower-cased application names from
            ``--applications``.
        :returns: a set of title-cased step types (e.g. ``'Hive'``).
        """
        allowed_app_steps = {constants.HIVE, constants.PIG, constants.IMPALA}
        missing_apps = set()
        if parsed_args.steps is not None:
            for step in parsed_args.steps:
                # Stop early once as many apps are missing as can be.
                if len(missing_apps) == len(allowed_app_steps):
                    break
                step_type = step.get('Type')

                if step_type is not None:
                    step_type = step_type.lower()
                    if (
                        step_type in allowed_app_steps
                        and step_type not in specified_apps
                    ):
                        missing_apps.add(step['Type'].title())
        return missing_apps

    def _filter_configurations_in_special_cases(
        self, configurations, parsed_args, parsed_configs
    ):
        """Drop role-related configuration entries under --use-default-roles.

        Entries named ``service_role`` or ``instance_profile`` are removed
        when ``--use-default-roles`` is set. ``parsed_configs`` is accepted
        for interface compatibility but not used here.
        """
        if parsed_args.use_default_roles:
            configurations = [
                x
                for x in configurations
                if x.name != 'service_role' and x.name != 'instance_profile'
            ]
        return configurations

    def _handle_emrfs_parameters(self, cluster, emrfs_args, release_label):
        """Apply ``--emrfs`` settings to the request.

        With a release label, the settings are added as one entry under
        ``Configurations``. This first checks that no
        ``constants.EMRFS_SITE`` classification was already supplied.
        Without a release label, they become bootstrap actions.
        """
        if release_label:
            self.validate_no_emrfs_configuration(cluster)
            emrfs_configuration = emrfsutils.build_emrfs_confiuration(
                emrfs_args
            )

            self._update_cluster_dict(
                cluster=cluster,
                key='Configurations',
                value=[emrfs_configuration],
            )
        else:
            emrfs_ba_config_list = emrfsutils.build_bootstrap_action_configs(
                self.region, emrfs_args
            )
            self._update_cluster_dict(
                cluster=cluster,
                key='BootstrapActions',
                value=emrfs_ba_config_list,
            )

    def validate_no_emrfs_configuration(self, cluster):
        """Reject ``--emrfs`` when Configurations already set EMRFS options.

        :raises exceptions.DuplicateEmrFsConfigurationError: if any entry in
            ``cluster['Configurations']`` has classification
            ``constants.EMRFS_SITE``.
        """
        if 'Configurations' in cluster:
            for config in cluster['Configurations']:
                if (
                    config is not None
                    and config.get('Classification') == constants.EMRFS_SITE
                ):
                    raise exceptions.DuplicateEmrFsConfigurationError