import signal
import platform
import pytest
import subprocess
import os
import botocore.model
from awscli.testutils import unittest, skip_if_windows, mock
from awscli.utils import (
split_on_commas, ignore_ctrl_c, find_service_and_method_in_event_name,
is_document_type, is_document_type_container, is_streaming_blob_type,
is_tagged_union_type, operation_uses_document_types, ShapeWalker,
ShapeRecordingVisitor, OutputStreamFactory
)
@pytest.fixture()
def argument_model():
return botocore.model.Shape('argument', {'type': 'string'})
class TestCSVSplit(unittest.TestCase):
def test_normal_csv_split(self):
self.assertEqual(split_on_commas('foo,bar,baz'),
['foo', 'bar', 'baz'])
def test_quote_split(self):
self.assertEqual(split_on_commas('foo,"bar",baz'),
['foo', 'bar', 'baz'])
def test_inner_quote_split(self):
self.assertEqual(split_on_commas('foo,bar="1,2,3",baz'),
['foo', 'bar=1,2,3', 'baz'])
def test_single_quote(self):
self.assertEqual(split_on_commas("foo,bar='1,2,3',baz"),
['foo', 'bar=1,2,3', 'baz'])
def test_mixing_double_single_quotes(self):
self.assertEqual(split_on_commas("""foo,bar="1,'2',3",baz"""),
['foo', "bar=1,'2',3", 'baz'])
def test_mixing_double_single_quotes_before_first_comma(self):
self.assertEqual(split_on_commas("""foo,bar="1','2',3",baz"""),
['foo', "bar=1','2',3", 'baz'])
def test_inner_quote_split_with_equals(self):
self.assertEqual(split_on_commas('foo,bar="Foo:80/bar?a=b",baz'),
['foo', 'bar=Foo:80/bar?a=b', 'baz'])
def test_single_quoted_inner_value_with_no_commas(self):
self.assertEqual(split_on_commas("foo,bar='BAR',baz"),
['foo', 'bar=BAR', 'baz'])
def test_escape_quotes(self):
self.assertEqual(split_on_commas('foo,bar=1\,2\,3,baz'),
['foo', 'bar=1,2,3', 'baz'])
def test_no_commas(self):
self.assertEqual(split_on_commas('foo'), ['foo'])
def test_trailing_commas(self):
self.assertEqual(split_on_commas('foo,'), ['foo', ''])
def test_escape_backslash(self):
self.assertEqual(split_on_commas('foo,bar\\\\,baz\\\\,qux'),
['foo', 'bar\\', 'baz\\', 'qux'])
def test_square_brackets(self):
self.assertEqual(split_on_commas('foo,bar=["a=b",\'2\',c=d],baz'),
['foo', 'bar=a=b,2,c=d', 'baz'])
def test_quoted_square_brackets(self):
self.assertEqual(split_on_commas('foo,bar="[blah]",c=d],baz'),
['foo', 'bar=[blah]', 'c=d]', 'baz'])
def test_missing_bracket(self):
self.assertEqual(split_on_commas('foo,bar=[a,baz'),
['foo', 'bar=[a', 'baz'])
def test_missing_bracket2(self):
self.assertEqual(split_on_commas('foo,bar=a],baz'),
['foo', 'bar=a]', 'baz'])
def test_bracket_in_middle(self):
self.assertEqual(split_on_commas('foo,bar=a[b][c],baz'),
['foo', 'bar=a[b][c]', 'baz'])
def test_end_bracket_in_value(self):
self.assertEqual(split_on_commas('foo,bar=[foo,*[biz]*,baz]'),
['foo', 'bar=foo,*[biz]*,baz'])
@skip_if_windows("Ctrl-C not supported on windows.")
class TestIgnoreCtrlC(unittest.TestCase):
def test_ctrl_c_is_ignored(self):
with ignore_ctrl_c():
self.assertEqual(signal.getsignal(signal.SIGINT), signal.SIG_IGN)
os.kill(os.getpid(), signal.SIGINT)
class TestFindServiceAndOperationNameFromEvent(unittest.TestCase):
def test_finds_service_and_operation_name(self):
event_name = "foo.bar.baz"
service, operation = find_service_and_method_in_event_name(event_name)
self.assertEqual(service, "bar")
self.assertEqual(operation, "baz")
def test_returns_none_if_event_is_too_short(self):
event_name = "foo.bar"
service, operation = find_service_and_method_in_event_name(event_name)
self.assertEqual(service, "bar")
self.assertIs(operation, None)
event_name = "foo"
service, operation = find_service_and_method_in_event_name(event_name)
self.assertIs(service, None)
self.assertIs(operation, None)
class MockProcess(object):
@property
def stdin(self):
raise IOError('broken pipe')
def communicate(self):
pass
class TestOutputStreamFactory(unittest.TestCase):
def setUp(self):
self.popen = mock.Mock(subprocess.Popen)
self.stream_factory = OutputStreamFactory(self.popen)
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
def test_pager(self, mock_get_popen_pager):
mock_get_popen_pager.return_value = {
'args': ['mypager', '--option']
}
with self.stream_factory.get_pager_stream():
mock_get_popen_pager.assert_called_with(None)
self.assertEqual(
self.popen.call_args_list,
[mock.call(
args=['mypager', '--option'],
stdin=subprocess.PIPE)]
)
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
def test_env_configured_pager(self, mock_get_popen_pager):
mock_get_popen_pager.return_value = {
'args': ['mypager', '--option']
}
with self.stream_factory.get_pager_stream('mypager --option'):
mock_get_popen_pager.assert_called_with('mypager --option')
self.assertEqual(
self.popen.call_args_list,
[mock.call(
args=['mypager', '--option'],
stdin=subprocess.PIPE)]
)
@mock.patch('awscli.utils.get_popen_kwargs_for_pager_cmd')
def test_pager_using_shell(self, mock_get_popen_pager):
mock_get_popen_pager.return_value = {
'args': 'mypager --option', 'shell': True
}
with self.stream_factory.get_pager_stream():
mock_get_popen_pager.assert_called_with(None)
self.assertEqual(
self.popen.call_args_list,
[mock.call(
args='mypager --option',
stdin=subprocess.PIPE,
shell=True)]
)
def test_exit_of_context_manager_for_pager(self):
with self.stream_factory.get_pager_stream():
pass
returned_process = self.popen.return_value
self.assertTrue(returned_process.communicate.called)
@mock.patch('awscli.utils.get_binary_stdout')
def test_stdout(self, mock_binary_out):
with self.stream_factory.get_stdout_stream():
self.assertTrue(mock_binary_out.called)
def test_can_silence_io_error_from_pager(self):
self.popen.return_value = MockProcess()
try:
with self.assertRaises(RuntimeError):
with self.stream_factory.get_pager_stream():
pass
except IOError:
self.fail('Should not raise IOError')
class BaseShapeTest(unittest.TestCase):
def setUp(self):
self.shapes = {}
def get_shape_model(self, shape_name):
shape_model = self.shapes[shape_name]
resolver = botocore.model.ShapeResolver(self.shapes)
shape_cls = resolver.SHAPE_CLASSES.get(
shape_model['type'], botocore.model.Shape
)
return shape_cls(shape_name, shape_model, resolver)
def get_doc_type_shape_definition(self):
return {
'type': 'structure',
'members': {},
'document': True
}
class TestIsDocumentType(BaseShapeTest):
def test_is_document_type(self):
self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
self.assertTrue(is_document_type(self.get_shape_model('DocStructure')))
def test_is_not_document_type_if_missing_document_trait(self):
self.shapes['NonDocStructure'] = {
'type': 'structure',
'members': {},
}
self.assertFalse(
is_document_type(self.get_shape_model('NonDocStructure'))
)
def test_is_not_document_type_if_not_structure(self):
self.shapes['String'] = {'type': 'string'}
self.assertFalse(is_document_type(self.get_shape_model('String')))
class TestIsDocumentTypeContainer(BaseShapeTest):
def test_is_document_type_container_for_doc_type(self):
self.shapes['DocStructure'] = self.get_doc_type_shape_definition()
self.assertTrue(
is_document_type_container(self.get_shape_model('DocStructure'))
)
def test_is_not_document_type_container_if_missing_document_trait(self):
self.shapes['NonDocStructure'] = {
'type': 'structure',
'members': {},
}
self.assertFalse(
is_document_type_container(self.get_shape_model('NonDocStructure'))
)
def test_is_not_document_type_container_if_not_scalar(self):
self.shapes['String'] = {'type': 'string'}
self.assertFalse(
is_document_type_container(self.get_shape_model('String')))
def test_is_document_type_container_if_list_member(self):
self.shapes['ListOfDocTypes'] = {
'type': 'list',
'member': {'shape': 'DocType'}
}
self.shapes['DocType'] = self.get_doc_type_shape_definition()
self.assertTrue(
is_document_type_container(self.get_shape_model('ListOfDocTypes'))
)
def test_is_document_type_container_if_map_value(self):
self.shapes['MapOfDocTypes'] = {
'type': 'map',
'key': {'shape': 'String'},
'value': {'shape': 'DocType'}
}
self.shapes['DocType'] = self.get_doc_type_shape_definition()
self.shapes['String'] = {'type': 'string'}
self.assertTrue(
is_document_type_container(self.get_shape_model('MapOfDocTypes'))
)
def test_is_document_type_container_if_nested_list_member(self):
self.shapes['NestedListsOfDocTypes'] = {
'type': 'list',
'member': {'shape': 'ListOfDocTypes'}
}
self.shapes['ListOfDocTypes'] = {
'type': 'list',
'member': {'shape': 'DocType'}
}
self.shapes['DocType'] = self.get_doc_type_shape_definition()
self.assertTrue(
is_document_type_container(
self.get_shape_model('NestedListsOfDocTypes')
)
)
class TestOperationUsesDocumentTypes(BaseShapeTest):
def setUp(self):
super(TestOperationUsesDocumentTypes, self).setUp()
self.input_shape_definition = {
'type': 'structure',
'members': {}
}
self.shapes['Input'] = self.input_shape_definition
self.output_shape_definition = {
'type': 'structure',
'members': {}
}
self.shapes['Output'] = self.output_shape_definition
self.operation_definition = {
'input': {'shape': 'Input'},
'output': {'shape': 'Output'}
}
self.service_model = botocore.model.ServiceModel(
{
'operations': {'DescribeResource': self.operation_definition},
'shapes': self.shapes
}
)
self.operation_model = self.service_model.operation_model(
'DescribeResource')
def test_operation_uses_document_types_if_doc_type_in_input(self):
self.shapes['DocType'] = self.get_doc_type_shape_definition()
self.input_shape_definition['members']['DocType'] = {
'shape': 'DocType'}
self.assertTrue(operation_uses_document_types(self.operation_model))
def test_operation_uses_document_types_if_doc_type_in_output(self):
self.shapes['DocType'] = self.get_doc_type_shape_definition()
self.output_shape_definition['members']['DocType'] = {
'shape': 'DocType'}
self.assertTrue(operation_uses_document_types(self.operation_model))
def test_operation_uses_document_types_is_false_when_no_doc_types(self):
self.assertFalse(operation_uses_document_types(self.operation_model))
class TestShapeWalker(BaseShapeTest):
def setUp(self):
super(TestShapeWalker, self).setUp()
self.walker = ShapeWalker()
self.visitor = ShapeRecordingVisitor()
def assert_visited_shapes(self, expected_shape_names):
self.assertEqual(
expected_shape_names,
[shape.name for shape in self.visitor.visited]
)
def test_walk_scalar(self):
self.shapes['String'] = {'type': 'string'}
self.walker.walk(self.get_shape_model('String'), self.visitor)
self.assert_visited_shapes(['String'])
def test_walk_structure(self):
self.shapes['Structure'] = {
'type': 'structure',
'members': {
'String1': {'shape': 'String'},
'String2': {'shape': 'String'}
}
}
self.shapes['String'] = {'type': 'string'}
self.walker.walk(self.get_shape_model('Structure'), self.visitor)
self.assert_visited_shapes(['Structure', 'String', 'String'])
def test_walk_list(self):
self.shapes['List'] = {
'type': 'list',
'member': {'shape': 'String'}
}
self.shapes['String'] = {'type': 'string'}
self.walker.walk(self.get_shape_model('List'), self.visitor)
self.assert_visited_shapes(['List', 'String'])
def test_walk_map(self):
self.shapes['Map'] = {
'type': 'map',
'key': {'shape': 'KeyString'},
'value': {'shape': 'ValueString'}
}
self.shapes['KeyString'] = {'type': 'string'}
self.shapes['ValueString'] = {'type': 'string'}
self.walker.walk(self.get_shape_model('Map'), self.visitor)
self.assert_visited_shapes(['Map', 'ValueString'])
def test_can_escape_recursive_shapes(self):
self.shapes['Recursive'] = {
'type': 'structure',
'members': {
'Recursive': {'shape': 'Recursive'},
}
}
self.walker.walk(self.get_shape_model('Recursive'), self.visitor)
self.assert_visited_shapes(['Recursive'])
@pytest.mark.usefixtures('argument_model')
class TestStreamingBlob:
def test_blob_is_streaming(self, argument_model):
argument_model.type_name = 'blob'
argument_model.serialization = {'streaming': True}
assert is_streaming_blob_type(argument_model)
def test_blob_is_not_streaming(self, argument_model):
argument_model.type_name = 'blob'
argument_model.serialization = {}
assert not is_streaming_blob_type(argument_model)
def test_non_blob_is_not_streaming(self, argument_model):
argument_model.type_name = 'string'
argument_model.serialization = {}
assert not is_streaming_blob_type(argument_model)
@pytest.mark.usefixtures('argument_model')
class TestTaggedUnion:
def test_shape_is_tagged_union(self, argument_model):
setattr(argument_model, 'is_tagged_union', True)
assert is_tagged_union_type(argument_model)
def test_shape_is_not_tagged_union(self, argument_model):
assert not is_tagged_union_type(argument_model)