Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/emr/test_emrfs_utils.py
1569 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import copy
15
import os
16
import json
17
18
from awscli.customizations.emr.emrfsutils import CONSISTENT_OPTION_NAME
19
from awscli.customizations.emr.emrfsutils import CSE_CUSTOM_OPTION_NAME
20
from awscli.customizations.emr.emrfsutils import CSE_KMS_OPTION_NAME
21
from awscli.customizations.emr.emrfsutils import CSE_OPTION_NAME
22
23
24
from tests.unit.customizations.emr import EMRBaseAWSCommandParamsTest as \
25
BaseAWSCommandParamsTest
26
27
28
# Shared fixtures for the expected ``create-cluster`` request parameters.
# Tests deep-copy these before mutating them, so treat them as read-only.

# Expected 'Instances' value for DEFAULT_CMD: a single on-demand m1.large
# MASTER instance group.
DEFAULT_INSTANCES = {
    'KeepJobFlowAliveWhenNoSteps': True,
    'TerminationProtected': False,
    'InstanceGroups': [{
        'InstanceRole': 'MASTER',
        'InstanceCount': 1,
        'Name': 'MASTER',
        'Market': 'ON_DEMAND',
        'InstanceType': 'm1.large'
    }]
}

# Command prefix shared by every test; the trailing space lets callers
# append further options with a plain "%s ..." format.
DEFAULT_CMD = ('emr create-cluster --use-default-roles'
               ' --instance-type m1.large ')

# Baseline expected parameters for DEFAULT_CMD, before any version- or
# EMRFS-specific keys are added by a test.
DEFAULT_RESULT = {
    'Name': "Development Cluster",
    'Instances': DEFAULT_INSTANCES,
    'VisibleToAllUsers': True,
    'JobFlowRole': "EMR_EC2_DefaultRole",
    'ServiceRole': "EMR_DefaultRole",
    'Tags': []
}

# Template for the 'emrfs-site' configuration entry; tests fill in
# 'Properties' on a copy.
EMPTY_EMRFS_CONFIGURATION = {
    'Classification': 'emrfs-site',
    'Properties': {}
}

# A non-EMRFS configuration list passed via --configurations in the tests
# that combine it with --emrfs.
DEFAULT_CONFIGURATIONS = [
    {
        'Classification': 'hadoop-env',
        'Configurations': [],
        'Properties': {'someProperty': 'someValue'}
    }
]
63
64
65
class TestEmrfsUtils(BaseAWSCommandParamsTest):
    """Tests for the ``--emrfs`` option of ``emr create-cluster``.

    Most tests go through ``_assert_bootstrap_actions``, which runs the same
    ``--emrfs`` value twice: once with ``--ami-version 3.4`` (expecting a
    'Setup EMRFS' bootstrap action) and once with ``--release-label emr-4.0``
    (expecting an 'emrfs-site' configuration entry). Error cases go through
    ``_assert_error_msg``, which likewise checks both variants.
    """

    # NOTE: test methods are documented with comments rather than
    # docstrings so that verbose test-runner output keeps showing the
    # method names.

    def test_consistent(self):
        # Consistent view alone.
        emrfs_option_value = 'Consistent=true'
        expected_emrfs_properties = {'fs.s3.consistent': 'true'}
        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=true'
        ]

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_consistent_w_optional_args(self):
        # Consistent view with both retry settings.
        emrfs_option_value = 'Consistent=true,RetryCount=5,RetryPeriod=30'

        expected_emrfs_properties = {
            'fs.s3.consistent': 'true',
            'fs.s3.consistent.retryCount': '5',
            'fs.s3.consistent.retryPeriodSeconds': '30'}

        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=true', 'fs.s3.consistent.retryCount=5',
            'fs.s3.consistent.retryPeriodSeconds=30'
        ]

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_consistent_false_w_optional_args(self):
        # Retry settings are still emitted when Consistent=false.
        emrfs_option_value = 'Consistent=false,RetryCount=5'

        expected_emrfs_properties = {
            'fs.s3.consistent': 'false',
            'fs.s3.consistent.retryCount': '5'}

        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=false', 'fs.s3.consistent.retryCount=5'
        ]

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_sse(self):
        # 'SSE=true' and 'Encryption=ServerSide' must produce the same
        # output.
        expected_emrfs_ba_key_values = [
            'fs.s3.enableServerSideEncryption=true'
        ]
        expected_emrfs_properties = {
            'fs.s3.enableServerSideEncryption': 'true'}

        for emrfs_option_value in ('SSE=true', 'Encryption=ServerSide'):
            # Pass fresh copies so one iteration cannot affect the next.
            self._assert_bootstrap_actions(
                emrfs_option_value, list(expected_emrfs_ba_key_values),
                dict(expected_emrfs_properties))

    def test_cse_kms(self):
        # Client-side encryption with the KMS provider.
        emrfs_option_value = ('Encryption=ClientSide,ProviderType=KMS,'
                              'KMSKeyId=my_key')
        expected_emrfs_ba_key_values = [
            'fs.s3.cse.enabled=true',
            'fs.s3.cse.encryptionMaterialsProvider='
            'com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId=my_key'
        ]
        expected_emrfs_properties = {
            'fs.s3.cse.enabled': 'true',
            'fs.s3.cse.encryptionMaterialsProvider':
                'com.amazon.ws.emr.hadoop.fs.cse.'
                'KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId': 'my_key'}
        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_cse_custom(self):
        # Client-side encryption with a custom provider. For AMI versions
        # the provider location is fetched by an extra 'S3 get' bootstrap
        # action; for release labels it becomes the '.uri' property.
        emrfs_option_value = (
            'Encryption=ClientSide,ProviderType=Custom,'
            'CustomProviderLocation=my_location,CustomProviderClass=my_class')
        expected_emrfs_ba_key_values = [
            'fs.s3.cse.enabled=true',
            'fs.s3.cse.encryptionMaterialsProvider=my_class'
        ]
        expected_emrfs_properties = {
            'fs.s3.cse.enabled': 'true',
            'fs.s3.cse.encryptionMaterialsProvider': 'my_class',
            'fs.s3.cse.encryptionMaterialsProvider.uri': 'my_location'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties, 'my_location')

    def test_sse_and_consistent(self):
        # Server-side encryption combined with consistent view.
        emrfs_option_value = 'SSE=true,Consistent=true'
        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=true',
            'fs.s3.enableServerSideEncryption=true']
        expected_emrfs_properties = {
            'fs.s3.consistent': 'true',
            'fs.s3.enableServerSideEncryption': 'true'}
        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

        # The encryption type value is expected to match
        # case-insensitively.
        emrfs_option_value = 'Consistent=false,Encryption=serVERSIde'
        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=false',
            'fs.s3.enableServerSideEncryption=true']
        expected_emrfs_properties = {
            'fs.s3.consistent': 'false',
            'fs.s3.enableServerSideEncryption': 'true'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_cse_and_consistent(self):
        # Client-side (KMS) encryption combined with consistent view.
        emrfs_option_value = ('Encryption=ClientSide,ProviderType=KMS,'
                              'KMSKeyId=my_key,Consistent=true')
        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=true', 'fs.s3.cse.enabled=true',
            'fs.s3.cse.encryptionMaterialsProvider=com.amazon.ws.emr.'
            'hadoop.fs.cse.KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId=my_key']
        expected_emrfs_properties = {
            'fs.s3.consistent': 'true',
            'fs.s3.cse.enabled': 'true',
            'fs.s3.cse.encryptionMaterialsProvider':
                'com.amazon.ws.emr.'
                'hadoop.fs.cse.KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId': 'my_key'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_args_and_sse(self):
        # Free-form Args are appended after the SSE setting.
        emrfs_option_value = \
            'SSE=true,Args=[fs.s3.serverSideEncryptionAlgorithm=AES256]'
        expected_emrfs_ba_key_values = [
            'fs.s3.enableServerSideEncryption=true',
            'fs.s3.serverSideEncryptionAlgorithm=AES256']
        expected_emrfs_properties = {
            'fs.s3.enableServerSideEncryption': 'true',
            'fs.s3.serverSideEncryptionAlgorithm': 'AES256'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_args_and_cse(self):
        # Free-form Args are appended after the CSE settings.
        emrfs_option_value = ('Encryption=ClientSide,ProviderType=KMS,'
                              'KMSKeyId=my_key,Args=[k1=v1]')
        expected_emrfs_ba_key_values = [
            'fs.s3.cse.enabled=true',
            'fs.s3.cse.encryptionMaterialsProvider=com.amazon.ws.emr.'
            'hadoop.fs.cse.KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId=my_key', 'k1=v1']
        expected_emrfs_properties = {
            'fs.s3.cse.enabled': 'true',
            'fs.s3.cse.encryptionMaterialsProvider':
                'com.amazon.ws.emr.'
                'hadoop.fs.cse.KMSEncryptionMaterialsProvider',
            'fs.s3.cse.kms.keyId': 'my_key',
            'k1': 'v1'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_args_and_consistent(self):
        # Free-form Args are appended after the consistent-view setting.
        emrfs_option_value = 'Consistent=true,Args=[k1=v1,k2=v2]'
        expected_emrfs_ba_key_values = ['fs.s3.consistent=true',
                                        'k1=v1', 'k2=v2']
        expected_emrfs_properties = {
            'fs.s3.consistent': 'true',
            'k1': 'v1',
            'k2': 'v2'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_only_args(self):
        # A bare key without '=' ('k3') is passed through unchanged as a
        # bootstrap-action argument and becomes an empty-string property.
        emrfs_option_value = 'Args=[k1=v1,k2=v2,k3]'
        expected_emrfs_ba_key_values = ['k1=v1', 'k2=v2', 'k3']
        expected_emrfs_properties = {
            'k1': 'v1',
            'k2': 'v2',
            'k3': ''}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_using_json_file(self):
        # The option value may be a file:// reference to a JSON document
        # that lives next to this test module.
        data_path = os.path.join(
            os.path.dirname(__file__), 'input_emr_fs.json')
        emrfs_option_value = 'file://%s' % data_path
        expected_emrfs_ba_key_values = [
            'fs.s3.consistent=true',
            'fs.s3.consistent.retryCount=10',
            'fs.s3.consistent.retryPeriodSeconds=3',
            'fs.s3.enableServerSideEncryption=false',
            'fs.s3.serverSideEncryptionAlgorithm=AES256',
            'fs.s3.sleepTimeSeconds=30']
        expected_emrfs_properties = {
            'fs.s3.consistent': 'true',
            'fs.s3.consistent.retryCount': '10',
            'fs.s3.consistent.retryPeriodSeconds': '3',
            'fs.s3.enableServerSideEncryption': 'false',
            'fs.s3.serverSideEncryptionAlgorithm': 'AES256',
            'fs.s3.sleepTimeSeconds': '30'}

        self._assert_bootstrap_actions(
            emrfs_option_value, expected_emrfs_ba_key_values,
            expected_emrfs_properties)

    def test_only_one_encryption_type(self):
        # SSE and Encryption are mutually exclusive.
        self._assert_error_msg(
            emrfs_option_value='SSE=true,Encryption=ClientSide,'
                               'ProviderType=KMS,KMSKeyId=k1',
            exception_class_name='BothSseAndEncryptionConfiguredError',
            error_msg_kwargs={'sse': 'True', 'encryption': 'ClientSide'}
        )

    def test_cse_missing_provider_type(self):
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide',
            exception_class_name='MissingParametersError',
            error_msg_kwargs={'object_name': CSE_OPTION_NAME,
                              'missing': 'ProviderType'}
        )

    def test_cse_kms_missing_key_id(self):
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide,ProviderType=KMS',
            exception_class_name='MissingParametersError',
            error_msg_kwargs={'object_name': CSE_KMS_OPTION_NAME,
                              'missing': 'KMSKeyId'}
        )

    def test_cse_custom_missing_all(self):
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide,ProviderType=Custom',
            exception_class_name='MissingParametersError',
            error_msg_kwargs={'object_name': CSE_CUSTOM_OPTION_NAME,
                              'missing': 'CustomProviderClass and '
                                         'CustomProviderLocation'}
        )

    def test_cse_custom_missing_provider_class(self):
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide,ProviderType=Custom,'
                               'CustomProviderLocation=my_location',
            exception_class_name='MissingParametersError',
            error_msg_kwargs={'object_name': CSE_CUSTOM_OPTION_NAME,
                              'missing': 'CustomProviderClass'}
        )

    def test_cse_custom_missing_provider_location(self):
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide,ProviderType=Custom,'
                               'CustomProviderClass=my_class',
            exception_class_name='MissingParametersError',
            error_msg_kwargs={'object_name': CSE_CUSTOM_OPTION_NAME,
                              'missing': 'CustomProviderLocation'}
        )

    def test_valid_encryption(self):
        # An unrecognised encryption type is rejected.
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide1',
            exception_class_name='UnknownEncryptionTypeError',
            error_msg_kwargs={'encryption': 'ClientSide1'}
        )

    def test_valid_cse_provider_type(self):
        # An unrecognised CSE provider type is rejected.
        self._assert_error_msg(
            emrfs_option_value='Encryption=ClientSide,ProviderType=KMS1',
            exception_class_name='UnknownCseProviderTypeError',
            error_msg_kwargs={'provider_type': 'KMS1'}
        )

    def test_valid_consistent_args(self):
        # Retry settings without Consistent are invalid.
        self._assert_error_msg(
            emrfs_option_value='SSE=true,RetryCount=5,RetryPeriod=30',
            exception_class_name='InvalidEmrFsArgumentsError',
            error_msg_kwargs={'invalid': 'RetryCount and RetryPeriod',
                              'parent_object_name': CONSISTENT_OPTION_NAME}
        )

    def test_valid_cse_kms_args(self):
        # KMSKeyId without client-side KMS encryption is invalid.
        self._assert_error_msg(
            emrfs_option_value='Consistent=true,KMSKeyId=k1',
            exception_class_name='InvalidEmrFsArgumentsError',
            error_msg_kwargs={'invalid': 'KMSKeyId',
                              'parent_object_name': CSE_KMS_OPTION_NAME}
        )

    def test_valid_cse_custom_args(self):
        # CustomProviderLocation without client-side custom encryption is
        # invalid.
        self._assert_error_msg(
            emrfs_option_value='Consistent=true,CustomProviderLocation=loc',
            exception_class_name='InvalidEmrFsArgumentsError',
            error_msg_kwargs={'invalid': 'CustomProviderLocation',
                              'parent_object_name': CSE_CUSTOM_OPTION_NAME}
        )

    def test_configurations_and_emrfs(self):
        # --emrfs alongside a --configurations list that has no
        # 'emrfs-site' entry: the EMRFS entry is appended to the list.
        emrfs_option_value = 'Args=[someProperty=someValue]'
        # Compact separators keep the JSON free of spaces so it stays a
        # single token on the command line.
        configurations_json = json.dumps(DEFAULT_CONFIGURATIONS,
                                         separators=(',', ':'))

        cmd = "%s --release-label emr-4.0 --emrfs %s --configurations %s" \
            % (DEFAULT_CMD, emrfs_option_value, configurations_json)

        emrfs_configuration = copy.deepcopy(EMPTY_EMRFS_CONFIGURATION)
        emrfs_configuration['Properties'] = {'someProperty': 'someValue'}

        expected_configurations = copy.deepcopy(DEFAULT_CONFIGURATIONS)
        expected_configurations.append(emrfs_configuration)

        result = copy.deepcopy(DEFAULT_RESULT)
        result['ReleaseLabel'] = 'emr-4.0'
        result['Configurations'] = expected_configurations

        self.assert_params_for_cmd(cmd, result)

    def test_duplicate_emrfs_configuration_exception(self):
        # --emrfs alongside a --configurations list that already contains
        # an 'emrfs-site' entry must be rejected.
        emrfs_option_value = 'Args=[someProperty=someValue]'

        configurations = copy.deepcopy(DEFAULT_CONFIGURATIONS)
        # Append a copy so the module-level template is never aliased
        # into a list that is local to this test.
        configurations.append(copy.deepcopy(EMPTY_EMRFS_CONFIGURATION))

        configurations_json = json.dumps(configurations,
                                         separators=(',', ':'))

        cmd = "%s --release-label emr-4.0 --emrfs %s --configurations %s" \
            % (DEFAULT_CMD, emrfs_option_value, configurations_json)

        self.assert_error_msg(cmd, 'DuplicateEmrFsConfigurationError')

    def _assert_error_msg(self, emrfs_option_value,
                          exception_class_name, error_msg_kwargs):
        """Assert the same error for both the AMI and release-label paths.

        Runs ``DEFAULT_CMD`` with ``--emrfs emrfs_option_value`` once with
        ``--ami-version 3.4`` and once with ``--release-label emr-4.0``,
        delegating each check to ``self.assert_error_msg``.
        """
        for version_option in ('--ami-version 3.4',
                               '--release-label emr-4.0'):
            cmd = "%s%s --emrfs %s" \
                % (DEFAULT_CMD, version_option, emrfs_option_value)
            self.assert_error_msg(
                cmd,
                exception_class_name=exception_class_name,
                error_msg_kwargs=error_msg_kwargs)

    def _assert_bootstrap_actions(self, emrfs_option_value,
                                  expected_emrfs_ba_key_values,
                                  expected_emrfs_properties,
                                  provider_location=None):
        """Assert the request parameters produced by an ``--emrfs`` value.

        :param emrfs_option_value: text passed to ``--emrfs``.
        :param expected_emrfs_ba_key_values: expected ``key=value``
            arguments of the 'Setup EMRFS' bootstrap action for the
            ``--ami-version 3.4`` run; ``None`` skips that run.
        :param expected_emrfs_properties: expected 'Properties' of the
            'emrfs-site' configuration for the ``--release-label emr-4.0``
            run; ``None`` skips that run.
        :param provider_location: when given, an 'S3 get' bootstrap action
            for this location is expected before 'Setup EMRFS'.
        """
        if expected_emrfs_ba_key_values is not None:
            cmd = "%s --ami-version 3.4 --emrfs %s" \
                % (DEFAULT_CMD, emrfs_option_value)

            bootstrap_actions = []
            if provider_location is not None:
                bootstrap_actions.append(
                    self._create_s3_get_ba_config(provider_location))
            bootstrap_actions.append(
                self._create_setup_emrfs_ba_config(
                    expected_emrfs_ba_key_values))

            result = copy.deepcopy(DEFAULT_RESULT)
            result['BootstrapActions'] = bootstrap_actions
            result['AmiVersion'] = '3.4'

            self.assert_params_for_cmd(cmd, result)

        if expected_emrfs_properties is not None:
            cmd = "%s --release-label emr-4.0 --emrfs %s" \
                % (DEFAULT_CMD, emrfs_option_value)

            emrfs_configuration = copy.deepcopy(EMPTY_EMRFS_CONFIGURATION)
            emrfs_configuration['Properties'] = expected_emrfs_properties

            result = copy.deepcopy(DEFAULT_RESULT)
            result['Configurations'] = [emrfs_configuration]
            result['ReleaseLabel'] = 'emr-4.0'

            self.assert_params_for_cmd(cmd, result)

    def _create_setup_emrfs_ba_config(self, ba_arg_values):
        """Return the expected 'Setup EMRFS' bootstrap action.

        Each entry of ``ba_arg_values`` is preceded by a ``-e`` flag in
        the resulting 'Args' list, e.g. ``['a', 'b']`` becomes
        ``['-e', 'a', '-e', 'b']``.
        """
        ba_args = []
        for value in ba_arg_values:
            ba_args.extend(('-e', value))

        return {
            'Name': 'Setup EMRFS',
            'ScriptBootstrapAction': {
                'Path': ('s3://us-east-1.elasticmapreduce/'
                         'bootstrap-actions/configure-hadoop'),
                'Args': ba_args
            }
        }

    def _create_s3_get_ba_config(self, provider_location):
        """Return the expected 'S3 get' bootstrap action.

        ``provider_location`` is used as the ``-s`` argument; the ``-d``
        argument is fixed to ``/usr/share/aws/emr/auxlib``.
        """
        return {
            'Name': 'S3 get',
            'ScriptBootstrapAction': {
                'Path': 'file:/usr/share/aws/emr/scripts/s3get',
                'Args': [
                    '-s', provider_location,
                    '-d', '/usr/share/aws/emr/auxlib',
                    '-f'
                ]
            }
        }
488
489