Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/history/test_list.py
1569 views
1
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import argparse
14
import datetime
15
16
from botocore.session import Session
17
18
from awscli.compat import BytesIO
19
from awscli.compat import ensure_text_type
20
from awscli.utils import OutputStreamFactory
21
from awscli.testutils import unittest, mock
22
from awscli.customizations.history.list import ListCommand
23
from awscli.customizations.history.list import RecordAdapter
24
from awscli.customizations.history.list import TextFormatter
25
from awscli.customizations.history.db import DatabaseRecordReader
26
27
28
class TestRecordAdapter(unittest.TestCase):
    """Checks RecordAdapter.has_next() and iteration over a record source."""

    def test_no_records_no_next(self):
        # A source with nothing in it must report that no record is pending.
        empty_adapter = RecordAdapter(iter([]))
        self.assertFalse(empty_adapter.has_next())

    def test_one_record_has_next(self):
        # A single pending record is enough for has_next() to be true.
        single_adapter = RecordAdapter(iter([1]))
        self.assertTrue(single_adapter.has_next())

    def test_can_iterate(self):
        # Iterating the adapter yields every source item, in order.
        adapter = RecordAdapter(iter([1, 2, 3]))
        self.assertEqual(list(adapter), [1, 2, 3])
44
45
46
class TestTextFormatter(unittest.TestCase):
    """Exercises TextFormatter output for single, multiple and long rows.

    NOTE(review): every column width below is at least 10, yet the expected
    rows in these tests separate fields with single spaces. If the formatter
    pads each field to its column width, the expected strings need matching
    padding -- confirm they have not lost whitespace.
    """

    _COL_WIDTHS = {
        'id_a': 10,
        'timestamp': 23,
        'args': 10,
        'rc': 10
    }

    def setUp(self):
        self.output_stream = BytesIO()
        self.formatter = TextFormatter(self._COL_WIDTHS, self.output_stream)

        # The timestamp is divided by 1000 before conversion, i.e. it is
        # treated as milliseconds. fromtimestamp() without a tz argument
        # produces local time, which the expected rows embed verbatim.
        self.timestamp = 1511376242067
        command_time = datetime.datetime.fromtimestamp(self.timestamp / 1000)
        self.formatted_time = command_time.strftime('%Y-%m-%d %I:%M:%S %p')

    def _row(self, command_id, args, rc):
        """Build one history record dict stamped with the fixture time."""
        return {
            'id_a': command_id,
            'timestamp': self.timestamp,
            'args': args,
            'rc': rc
        }

    def _format_records(self, records):
        """Run the formatter over ``records`` wrapped in a RecordAdapter."""
        self.formatter(RecordAdapter(iter(records)))

    def _written_text(self):
        """Return everything written to the output stream so far, as text."""
        return ensure_text_type(self.output_stream.getvalue())

    def test_can_emit_single_row(self):
        self._format_records([self._row('foo', '["s3", "ls"]', 0)])
        expected_output = 'foo %s s3 ls 0\n' % self.formatted_time
        self.assertEqual(expected_output, self._written_text())

    def test_can_emit_multiple_rows(self):
        self._format_records([
            self._row('foo', '["s3", "ls"]', 0),
            self._row('bar', '["s3", "cp"]', 1),
        ])
        row_template = ('foo %s s3 ls 0\n'
                        'bar %s s3 cp 1\n')
        expected_output = row_template % (
            self.formatted_time, self.formatted_time)
        self.assertEqual(expected_output, self._written_text())

    def test_can_truncate_args(self):
        # Truncate the argument if it won't fit in the space allotted to the
        # arguments field.
        oversized_args = ('["s3", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
                          'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"]')
        self._format_records([self._row('foo', oversized_args, 0)])
        expected_output = 'foo %s s3 aaa... 0\n' % self.formatted_time
        self.assertEqual(expected_output, self._written_text())
116
117
118
class TestListCommand(unittest.TestCase):
    """Tests ListCommand._run_main against mocked collaborators.

    The session, database reader and output stream factory are all mocks,
    so the tests only observe which collaborator methods get called and
    with what arguments.
    """

    def setUp(self):
        self.session = mock.Mock(Session)

        self.output_stream_factory = mock.Mock(OutputStreamFactory)

        # MagicMock is needed because it can handle context managers.
        # Normal Mock will throw AttributeErrors
        output_stream_context = mock.MagicMock()
        self.output_stream = mock.Mock()
        output_stream_context.__enter__.return_value = self.output_stream

        self.output_stream_factory.get_pager_stream.return_value = \
            output_stream_context

        # By default the reader yields no records; individual tests
        # override this when they need rows to be listed.
        self.db_reader = mock.Mock(DatabaseRecordReader)
        self.db_reader.iter_all_records.return_value = iter([])

        self.list_cmd = ListCommand(
            self.session, self.db_reader, self.output_stream_factory)

        self.parsed_args = argparse.Namespace()

        self.parsed_globals = argparse.Namespace()
        self.parsed_globals.color = 'auto'

    def _make_record(self, cid, time, args, rc):
        """Return a history record dict with the given field values."""
        record = {
            'id_a': cid,
            'timestamp': time,
            'args': args,
            'rc': rc
        }
        return record

    def _stub_single_record(self):
        """Make the mocked reader yield exactly one ``s3 ls`` record."""
        self.db_reader.iter_all_records.return_value = iter([
            self._make_record('abc', 1511376242067, '["s3", "ls"]', '0')
        ])

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    def test_does_call_iter_all_records(self, mock_is_a_tty):
        mock_is_a_tty.return_value = True
        # setUp leaves the reader empty, and the command is expected to
        # raise RuntimeError in that case.
        with self.assertRaises(RuntimeError):
            self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        # This check must sit outside the assertRaises block: a statement
        # placed after the raising call inside the block never executes.
        self.assertTrue(self.db_reader.iter_all_records.called)

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    def test_list_does_write_values_to_stream(self, mock_is_a_tty):
        mock_is_a_tty.return_value = True
        self._stub_single_record()
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        self.assertTrue(self.output_stream.write.called)

    @mock.patch('awscli.customizations.history.commands.is_windows', False)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    @mock.patch('awscli.customizations.history.list.default_pager', 'less -R')
    def test_default_pager_has_correct_args_non_windows(self, mock_is_a_tty):
        mock_is_a_tty.return_value = True
        self._stub_single_record()
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        # With default_pager patched to 'less -R', the pager stream is
        # expected to be requested with the -S flag added.
        self.output_stream_factory.get_pager_stream.assert_called_with(
            'less -SR')

    @mock.patch('awscli.customizations.history.commands.is_windows', True)
    @mock.patch('awscli.customizations.history.commands.is_a_tty')
    @mock.patch('awscli.customizations.history.list.default_pager', 'more')
    def test_default_pager_has_correct_args_windows(self, mock_is_a_tty):
        mock_is_a_tty.return_value = True
        self._stub_single_record()
        self.list_cmd._run_main(self.parsed_args, self.parsed_globals)
        # With is_windows patched True, the 'more' pager is expected to be
        # passed through unchanged.
        self.output_stream_factory.get_pager_stream.assert_called_with(
            'more')
194
195