# Path: blob/develop/tests/unit/customizations/history/test_list.py
# 1569 views
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import argparse
import datetime

from botocore.session import Session

from awscli.compat import BytesIO
from awscli.compat import ensure_text_type
from awscli.utils import OutputStreamFactory
from awscli.testutils import unittest, mock
from awscli.customizations.history.list import ListCommand
from awscli.customizations.history.list import RecordAdapter
from awscli.customizations.history.list import TextFormatter
from awscli.customizations.history.db import DatabaseRecordReader


class TestRecordAdapter(unittest.TestCase):
    """Checks ``has_next()`` and iteration on ``RecordAdapter``."""

    def test_no_records_no_next(self):
        """An adapter over an empty iterator reports no next record."""
        adapter = RecordAdapter(iter([]))
        self.assertFalse(adapter.has_next())

    def test_one_record_has_next(self):
        """An adapter over a one-item iterator reports a next record."""
        adapter = RecordAdapter(iter([1]))
        self.assertTrue(adapter.has_next())

    def test_can_iterate(self):
        """Iterating the adapter yields the source items in order."""
        adapter = RecordAdapter(iter([1, 2, 3]))
        self.assertEqual(list(adapter), [1, 2, 3])


class TestTextFormatter(unittest.TestCase):
    """Checks the rows ``TextFormatter`` writes to its output stream."""

    # Column widths handed to the formatter under test.
    _COL_WIDTHS = {
        'id_a': 10,
        'timestamp': 23,
        'args': 10,
        'rc': 10
    }
    # NOTE(review): the expected-output literals in the tests below are
    # kept exactly as found. If the formatter pads each column to the
    # widths above, runs of spaces inside those literals may have been
    # collapsed before this file reached review -- confirm against
    # TextFormatter before relying on them.

    def setUp(self):
        self.output_stream = BytesIO()
        self.formatter = TextFormatter(self._COL_WIDTHS, self.output_stream)

        # The record timestamp is divided by 1000 before being handed to
        # fromtimestamp(); the resulting string is what the tests expect
        # to find in the timestamp column.
        self.timestamp = 1511376242067
        command_time = datetime.datetime.fromtimestamp(self.timestamp / 1000)
        self.formatted_time = command_time.strftime('%Y-%m-%d %I:%M:%S %p')

    def _format_records(self, records):
        """Wrap ``records`` in a ``RecordAdapter`` and run the formatter."""
        adapter = RecordAdapter(iter(records))
        self.formatter(adapter)

    def test_can_emit_single_row(self):
        """One record produces exactly one output line."""
        self._format_records([
            {
                'id_a': 'foo',
                'timestamp': self.timestamp,
                'args': '["s3", "ls"]',
                'rc': 0
            }
        ])
        expected_output = 'foo %s s3 ls 0\n' % self.formatted_time
        actual_output = ensure_text_type(self.output_stream.getvalue())
        self.assertEqual(expected_output, actual_output)

    def test_can_emit_multiple_rows(self):
        """Two records produce two output lines, in input order."""
        self._format_records([
            {
                'id_a': 'foo',
                'timestamp': self.timestamp,
                'args': '["s3", "ls"]',
                'rc': 0
            },
            {
                'id_a': 'bar',
                'timestamp': self.timestamp,
                'args': '["s3", "cp"]',
                'rc': 1
            }
        ])
        expected_output = ('foo %s s3 ls 0\n'
                           'bar %s s3 cp 1\n') % (
            self.formatted_time, self.formatted_time)
        actual_output = ensure_text_type(self.output_stream.getvalue())
        self.assertEqual(expected_output, actual_output)

    def test_can_truncate_args(self):
        """Over-long arguments are cut short and suffixed with ``...``."""
        # Truncate the argument if it won't fit in the space allotted to
        # the arguments field.
        self._format_records([
            {
                'id_a': 'foo',
                'timestamp': self.timestamp,
                'args': ('["s3", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
                         'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"]'),
                'rc': 0
            }
        ])
        expected_output = 'foo %s s3 aaa... 0\n' % self.formatted_time
        actual_output = ensure_text_type(self.output_stream.getvalue())
        self.assertEqual(expected_output, actual_output)


class TestListCommand(unittest.TestCase):
    """Drives ``ListCommand._run_main`` against mocked collaborators."""

    def setUp(self):
        self.session = mock.Mock(Session)

        self.output_stream_factory = mock.Mock(OutputStreamFactory)

        # MagicMock is needed because it can handle context managers.
        # Normal Mock will throw AttributeErrors
        output_stream_context = mock.MagicMock()
        self.output_stream = mock.Mock()
        output_stream_context.__enter__.return_value = self.output_stream

        self.output_stream_factory.get_pager_stream.return_value = (
            output_stream_context)

        # By default the reader yields no records; individual tests
        # override this when they need rows.
        self.db_reader = mock.Mock(DatabaseRecordReader)
        self.db_reader.iter_all_records.return_value = iter([])

        self.list_cmd = ListCommand(
            self.session, self.db_reader, self.output_stream_factory)

        self.parsed_args = argparse.Namespace()

        self.parsed_globals = argparse.Namespace()
        self.parsed_globals.color = 'auto'

    def _make_record(self, cid, time, args, rc):
        """Build a record dict using the keys the tests feed to the reader."""
        return {
            'id_a': cid,
            'timestamp': time,
            'args': args,
            'rc': rc
        }

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    def test_does_call_iter_all_records(self, mock_is_a_tty):
        """The reader is queried even when the run ends in RuntimeError."""
        mock_is_a_tty.return_value = True
        with self.assertRaises(RuntimeError):
            self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        # Checked after the context manager exits: a statement placed
        # after the raising call inside the ``with`` body would never run.
        self.assertTrue(self.db_reader.iter_all_records.called)

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    def test_list_does_write_values_to_stream(self, mock_is_a_tty):
        """With one record available, something is written to the stream."""
        mock_is_a_tty.return_value = True
        self.db_reader.iter_all_records.return_value = iter([
            self._make_record('abc', 1511376242067, '["s3", "ls"]', '0')
        ])
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        self.assertTrue(self.output_stream.write.called)

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    @mock.patch('awscli.customizations.history.list.default_pager', 'less -R')
    def test_default_pager_has_correct_args_non_windows(self, mock_is_a_tty):
        """Non-Windows: the pager stream is requested with ``less -SR``."""
        mock_is_a_tty.return_value = True
        self.db_reader.iter_all_records.return_value = iter([
            self._make_record('abc', 1511376242067, '["s3", "ls"]', '0')
        ])
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        self.output_stream_factory.get_pager_stream.assert_called_with(
            'less -SR')

    @mock.patch('awscli.customizations.history.commands.is_windows', True)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    @mock.patch('awscli.customizations.history.list.default_pager', 'more')
    def test_default_pager_has_correct_args_windows(self, mock_is_a_tty):
        """Windows: the pager stream is requested with plain ``more``."""
        mock_is_a_tty.return_value = True
        self.db_reader.iter_all_records.return_value = iter([
            self._make_record('abc', 1511376242067, '["s3", "ls"]', '0')
        ])
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        self.output_stream_factory.get_pager_stream.assert_called_with(
            'more')