Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/test_utils.py
1566 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13
import signal
14
import platform
15
import pytest
16
import subprocess
17
import os
18
19
import botocore.model
20
21
from awscli.testutils import unittest, skip_if_windows, mock
22
from awscli.utils import (
23
split_on_commas, ignore_ctrl_c, find_service_and_method_in_event_name,
24
is_document_type, is_document_type_container, is_streaming_blob_type,
25
is_tagged_union_type, operation_uses_document_types, ShapeWalker,
26
ShapeRecordingVisitor, OutputStreamFactory
27
)
28
29
30
@pytest.fixture()
def argument_model():
    """Build a botocore ``Shape`` named ``argument`` of type ``string``."""
    shape_definition = {'type': 'string'}
    return botocore.model.Shape('argument', shape_definition)
33
34
35
class TestCSVSplit(unittest.TestCase):
    """Exercise ``split_on_commas`` with quoted, escaped and bracketed input."""

    def test_normal_csv_split(self):
        self.assertEqual(split_on_commas('foo,bar,baz'),
                         ['foo', 'bar', 'baz'])

    def test_quote_split(self):
        # Quotes surrounding a whole element are dropped from the result.
        self.assertEqual(split_on_commas('foo,"bar",baz'),
                         ['foo', 'bar', 'baz'])

    def test_inner_quote_split(self):
        # Commas inside a double-quoted value do not split the element.
        self.assertEqual(split_on_commas('foo,bar="1,2,3",baz'),
                         ['foo', 'bar=1,2,3', 'baz'])

    def test_single_quote(self):
        # Same as above, using single quotes around the value.
        self.assertEqual(split_on_commas("foo,bar='1,2,3',baz"),
                         ['foo', 'bar=1,2,3', 'baz'])

    def test_mixing_double_single_quotes(self):
        # Single quotes nested in a double-quoted value are kept verbatim.
        self.assertEqual(split_on_commas("""foo,bar="1,'2',3",baz"""),
                         ['foo', "bar=1,'2',3", 'baz'])

    def test_mixing_double_single_quotes_before_first_comma(self):
        self.assertEqual(split_on_commas("""foo,bar="1','2',3",baz"""),
                         ['foo', "bar=1','2',3", 'baz'])

    def test_inner_quote_split_with_equals(self):
        self.assertEqual(split_on_commas('foo,bar="Foo:80/bar?a=b",baz'),
                         ['foo', 'bar=Foo:80/bar?a=b', 'baz'])

    def test_single_quoted_inner_value_with_no_commas(self):
        self.assertEqual(split_on_commas("foo,bar='BAR',baz"),
                         ['foo', 'bar=BAR', 'baz'])

    def test_escape_quotes(self):
        # Backslash-escaped commas stay inside the element.
        # A raw string is used because ``\,`` is not a valid escape
        # sequence in a regular string literal; the value is unchanged.
        self.assertEqual(split_on_commas(r'foo,bar=1\,2\,3,baz'),
                         ['foo', 'bar=1,2,3', 'baz'])

    def test_no_commas(self):
        self.assertEqual(split_on_commas('foo'), ['foo'])

    def test_trailing_commas(self):
        # A trailing comma yields a final empty element.
        self.assertEqual(split_on_commas('foo,'), ['foo', ''])

    def test_escape_backslash(self):
        # An escaped backslash collapses to a single backslash and the
        # comma after it still splits.
        self.assertEqual(split_on_commas('foo,bar\\\\,baz\\\\,qux'),
                         ['foo', 'bar\\', 'baz\\', 'qux'])

    def test_square_brackets(self):
        # A bracketed value is kept as one element without the brackets
        # or the inner quotes.
        self.assertEqual(split_on_commas('foo,bar=["a=b",\'2\',c=d],baz'),
                         ['foo', 'bar=a=b,2,c=d', 'baz'])

    def test_quoted_square_brackets(self):
        # Brackets inside quotes are literal text, not grouping.
        self.assertEqual(split_on_commas('foo,bar="[blah]",c=d],baz'),
                         ['foo', 'bar=[blah]', 'c=d]', 'baz'])

    def test_missing_bracket(self):
        # An unclosed '[' is treated as ordinary text.
        self.assertEqual(split_on_commas('foo,bar=[a,baz'),
                         ['foo', 'bar=[a', 'baz'])

    def test_missing_bracket2(self):
        # An unopened ']' is treated as ordinary text.
        self.assertEqual(split_on_commas('foo,bar=a],baz'),
                         ['foo', 'bar=a]', 'baz'])

    def test_bracket_in_middle(self):
        self.assertEqual(split_on_commas('foo,bar=a[b][c],baz'),
                         ['foo', 'bar=a[b][c]', 'baz'])

    def test_end_bracket_in_value(self):
        self.assertEqual(split_on_commas('foo,bar=[foo,*[biz]*,baz]'),
                         ['foo', 'bar=foo,*[biz]*,baz'])
106
107
108
@skip_if_windows("Ctrl-C not supported on windows.")
class TestIgnoreCtrlC(unittest.TestCase):
    """Check the SIGINT handler that is active inside ``ignore_ctrl_c``."""

    def test_ctrl_c_is_ignored(self):
        with ignore_ctrl_c():
            # Inside the block the SIGINT handler must be the "ignore" one.
            installed_handler = signal.getsignal(signal.SIGINT)
            self.assertEqual(installed_handler, signal.SIG_IGN)
            # Sending SIGINT to this very process must not raise anything.
            own_pid = os.getpid()
            os.kill(own_pid, signal.SIGINT)
117
118
119
class TestFindServiceAndOperationNameFromEvent(unittest.TestCase):
    """Tests for ``find_service_and_method_in_event_name``."""

    def test_finds_service_and_operation_name(self):
        # With three dotted parts, the second and third are returned.
        service, operation = find_service_and_method_in_event_name(
            "foo.bar.baz")
        self.assertEqual(service, "bar")
        self.assertEqual(operation, "baz")

    def test_returns_none_if_event_is_too_short(self):
        # Two parts: the service is found, the operation is None.
        service, operation = find_service_and_method_in_event_name(
            "foo.bar")
        self.assertEqual(service, "bar")
        self.assertIsNone(operation)

        # One part: neither service nor operation is found.
        service, operation = find_service_and_method_in_event_name("foo")
        self.assertIsNone(service)
        self.assertIsNone(operation)
136
137
138
class MockProcess(object):
    """Process stand-in whose ``stdin`` attribute always raises IOError."""

    @property
    def stdin(self):
        # Any access to the pipe fails with a "broken pipe" IOError.
        raise IOError('broken pipe')

    def communicate(self):
        """Do nothing and return None."""
        return None
145
146
147
class TestOutputStreamFactory(unittest.TestCase):
    """Tests for ``OutputStreamFactory`` driven by a mocked ``Popen``."""

    def setUp(self):
        # The factory receives a Popen mock so no real pager is spawned.
        self.popen = mock.Mock(subprocess.Popen)
        self.stream_factory = OutputStreamFactory(self.popen)

    @mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
    def test_pager(self, mock_get_popen_pager):
        mock_get_popen_pager.return_value = {'args': ['mypager', '--option']}
        expected_popen_calls = [
            mock.call(args=['mypager', '--option'], stdin=subprocess.PIPE)
        ]
        with self.stream_factory.get_pager_stream():
            # No pager given, so the kwargs helper is asked with None.
            mock_get_popen_pager.assert_called_with(None)
            self.assertEqual(self.popen.call_args_list, expected_popen_calls)

    @mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
    def test_env_configured_pager(self, mock_get_popen_pager):
        mock_get_popen_pager.return_value = {'args': ['mypager', '--option']}
        expected_popen_calls = [
            mock.call(args=['mypager', '--option'], stdin=subprocess.PIPE)
        ]
        with self.stream_factory.get_pager_stream('mypager --option'):
            # The explicit pager command is forwarded to the kwargs helper.
            mock_get_popen_pager.assert_called_with('mypager --option')
            self.assertEqual(self.popen.call_args_list, expected_popen_calls)

    @mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
    def test_pager_using_shell(self, mock_get_popen_pager):
        mock_get_popen_pager.return_value = {
            'args': 'mypager --option', 'shell': True
        }
        expected_popen_calls = [
            mock.call(
                args='mypager --option', stdin=subprocess.PIPE, shell=True)
        ]
        with self.stream_factory.get_pager_stream():
            mock_get_popen_pager.assert_called_with(None)
            # Extra kwargs such as ``shell`` reach Popen unchanged.
            self.assertEqual(self.popen.call_args_list, expected_popen_calls)

    def test_exit_of_context_manager_for_pager(self):
        with self.stream_factory.get_pager_stream():
            pass
        # After the block exits, communicate() must have been called on
        # the process object that Popen returned.
        pager_process = self.popen.return_value
        self.assertTrue(pager_process.communicate.called)

    @mock.patch('awscli.utils.get_binary_stdout')
    def test_stdout(self, mock_binary_out):
        with self.stream_factory.get_stdout_stream():
            self.assertTrue(mock_binary_out.called)

    def test_can_silence_io_error_from_pager(self):
        # MockProcess raises IOError as soon as ``stdin`` is accessed.
        self.popen.return_value = MockProcess()
        try:
            # RuntimeError is caught here since a runtime error is raised
            # when an IOError is raised before the context manager yields.
            # If we ignore it like this we will actually see the IOError.
            with self.assertRaises(RuntimeError):
                with self.stream_factory.get_pager_stream():
                    pass
        except IOError:
            self.fail('Should not raise IOError')
217
218
219
class BaseShapeTest(unittest.TestCase):
    """Base class offering a shape registry and shape-model helpers."""

    def setUp(self):
        # Maps shape name -> raw shape definition; filled in by each test.
        self.shapes = {}

    def get_shape_model(self, shape_name):
        """Return a botocore shape object for ``shape_name``.

        The definition is read from ``self.shapes``. The shape class is
        looked up by the definition's ``type`` in the resolver's
        ``SHAPE_CLASSES``, with ``botocore.model.Shape`` as the fallback.
        """
        definition = self.shapes[shape_name]
        shape_resolver = botocore.model.ShapeResolver(self.shapes)
        model_class = shape_resolver.SHAPE_CLASSES.get(
            definition['type'], botocore.model.Shape
        )
        return model_class(shape_name, definition, shape_resolver)

    def get_doc_type_shape_definition(self):
        """Return a new, empty structure definition marked as a document."""
        return dict(type='structure', members={}, document=True)
237
238
239
class TestIsDocumentType(BaseShapeTest):
    """Tests for ``is_document_type``."""

    def test_is_document_type(self):
        self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
        shape = self.get_shape_model('DocStructure')
        self.assertTrue(is_document_type(shape))

    def test_is_not_document_type_if_missing_document_trait(self):
        # A structure without the ``document`` key is not a document type.
        self.shapes['NonDocStructure'] = dict(type='structure', members={})
        shape = self.get_shape_model('NonDocStructure')
        self.assertFalse(is_document_type(shape))

    def test_is_not_document_type_if_not_structure(self):
        self.shapes['String'] = dict(type='string')
        shape = self.get_shape_model('String')
        self.assertFalse(is_document_type(shape))
256
257
258
class TestIsDocumentTypeContainer(BaseShapeTest):
    """Tests for ``is_document_type_container``."""

    def test_is_document_type_container_for_doc_type(self):
        # A document type itself counts as a container.
        self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
        shape = self.get_shape_model('DocStructure')
        self.assertTrue(is_document_type_container(shape))

    def test_is_not_document_type_container_if_missing_document_trait(self):
        self.shapes['NonDocStructure'] = dict(type='structure', members={})
        shape = self.get_shape_model('NonDocStructure')
        self.assertFalse(is_document_type_container(shape))

    def test_is_not_document_type_container_if_not_scalar(self):
        self.shapes['String'] = dict(type='string')
        shape = self.get_shape_model('String')
        self.assertFalse(is_document_type_container(shape))

    def test_is_document_type_container_if_list_member(self):
        # List whose member is a document type.
        self.shapes['ListOfDocTypes'] = dict(
            type='list', member={'shape': 'DocType'})
        self.shapes['DocType'] = self.get_doc_type_shape_definition()
        shape = self.get_shape_model('ListOfDocTypes')
        self.assertTrue(is_document_type_container(shape))

    def test_is_document_type_container_if_map_value(self):
        # Map whose value is a document type.
        self.shapes['MapOfDocTypes'] = dict(
            type='map',
            key={'shape': 'String'},
            value={'shape': 'DocType'},
        )
        self.shapes['DocType'] = self.get_doc_type_shape_definition()
        self.shapes['String'] = dict(type='string')
        shape = self.get_shape_model('MapOfDocTypes')
        self.assertTrue(is_document_type_container(shape))

    def test_is_document_type_container_if_nested_list_member(self):
        # List of lists whose innermost member is a document type.
        self.shapes['NestedListsOfDocTypes'] = dict(
            type='list', member={'shape': 'ListOfDocTypes'})
        self.shapes['ListOfDocTypes'] = dict(
            type='list', member={'shape': 'DocType'})
        self.shapes['DocType'] = self.get_doc_type_shape_definition()
        shape = self.get_shape_model('NestedListsOfDocTypes')
        self.assertTrue(is_document_type_container(shape))
316
317
318
class TestOperationUsesDocumentTypes(BaseShapeTest):
    """Tests for ``operation_uses_document_types``."""

    def setUp(self):
        super().setUp()
        # Both structures start empty; individual tests add members to the
        # same dict objects that are registered in ``self.shapes``.
        self.input_shape_definition = dict(type='structure', members={})
        self.shapes['Input'] = self.input_shape_definition
        self.output_shape_definition = dict(type='structure', members={})
        self.shapes['Output'] = self.output_shape_definition
        self.operation_definition = dict(
            input={'shape': 'Input'},
            output={'shape': 'Output'},
        )
        service_description = {
            'operations': {'DescribeResource': self.operation_definition},
            'shapes': self.shapes,
        }
        self.service_model = botocore.model.ServiceModel(service_description)
        self.operation_model = self.service_model.operation_model(
            'DescribeResource')

    def test_operation_uses_document_types_if_doc_type_in_input(self):
        self.shapes['DocType'] = self.get_doc_type_shape_definition()
        input_members = self.input_shape_definition['members']
        input_members['DocType'] = {'shape': 'DocType'}
        self.assertTrue(operation_uses_document_types(self.operation_model))

    def test_operation_uses_document_types_if_doc_type_in_output(self):
        self.shapes['DocType'] = self.get_doc_type_shape_definition()
        output_members = self.output_shape_definition['members']
        output_members['DocType'] = {'shape': 'DocType'}
        self.assertTrue(operation_uses_document_types(self.operation_model))

    def test_operation_uses_document_types_is_false_when_no_doc_types(self):
        # Input and output are left as the empty structures from setUp.
        self.assertFalse(operation_uses_document_types(self.operation_model))
358
359
360
class TestShapeWalker(BaseShapeTest):
    """Tests for ``ShapeWalker`` using a ``ShapeRecordingVisitor``."""

    def setUp(self):
        super().setUp()
        self.walker = ShapeWalker()
        self.visitor = ShapeRecordingVisitor()

    def assert_visited_shapes(self, expected_shape_names):
        """Assert the visitor recorded shapes with exactly these names."""
        visited_names = [shape.name for shape in self.visitor.visited]
        self.assertEqual(expected_shape_names, visited_names)

    def test_walk_scalar(self):
        self.shapes['String'] = dict(type='string')
        root = self.get_shape_model('String')
        self.walker.walk(root, self.visitor)
        self.assert_visited_shapes(['String'])

    def test_walk_structure(self):
        self.shapes['Structure'] = dict(
            type='structure',
            members={
                'String1': {'shape': 'String'},
                'String2': {'shape': 'String'},
            },
        )
        self.shapes['String'] = dict(type='string')
        root = self.get_shape_model('Structure')
        self.walker.walk(root, self.visitor)
        # The structure comes first, then one entry per member.
        self.assert_visited_shapes(['Structure', 'String', 'String'])

    def test_walk_list(self):
        self.shapes['List'] = dict(type='list', member={'shape': 'String'})
        self.shapes['String'] = dict(type='string')
        root = self.get_shape_model('List')
        self.walker.walk(root, self.visitor)
        self.assert_visited_shapes(['List', 'String'])

    def test_walk_map(self):
        self.shapes['Map'] = dict(
            type='map',
            key={'shape': 'KeyString'},
            value={'shape': 'ValueString'},
        )
        self.shapes['KeyString'] = dict(type='string')
        self.shapes['ValueString'] = dict(type='string')
        root = self.get_shape_model('Map')
        self.walker.walk(root, self.visitor)
        # Only the map and its value shape are expected, not the key.
        self.assert_visited_shapes(['Map', 'ValueString'])

    def test_can_escape_recursive_shapes(self):
        self.shapes['Recursive'] = dict(
            type='structure',
            members={
                'Recursive': {'shape': 'Recursive'},
            },
        )
        root = self.get_shape_model('Recursive')
        self.walker.walk(root, self.visitor)
        # The self-referencing member must not be visited again.
        self.assert_visited_shapes(['Recursive'])
418
419
420
@pytest.mark.usefixtures('argument_model')
class TestStreamingBlob:
    """Tests for ``is_streaming_blob_type``."""

    def test_blob_is_streaming(self, argument_model):
        # A blob with the streaming serialization flag set.
        argument_model.serialization = {'streaming': True}
        argument_model.type_name = 'blob'
        assert is_streaming_blob_type(argument_model)

    def test_blob_is_not_streaming(self, argument_model):
        # A blob with no serialization traits.
        argument_model.serialization = {}
        argument_model.type_name = 'blob'
        assert not is_streaming_blob_type(argument_model)

    def test_non_blob_is_not_streaming(self, argument_model):
        argument_model.serialization = {}
        argument_model.type_name = 'string'
        assert not is_streaming_blob_type(argument_model)
436
437
438
@pytest.mark.usefixtures('argument_model')
class TestTaggedUnion:
    """Tests for ``is_tagged_union_type``."""

    def test_shape_is_tagged_union(self, argument_model):
        # Set the flag directly on the shape object.
        argument_model.is_tagged_union = True
        assert is_tagged_union_type(argument_model)

    def test_shape_is_not_tagged_union(self, argument_model):
        # The fixture shape is used as-is, without setting the flag.
        assert not is_tagged_union_type(argument_model)
446
447