Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/test_utils.py
2624 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import signal
14
import platform
15
import pytest
16
import subprocess
17
import os
18
19
import botocore.model
20
21
from awscli.testutils import unittest, skip_if_windows, mock
22
from awscli.utils import (
23
split_on_commas, ignore_ctrl_c, find_service_and_method_in_event_name,
24
is_document_type, is_document_type_container, is_streaming_blob_type,
25
is_tagged_union_type, operation_uses_document_types, ShapeWalker,
26
ShapeRecordingVisitor, OutputStreamFactory, resolve_v2_debug_mode
27
)
28
29
30
@pytest.fixture()
31
def argument_model():
32
return botocore.model.Shape('argument', {'type': 'string'})
33
34
35
class TestCSVSplit(unittest.TestCase):
36
37
def test_normal_csv_split(self):
38
self.assertEqual(split_on_commas('foo,bar,baz'),
39
['foo', 'bar', 'baz'])
40
41
def test_quote_split(self):
42
self.assertEqual(split_on_commas('foo,"bar",baz'),
43
['foo', 'bar', 'baz'])
44
45
def test_inner_quote_split(self):
46
self.assertEqual(split_on_commas('foo,bar="1,2,3",baz'),
47
['foo', 'bar=1,2,3', 'baz'])
48
49
def test_single_quote(self):
50
self.assertEqual(split_on_commas("foo,bar='1,2,3',baz"),
51
['foo', 'bar=1,2,3', 'baz'])
52
53
def test_mixing_double_single_quotes(self):
54
self.assertEqual(split_on_commas("""foo,bar="1,'2',3",baz"""),
55
['foo', "bar=1,'2',3", 'baz'])
56
57
def test_mixing_double_single_quotes_before_first_comma(self):
58
self.assertEqual(split_on_commas("""foo,bar="1','2',3",baz"""),
59
['foo', "bar=1','2',3", 'baz'])
60
61
def test_inner_quote_split_with_equals(self):
62
self.assertEqual(split_on_commas('foo,bar="Foo:80/bar?a=b",baz'),
63
['foo', 'bar=Foo:80/bar?a=b', 'baz'])
64
65
def test_single_quoted_inner_value_with_no_commas(self):
66
self.assertEqual(split_on_commas("foo,bar='BAR',baz"),
67
['foo', 'bar=BAR', 'baz'])
68
69
def test_escape_quotes(self):
70
self.assertEqual(split_on_commas('foo,bar=1\,2\,3,baz'),
71
['foo', 'bar=1,2,3', 'baz'])
72
73
def test_no_commas(self):
74
self.assertEqual(split_on_commas('foo'), ['foo'])
75
76
def test_trailing_commas(self):
77
self.assertEqual(split_on_commas('foo,'), ['foo', ''])
78
79
def test_escape_backslash(self):
80
self.assertEqual(split_on_commas('foo,bar\\\\,baz\\\\,qux'),
81
['foo', 'bar\\', 'baz\\', 'qux'])
82
83
def test_square_brackets(self):
84
self.assertEqual(split_on_commas('foo,bar=["a=b",\'2\',c=d],baz'),
85
['foo', 'bar=a=b,2,c=d', 'baz'])
86
87
def test_quoted_square_brackets(self):
88
self.assertEqual(split_on_commas('foo,bar="[blah]",c=d],baz'),
89
['foo', 'bar=[blah]', 'c=d]', 'baz'])
90
91
def test_missing_bracket(self):
92
self.assertEqual(split_on_commas('foo,bar=[a,baz'),
93
['foo', 'bar=[a', 'baz'])
94
95
def test_missing_bracket2(self):
96
self.assertEqual(split_on_commas('foo,bar=a],baz'),
97
['foo', 'bar=a]', 'baz'])
98
99
def test_bracket_in_middle(self):
100
self.assertEqual(split_on_commas('foo,bar=a[b][c],baz'),
101
['foo', 'bar=a[b][c]', 'baz'])
102
103
def test_end_bracket_in_value(self):
104
self.assertEqual(split_on_commas('foo,bar=[foo,*[biz]*,baz]'),
105
['foo', 'bar=foo,*[biz]*,baz'])
106
107
108
@skip_if_windows("Ctrl-C not supported on windows.")
109
class TestIgnoreCtrlC(unittest.TestCase):
110
def test_ctrl_c_is_ignored(self):
111
with ignore_ctrl_c():
112
# Should have the noop signal handler installed.
113
self.assertEqual(signal.getsignal(signal.SIGINT), signal.SIG_IGN)
114
# And if we actually try to sigint ourselves, an exception
115
# should not propagate.
116
os.kill(os.getpid(), signal.SIGINT)
117
118
119
class TestFindServiceAndOperationNameFromEvent(unittest.TestCase):
120
def test_finds_service_and_operation_name(self):
121
event_name = "foo.bar.baz"
122
service, operation = find_service_and_method_in_event_name(event_name)
123
self.assertEqual(service, "bar")
124
self.assertEqual(operation, "baz")
125
126
def test_returns_none_if_event_is_too_short(self):
127
event_name = "foo.bar"
128
service, operation = find_service_and_method_in_event_name(event_name)
129
self.assertEqual(service, "bar")
130
self.assertIs(operation, None)
131
132
event_name = "foo"
133
service, operation = find_service_and_method_in_event_name(event_name)
134
self.assertIs(service, None)
135
self.assertIs(operation, None)
136
137
138
class TestV2DebugResolution(unittest.TestCase):
139
def test_v2_debug_flag_enabled(self):
140
args = mock.Mock(v2_debug=True)
141
self.assertTrue(resolve_v2_debug_mode(args))
142
143
def test_env_var_enabled(self):
144
args = mock.Mock(v2_debug=False)
145
with mock.patch.dict(os.environ, {'AWS_CLI_UPGRADE_DEBUG_MODE': 'true'}):
146
self.assertTrue(resolve_v2_debug_mode(args))
147
148
def test_all_disabled(self):
149
args = mock.Mock(v2_debug=False)
150
with mock.patch.dict(os.environ, {}, clear=True):
151
self.assertFalse(resolve_v2_debug_mode(args))
152
153
def test_env_var_non_true_value(self):
154
args = mock.Mock(v2_debug=False)
155
with mock.patch.dict(os.environ, {'AWS_CLI_UPGRADE_DEBUG_MODE': 'false'}):
156
self.assertFalse(resolve_v2_debug_mode(args))
157
158
def test_args_none(self):
159
self.assertFalse(resolve_v2_debug_mode(None))
160
161
162
class MockProcess(object):
163
@property
164
def stdin(self):
165
raise IOError('broken pipe')
166
167
def communicate(self):
168
pass
169
170
171
class TestOutputStreamFactory(unittest.TestCase):
172
def setUp(self):
173
self.popen = mock.Mock(subprocess.Popen)
174
self.stream_factory = OutputStreamFactory(self.popen)
175
176
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
177
def test_pager(self, mock_get_popen_pager):
178
mock_get_popen_pager.return_value = {
179
'args': ['mypager', '--option']
180
}
181
with self.stream_factory.get_pager_stream():
182
mock_get_popen_pager.assert_called_with(None)
183
self.assertEqual(
184
self.popen.call_args_list,
185
[mock.call(
186
args=['mypager', '--option'],
187
stdin=subprocess.PIPE)]
188
)
189
190
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
191
def test_env_configured_pager(self, mock_get_popen_pager):
192
mock_get_popen_pager.return_value = {
193
'args': ['mypager', '--option']
194
}
195
with self.stream_factory.get_pager_stream('mypager --option'):
196
mock_get_popen_pager.assert_called_with('mypager --option')
197
self.assertEqual(
198
self.popen.call_args_list,
199
[mock.call(
200
args=['mypager', '--option'],
201
stdin=subprocess.PIPE)]
202
)
203
204
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
205
def test_pager_using_shell(self, mock_get_popen_pager):
206
mock_get_popen_pager.return_value = {
207
'args': 'mypager --option', 'shell': True
208
}
209
with self.stream_factory.get_pager_stream():
210
mock_get_popen_pager.assert_called_with(None)
211
self.assertEqual(
212
self.popen.call_args_list,
213
[mock.call(
214
args='mypager --option',
215
stdin=subprocess.PIPE,
216
shell=True)]
217
)
218
219
def test_exit_of_context_manager_for_pager(self):
220
with self.stream_factory.get_pager_stream():
221
pass
222
returned_process = self.popen.return_value
223
self.assertTrue(returned_process.communicate.called)
224
225
@mock.patch('awscli.utils.get_binary_stdout')
226
def test_stdout(self, mock_binary_out):
227
with self.stream_factory.get_stdout_stream():
228
self.assertTrue(mock_binary_out.called)
229
230
def test_can_silence_io_error_from_pager(self):
231
self.popen.return_value = MockProcess()
232
try:
233
# RuntimeError is caught here since a runtime error is raised
234
# when an IOError is raised before the context manager yields.
235
# If we ignore it like this we will actually see the IOError.
236
with self.assertRaises(RuntimeError):
237
with self.stream_factory.get_pager_stream():
238
pass
239
except IOError:
240
self.fail('Should not raise IOError')
241
242
243
class BaseShapeTest(unittest.TestCase):
244
def setUp(self):
245
self.shapes = {}
246
247
def get_shape_model(self, shape_name):
248
shape_model = self.shapes[shape_name]
249
resolver = botocore.model.ShapeResolver(self.shapes)
250
shape_cls = resolver.SHAPE_CLASSES.get(
251
shape_model['type'], botocore.model.Shape
252
)
253
return shape_cls(shape_name, shape_model, resolver)
254
255
def get_doc_type_shape_definition(self):
256
return {
257
'type': 'structure',
258
'members': {},
259
'document': True
260
}
261
262
263
class TestIsDocumentType(BaseShapeTest):
264
def test_is_document_type(self):
265
self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
266
self.assertTrue(is_document_type(self.get_shape_model('DocStructure')))
267
268
def test_is_not_document_type_if_missing_document_trait(self):
269
self.shapes['NonDocStructure'] = {
270
'type': 'structure',
271
'members': {},
272
}
273
self.assertFalse(
274
is_document_type(self.get_shape_model('NonDocStructure'))
275
)
276
277
def test_is_not_document_type_if_not_structure(self):
278
self.shapes['String'] = {'type': 'string'}
279
self.assertFalse(is_document_type(self.get_shape_model('String')))
280
281
282
class TestIsDocumentTypeContainer(BaseShapeTest):
283
def test_is_document_type_container_for_doc_type(self):
284
self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
285
self.assertTrue(
286
is_document_type_container(self.get_shape_model('DocStructure'))
287
)
288
289
def test_is_not_document_type_container_if_missing_document_trait(self):
290
self.shapes['NonDocStructure'] = {
291
'type': 'structure',
292
'members': {},
293
}
294
self.assertFalse(
295
is_document_type_container(self.get_shape_model('NonDocStructure'))
296
)
297
298
def test_is_not_document_type_container_if_not_scalar(self):
299
self.shapes['String'] = {'type': 'string'}
300
self.assertFalse(
301
is_document_type_container(self.get_shape_model('String')))
302
303
def test_is_document_type_container_if_list_member(self):
304
self.shapes['ListOfDocTypes'] = {
305
'type': 'list',
306
'member': {'shape': 'DocType'}
307
}
308
self.shapes['DocType'] = self.get_doc_type_shape_definition()
309
self.assertTrue(
310
is_document_type_container(self.get_shape_model('ListOfDocTypes'))
311
)
312
313
def test_is_document_type_container_if_map_value(self):
314
self.shapes['MapOfDocTypes'] = {
315
'type': 'map',
316
'key': {'shape': 'String'},
317
'value': {'shape': 'DocType'}
318
}
319
self.shapes['DocType'] = self.get_doc_type_shape_definition()
320
self.shapes['String'] = {'type': 'string'}
321
self.assertTrue(
322
is_document_type_container(self.get_shape_model('MapOfDocTypes'))
323
)
324
325
def test_is_document_type_container_if_nested_list_member(self):
326
self.shapes['NestedListsOfDocTypes'] = {
327
'type': 'list',
328
'member': {'shape': 'ListOfDocTypes'}
329
}
330
self.shapes['ListOfDocTypes'] = {
331
'type': 'list',
332
'member': {'shape': 'DocType'}
333
}
334
self.shapes['DocType'] = self.get_doc_type_shape_definition()
335
self.assertTrue(
336
is_document_type_container(
337
self.get_shape_model('NestedListsOfDocTypes')
338
)
339
)
340
341
342
class TestOperationUsesDocumentTypes(BaseShapeTest):
343
def setUp(self):
344
super(TestOperationUsesDocumentTypes, self).setUp()
345
self.input_shape_definition = {
346
'type': 'structure',
347
'members': {}
348
}
349
self.shapes['Input'] = self.input_shape_definition
350
self.output_shape_definition = {
351
'type': 'structure',
352
'members': {}
353
}
354
self.shapes['Output'] = self.output_shape_definition
355
self.operation_definition = {
356
'input': {'shape': 'Input'},
357
'output': {'shape': 'Output'}
358
}
359
self.service_model = botocore.model.ServiceModel(
360
{
361
'operations': {'DescribeResource': self.operation_definition},
362
'shapes': self.shapes
363
}
364
)
365
self.operation_model = self.service_model.operation_model(
366
'DescribeResource')
367
368
def test_operation_uses_document_types_if_doc_type_in_input(self):
369
self.shapes['DocType'] = self.get_doc_type_shape_definition()
370
self.input_shape_definition['members']['DocType'] = {
371
'shape': 'DocType'}
372
self.assertTrue(operation_uses_document_types(self.operation_model))
373
374
def test_operation_uses_document_types_if_doc_type_in_output(self):
375
self.shapes['DocType'] = self.get_doc_type_shape_definition()
376
self.output_shape_definition['members']['DocType'] = {
377
'shape': 'DocType'}
378
self.assertTrue(operation_uses_document_types(self.operation_model))
379
380
def test_operation_uses_document_types_is_false_when_no_doc_types(self):
381
self.assertFalse(operation_uses_document_types(self.operation_model))
382
383
384
class TestShapeWalker(BaseShapeTest):
385
def setUp(self):
386
super(TestShapeWalker, self).setUp()
387
self.walker = ShapeWalker()
388
self.visitor = ShapeRecordingVisitor()
389
390
def assert_visited_shapes(self, expected_shape_names):
391
self.assertEqual(
392
expected_shape_names,
393
[shape.name for shape in self.visitor.visited]
394
)
395
396
def test_walk_scalar(self):
397
self.shapes['String'] = {'type': 'string'}
398
self.walker.walk(self.get_shape_model('String'), self.visitor)
399
self.assert_visited_shapes(['String'])
400
401
def test_walk_structure(self):
402
self.shapes['Structure'] = {
403
'type': 'structure',
404
'members': {
405
'String1': {'shape': 'String'},
406
'String2': {'shape': 'String'}
407
}
408
}
409
self.shapes['String'] = {'type': 'string'}
410
self.walker.walk(self.get_shape_model('Structure'), self.visitor)
411
self.assert_visited_shapes(['Structure', 'String', 'String'])
412
413
def test_walk_list(self):
414
self.shapes['List'] = {
415
'type': 'list',
416
'member': {'shape': 'String'}
417
}
418
self.shapes['String'] = {'type': 'string'}
419
self.walker.walk(self.get_shape_model('List'), self.visitor)
420
self.assert_visited_shapes(['List', 'String'])
421
422
def test_walk_map(self):
423
self.shapes['Map'] = {
424
'type': 'map',
425
'key': {'shape': 'KeyString'},
426
'value': {'shape': 'ValueString'}
427
}
428
self.shapes['KeyString'] = {'type': 'string'}
429
self.shapes['ValueString'] = {'type': 'string'}
430
self.walker.walk(self.get_shape_model('Map'), self.visitor)
431
self.assert_visited_shapes(['Map', 'ValueString'])
432
433
def test_can_escape_recursive_shapes(self):
434
self.shapes['Recursive'] = {
435
'type': 'structure',
436
'members': {
437
'Recursive': {'shape': 'Recursive'},
438
}
439
}
440
self.walker.walk(self.get_shape_model('Recursive'), self.visitor)
441
self.assert_visited_shapes(['Recursive'])
442
443
444
@pytest.mark.usefixtures('argument_model')
445
class TestStreamingBlob:
446
def test_blob_is_streaming(self, argument_model):
447
argument_model.type_name = 'blob'
448
argument_model.serialization = {'streaming': True}
449
assert is_streaming_blob_type(argument_model)
450
451
def test_blob_is_not_streaming(self, argument_model):
452
argument_model.type_name = 'blob'
453
argument_model.serialization = {}
454
assert not is_streaming_blob_type(argument_model)
455
456
def test_non_blob_is_not_streaming(self, argument_model):
457
argument_model.type_name = 'string'
458
argument_model.serialization = {}
459
assert not is_streaming_blob_type(argument_model)
460
461
462
@pytest.mark.usefixtures('argument_model')
463
class TestTaggedUnion:
464
def test_shape_is_tagged_union(self, argument_model):
465
setattr(argument_model, 'is_tagged_union', True)
466
assert is_tagged_union_type(argument_model)
467
468
def test_shape_is_not_tagged_union(self, argument_model):
469
assert not is_tagged_union_type(argument_model)
470
471