Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/functional/kinesis/test_list_streams.py
1567 views
1
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import json
14
15
from awscli.testutils import BaseAWSCommandParamsTest, BaseAWSHelpOutputTest
16
17
18
class TestListStreams(BaseAWSCommandParamsTest):
19
20
prefix = ['kinesis', 'list-streams']
21
22
def test_exclusive_start_stream_name_disables_auto_pagination(self):
23
cmdline = self.prefix + ['--exclusive-start-stream-name', 'stream-1']
24
self.parsed_responses = [
25
{
26
'StreamNames': ['stream-1', 'stream-2'],
27
'StreamSummaries': [
28
{'StreamName': 'stream-1'},
29
{'StreamName': 'stream-2'},
30
],
31
'HasMoreStreams': True,
32
'NextToken': 'token',
33
}
34
]
35
expected_params = {'ExclusiveStartStreamName': 'stream-1'}
36
stdout, _, _ = self.assert_params_for_cmd(cmdline, expected_params)
37
output = json.dumps(stdout)
38
self.assertIn('NextToken', output)
39
self.assertIn('HasMoreStreams', output)
40
41
def test_exclusive_start_stream_name_incompatible_with_page_args(self):
42
cmdline = self.prefix + ['--exclusive-start-stream-name', 'stream-1']
43
cmdline += ['--page-size', '1']
44
_, stderr, _ = self.run_cmd(cmdline, expected_rc=255)
45
self.assertIn('Error during pagination: Cannot specify', stderr)
46
self.assertIn('--page-size', stderr)
47
48
49
class TestListStreamsHelp(BaseAWSHelpOutputTest):
50
def test_exclusive_start_stream_name_is_undocumented(self):
51
self.driver.main(['kinesis', 'list-streams', 'help'])
52
self.assert_not_contains('--exclusive-start-steam-name')
53
54