Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/integration/test_cli.py
1566 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import time
14
import signal
15
import os
16
import tempfile
17
import random
18
import shutil
19
20
import botocore.session
21
from awscli.testutils import unittest, aws, BaseS3CLICommand
22
from awscli.testutils import temporary_file
23
from awscli.testutils import skip_if_windows
24
from awscli.clidriver import create_clidriver
25
26
27
class TestBasicCommandFunctionality(unittest.TestCase):
    """
    These are a set of tests that assert high level features of
    the CLI. They don't test anything exhaustively and they're meant as a smoke
    test to verify basic CLI functionality isn't entirely broken.
    """

    def put_object(self, bucket, key, content, extra_args=None):
        # Create ``bucket``, upload ``content`` under ``key``, and register
        # cleanups so both the object and the bucket are deleted when the
        # test finishes (cleanups run in reverse order: object, then bucket).
        session = botocore.session.get_session()
        client = session.create_client('s3', 'us-east-1')
        client.create_bucket(Bucket=bucket, ObjectOwnership='ObjectWriter')
        # Pause after bucket creation -- presumably to let the new bucket
        # propagate before we modify its public-access settings; TODO confirm
        # whether the fixed 5 second wait is still required.
        time.sleep(5)
        client.delete_public_access_block(Bucket=bucket)
        self.addCleanup(client.delete_bucket, Bucket=bucket)
        call_args = {
            'Bucket': bucket,
            'Key': key, 'Body': content
        }
        if extra_args is not None:
            call_args.update(extra_args)
        client.put_object(**call_args)
        self.addCleanup(client.delete_object, Bucket=bucket, Key=key)

    def test_ec2_describe_instances(self):
        # Verify we can make a call and get output.
        p = aws('ec2 describe-instances')
        self.assertEqual(p.rc, 0)
        # We don't know what instances a user might have, but we know
        # there should at least be a Reservations key.
        self.assertIn('Reservations', p.json)

    def test_help_output(self):
        p = aws('help')
        self.assertEqual(p.rc, 0)
        self.assertIn('AWS', p.stdout)
        # Rendered help can vary in whitespace, so match loosely.
        self.assertRegex(
            p.stdout, r'The\s+AWS\s+Command\s+Line\s+Interface')

    def test_service_help_output(self):
        p = aws('ec2 help')
        self.assertEqual(p.rc, 0)
        self.assertRegex(p.stdout, r'Amazon\s+EC2')

    def test_operation_help_output(self):
        p = aws('ec2 describe-instances help')
        self.assertEqual(p.rc, 0)
        # XXX: This is a rendering bug that needs to be fixed in bcdoc. In
        # the RST version there are multiple spaces between certain words.
        # For now we're making the test less strict about formatting, but
        # we eventually should update this test to check exactly for
        # 'The describe-instances operation'.
        self.assertRegex(p.stdout, r'\s+Describes\s+the\s+specified\s+instances')

    def test_topic_list_help_output(self):
        p = aws('help topics')
        self.assertEqual(p.rc, 0)
        self.assertRegex(p.stdout, r'\s+AWS\s+CLI\s+Topic\s+Guide')
        self.assertRegex(
            p.stdout,
            r'\s+This\s+is\s+the\s+AWS\s+CLI\s+Topic\s+Guide'
        )

    def test_topic_help_output(self):
        p = aws('help return-codes')
        self.assertEqual(p.rc, 0)
        self.assertRegex(p.stdout, r'\s+AWS\s+CLI\s+Return\s+Codes')
        self.assertRegex(
            p.stdout,
            r'These\s+are\s+the\s+following\s+return\s+codes'
        )

    def test_operation_help_with_required_arg(self):
        p = aws('s3api get-object help')
        self.assertEqual(p.rc, 0, p.stderr)
        self.assertIn('get-object', p.stdout)

    def test_service_help_with_required_option(self):
        # In cloudsearchdomain, the --endpoint-url is required.
        # We want to make sure if you're just getting help text
        # that we don't trigger that validation.
        p = aws('cloudsearchdomain help')
        self.assertEqual(p.rc, 0, p.stderr)
        self.assertIn('cloudsearchdomain', p.stdout)
        # And nothing on stderr about missing options.
        self.assertEqual(p.stderr, '')

    def test_operation_help_with_required_option(self):
        p = aws('cloudsearchdomain search help')
        self.assertEqual(p.rc, 0, p.stderr)
        self.assertIn('search', p.stdout)
        # And nothing on stderr about missing options.
        self.assertEqual(p.stderr, '')

    def test_help_with_warning_blocks(self):
        p = aws('elastictranscoder create-pipeline help')
        self.assertEqual(p.rc, 0, p.stderr)
        # Check text that appears in the warning block to ensure
        # the block was actually rendered.
        self.assertRegex(p.stdout, r'To\s+receive\s+notifications')

    def test_param_shorthand(self):
        p = aws(
            'ec2 describe-instances --filters Name=instance-id,Values=i-123')
        self.assertEqual(p.rc, 0)
        self.assertIn('Reservations', p.json)

    def test_param_json(self):
        p = aws(
            'ec2 describe-instances --filters '
            '\'{"Name": "instance-id", "Values": ["i-123"]}\'')
        self.assertEqual(p.rc, 0, p.stdout + p.stderr)
        self.assertIn('Reservations', p.json)

    def test_param_with_bad_json(self):
        # A syntactically valid but semantically invalid filter should be
        # rejected by the service with a non-zero return code.
        p = aws(
            'ec2 describe-instances --filters '
            '\'{"Name": "bad-filter", "Values": ["i-123"]}\'')
        self.assertEqual(p.rc, 255)
        self.assertIn("The filter 'bad-filter' is invalid", p.stderr,
                      "stdout: %s, stderr: %s" % (p.stdout, p.stderr))

    def test_param_with_file(self):
        # Verify the file:// prefix loads parameter values from disk.
        d = tempfile.mkdtemp()
        self.addCleanup(os.rmdir, d)
        param_file = os.path.abspath(os.path.join(d, 'params.json'))
        with open(param_file, 'w') as f:
            f.write('[{"Name": "instance-id", "Values": ["i-123"]}]')
        self.addCleanup(os.remove, param_file)
        p = aws('ec2 describe-instances --filters file://%s' % param_file)
        self.assertEqual(p.rc, 0)
        self.assertIn('Reservations', p.json)

    def test_streaming_output_operation(self):
        # get-object streams the body to a file argument instead of stdout.
        d = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, d)
        bucket_name = 'clistream' + str(
            int(time.time())) + str(random.randint(1, 100))

        self.put_object(bucket=bucket_name, key='foobar',
                        content='foobar contents')
        p = aws('s3api get-object --bucket %s --key foobar %s' % (
            bucket_name, os.path.join(d, 'foobar')))
        self.assertEqual(p.rc, 0)
        with open(os.path.join(d, 'foobar')) as f:
            contents = f.read()
        self.assertEqual(contents, 'foobar contents')

    def test_no_sign_request(self):
        d = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, d)

        # Deliberately bogus credentials: a signed request must fail,
        # proving the later success really came from --no-sign-request.
        env_vars = os.environ.copy()
        env_vars['AWS_ACCESS_KEY_ID'] = 'foo'
        env_vars['AWS_SECRET_ACCESS_KEY'] = 'bar'

        bucket_name = 'nosign' + str(
            int(time.time())) + str(random.randint(1, 100))
        self.put_object(bucket_name, 'foo', content='bar',
                        extra_args={'ACL': 'public-read-write'})

        p = aws('s3api get-object --bucket %s --key foo %s' % (
            bucket_name, os.path.join(d, 'foo')), env_vars=env_vars)
        # Should have credential issues.
        self.assertEqual(p.rc, 255)

        p = aws('s3api get-object --bucket %s --key foo '
                '%s --no-sign-request' % (bucket_name, os.path.join(d, 'foo')),
                env_vars=env_vars)

        # Should be able to download the file when not signing.
        self.assertEqual(p.rc, 0)

        with open(os.path.join(d, 'foo')) as f:
            contents = f.read()
        self.assertEqual(contents, 'bar')

    def test_no_paginate_arg(self):
        d = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, d)
        bucket_name = 'nopaginate' + str(
            int(time.time())) + str(random.randint(1, 100))

        self.put_object(bucket=bucket_name, key='foobar',
                        content='foobar contents')
        p = aws('s3api list-objects --bucket %s --no-paginate' % bucket_name)
        self.assertEqual(p.rc, 0, p.stdout + p.stderr)

        p = aws('s3api list-objects --bucket %s' % bucket_name)
        self.assertEqual(p.rc, 0, p.stdout + p.stderr)

    def test_top_level_options_debug(self):
        p = aws('ec2 describe-instances --debug')
        self.assertEqual(p.rc, 0)
        self.assertIn('DEBUG', p.stderr)

    def test_make_requests_to_other_region(self):
        p = aws('ec2 describe-instances --region us-west-2')
        self.assertEqual(p.rc, 0)
        self.assertIn('Reservations', p.json)

    def test_help_usage_top_level(self):
        # Running with no command at all should print usage and an error.
        p = aws('')
        self.assertIn('usage: aws [options] <command> '
                      '<subcommand> [<subcommand> ...] [parameters]', p.stderr)
        self.assertIn('aws: error', p.stderr)

    def test_help_usage_service_level(self):
        p = aws('ec2')
        self.assertIn('usage: aws [options] <command> '
                      '<subcommand> [<subcommand> ...] [parameters]', p.stderr)
        # python3: aws: error: the following arguments are required: operation
        # python2: aws: error: too few arguments
        # We don't care too much about the specific error message, as long
        # as it says we have a parse error.
        self.assertIn('aws: error', p.stderr)

    def test_help_usage_operation_level(self):
        p = aws('ec2 start-instances')
        self.assertIn('usage: aws [options] <command> '
                      '<subcommand> [<subcommand> ...] [parameters]', p.stderr)

    def test_unknown_argument(self):
        p = aws('ec2 describe-instances --filterss')
        self.assertEqual(p.rc, 255)
        self.assertIn('Unknown options: --filterss', p.stderr)

    def test_table_output(self):
        p = aws('ec2 describe-instances --output table --color off')
        # We're not testing the specifics of table output, we just want
        # to make sure the output looks like a table using some heuristics.
        # If this prints JSON instead of a table, for example, this test
        # should fail.
        self.assertEqual(p.rc, 0, p.stderr)
        self.assertIn('-----', p.stdout)
        self.assertIn('+-', p.stdout)
        self.assertIn('DescribeInstances', p.stdout)

    def test_version(self):
        p = aws('--version')
        self.assertEqual(p.rc, 0)
        # The version is written to standard out for Python 3.4 and
        # standard error for other Python versions.
        version_output = p.stderr.startswith('aws-cli') or \
            p.stdout.startswith('aws-cli')
        self.assertTrue(version_output, p.stderr)

    def test_traceback_printed_when_debug_on(self):
        p = aws('ec2 describe-instances --filters BADKEY=foo --debug')
        self.assertIn('Traceback (most recent call last):', p.stderr, p.stderr)
        # Also should see DEBUG statements:
        self.assertIn('DEBUG', p.stderr, p.stderr)

    def test_leftover_args_in_operation(self):
        p = aws('ec2 describe-instances BADKEY=foo')
        self.assertEqual(p.rc, 255)
        self.assertIn("Unknown option", p.stderr, p.stderr)

    def test_json_param_parsing(self):
        # This is covered by unit tests in botocore, but this is a sanity
        # check that we get a json response from a json service.
        p = aws('swf list-domains --registration-status REGISTERED')
        self.assertEqual(p.rc, 0)
        self.assertIsInstance(p.json, dict)

        p = aws('dynamodb list-tables')
        self.assertEqual(p.rc, 0)
        self.assertIsInstance(p.json, dict)

    def test_pagination_with_text_output(self):
        p = aws('iam list-users --output text')
        self.assertEqual(p.rc, 0)

    def test_bad_lc_ctype_env_var_is_handled(self):
        # Test for bad LC_CTYPE on Mac OS X.
        base_env_vars = os.environ.copy()
        base_env_vars['LC_CTYPE'] = 'UTF-8'
        p = aws('iam list-users', env_vars=base_env_vars)
        self.assertEqual(p.rc, 0)

    def test_error_msg_with_no_region_configured(self):
        # With no region from env vars and a config file that doesn't
        # exist, the CLI should emit a clear error message.
        environ = os.environ.copy()
        try:
            del environ['AWS_DEFAULT_REGION']
        except KeyError:
            pass
        environ['AWS_CONFIG_FILE'] = 'nowhere-foo'
        p = aws('ec2 describe-instances', env_vars=environ)
        self.assertIn('must specify a region', p.stderr)

    @skip_if_windows('Ctrl-C not supported on windows.')
    def test_ctrl_c_does_not_print_traceback(self):
        # Relying on the fact that this generally takes
        # more than 1 second to complete.
        process = aws('ec2 describe-images', wait_for_finish=False)
        time.sleep(1)
        process.send_signal(signal.SIGINT)
        stdout, stderr = process.communicate()
        self.assertNotIn(b'Traceback', stdout)
        self.assertNotIn(b'Traceback', stderr)
326
327
328
class TestCommandLineage(unittest.TestCase):
329
def setUp(self):
330
self.driver = create_clidriver()
331
self.top_help = self.driver.create_help_command()
332
333
def assert_lineage_names(self, ref_lineage_names):
334
command_table = self.top_help.command_table
335
for i, cmd_name in enumerate(ref_lineage_names):
336
command = command_table[cmd_name]
337
help_command = command.create_help_command()
338
command_table = help_command.command_table
339
340
actual_lineage_names = []
341
for cmd in command.lineage:
342
actual_lineage_names.append(cmd.name)
343
344
# Assert the actual names of each command in a lineage is as expected.
345
self.assertEqual(actual_lineage_names, ref_lineage_names)
346
347
# Assert that ``lineage_names`` for each command is in sync with what
348
# is actually in the command's ``lineage``.
349
self.assertEqual(command.lineage_names, actual_lineage_names)
350
351
def test_service_level_commands(self):
352
# Check a normal unchanged service command
353
self.assert_lineage_names(['ec2'])
354
355
# Check a service that had its name changed.
356
self.assert_lineage_names(['s3api'])
357
358
# Check a couple custom service level commands.
359
self.assert_lineage_names(['s3'])
360
self.assert_lineage_names(['configure'])
361
362
def test_operation_level_commands(self):
363
# Check a normal unchanged service and operation command
364
self.assert_lineage_names(['dynamodb', 'create-table'])
365
366
# Check an operation commands with a service that had its name changed.
367
self.assert_lineage_names(['s3api', 'list-objects'])
368
369
# Check a custom operation level command with no
370
# custom service command.
371
self.assert_lineage_names(['emr', 'create-cluster'])
372
373
# Check a couple of operation level commands that
374
# are based off a custom service command
375
self.assert_lineage_names(['configure', 'set'])
376
self.assert_lineage_names(['s3', 'cp'])
377
378
def test_wait_commands(self):
379
self.assert_lineage_names(['ec2', 'wait'])
380
self.assert_lineage_names(['ec2', 'wait', 'instance-running'])
381
382
383
# We're using BaseS3CLICommand because we need a service to use
# for testing the global arguments. We're picking S3 here because
# the BaseS3CLICommand has a lot of utility functions that help
# with this.
class TestGlobalArgs(BaseS3CLICommand):
    """Smoke tests for global CLI arguments (--endpoint-url, --no-paginate,
    --max-items, --query, --no-sign-request, --profile).

    NOTE: ``create_bucket``/``put_object``/``assert_no_errors`` here come
    from ``BaseS3CLICommand`` (defined in awscli.testutils), not from this
    module.
    """

    def test_endpoint_url(self):
        # We only care that the request was routed to the override host,
        # so inspect the --debug logs rather than the (failing) response.
        p = aws('s3api list-objects --bucket dnscompat '
                '--endpoint-url http://localhost:51515 '
                '--debug')
        debug_logs = p.stderr
        original_hostname = 'dnscompat.s3.amazonaws.com'
        expected = 'localhost'
        self.assertNotIn(original_hostname, debug_logs,
                         '--endpoint-url is being ignored.')
        self.assertIn(expected, debug_logs)

    def test_no_pagination(self):
        bucket_name = self.create_bucket()
        self.put_object(bucket_name, 'foo.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo2.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo3.txt', contents=b'bar')
        p = aws('s3api list-objects --bucket %s '
                '--no-paginate --output json' % bucket_name)
        # A really simple way to check that --no-paginate was
        # honored is to see if we have all the mirrored input
        # arguments in the response json. These normally aren't
        # present when the response is paginated.
        self.assert_no_errors(p)
        response_json = p.json
        self.assertIn('IsTruncated', response_json)
        self.assertIn('Name', response_json)

    def test_no_paginate_and_original_args(self):
        # --max-keys is a service parameter; with --no-paginate it should
        # be passed through untouched and limit the single response page.
        bucket_name = self.create_bucket()
        self.put_object(bucket_name, 'foo.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo2.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo3.txt', contents=b'bar')
        p = aws('s3api list-objects --bucket %s '
                '--max-keys 1 --no-paginate --output json' % bucket_name)
        self.assert_no_errors(p)
        response_json = p.json
        self.assertEqual(len(response_json['Contents']), 1)

    def test_max_items(self):
        # --max-items is a CLI-level paginator argument (distinct from the
        # service's --max-keys) and should cap the aggregated results.
        bucket_name = self.create_bucket()
        self.put_object(bucket_name, 'foo.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo2.txt', contents=b'bar')
        self.put_object(bucket_name, 'foo3.txt', contents=b'bar')
        p = aws('s3api list-objects --bucket %s '
                '--max-items 1 --output json' % bucket_name)
        self.assert_no_errors(p)
        response_json = p.json
        self.assertEqual(len(response_json['Contents']), 1)

    def test_query(self):
        # --query applies a JMESPath expression to the response.
        bucket_name = self.create_bucket()
        self.put_object(bucket_name, 'foo.txt', contents=b'bar')
        p = aws('s3api list-objects --bucket %s '
                '--query Contents[].Key --output json' % bucket_name)
        self.assert_no_errors(p)
        response_json = p.json
        self.assertEqual(response_json, ['foo.txt'])

    def test_no_sign_requests(self):
        bucket_name = self.create_bucket()
        self.put_object(bucket_name, 'public', contents=b'bar',
                        extra_args={'ACL': 'public-read'})
        self.put_object(bucket_name, 'private', contents=b'bar')
        env = os.environ.copy()
        # Set the env vars to bad values so if we do actually
        # try to sign the request, we'll get an auth error.
        env['AWS_ACCESS_KEY_ID'] = 'foo'
        env['AWS_SECRET_ACCESS_KEY'] = 'bar'
        p = aws('s3api head-object --bucket %s --key public --no-sign-request'
                % bucket_name, env_vars=env)
        self.assert_no_errors(p)
        self.assertIn('ETag', p.json)

        # Should fail because we're not signing the request but the object is
        # private.
        p = aws('s3api head-object --bucket %s --key private --no-sign-request'
                % bucket_name, env_vars=env)
        self.assertEqual(p.rc, 255)

    def test_profile_arg_has_precedence_over_env_vars(self):
        # At a high level, we're going to set access_key/secret_key
        # via env vars, but ensure that a --profile <foo> results
        # in creds being retrieved from the shared creds file
        # and not from env vars.
        env_vars = os.environ.copy()
        with temporary_file('w') as f:
            env_vars.pop('AWS_PROFILE', None)
            env_vars.pop('AWS_DEFAULT_PROFILE', None)
            # 'aws configure list' only shows 4 values
            # from the credentials so we'll show
            # 4 char values.
            env_vars['AWS_ACCESS_KEY_ID'] = 'enva'
            env_vars['AWS_SECRET_ACCESS_KEY'] = 'envb'
            env_vars['AWS_SHARED_CREDENTIALS_FILE'] = f.name
            env_vars['AWS_CONFIG_FILE'] = 'does-not-exist-foo'
            f.write(
                '[from_argument]\n'
                'aws_access_key_id=proa\n'
                'aws_secret_access_key=prob\n'
            )
            f.flush()
            p = aws('configure list --profile from_argument',
                    env_vars=env_vars)
            # 1. We should see the profile name being set.
            self.assertIn('from_argument', p.stdout)
            # 2. The creds should be proa/prob, which come
            # from the "from_argument" profile.
            self.assertIn('proa', p.stdout)
            self.assertIn('prob', p.stdout)
            self.assertIn('shared-credentials-file', p.stdout)

    def test_profile_arg_wins_over_profile_env_var(self):
        env_vars = os.environ.copy()
        with temporary_file('w') as f:
            # Remove existing profile related env vars.
            env_vars.pop('AWS_PROFILE', None)
            env_vars.pop('AWS_DEFAULT_PROFILE', None)
            env_vars['AWS_SHARED_CREDENTIALS_FILE'] = f.name
            env_vars['AWS_CONFIG_FILE'] = 'does-not-exist-foo'
            f.write(
                '[from_env_var]\n'
                'aws_access_key_id=enva\n'
                'aws_secret_access_key=envb\n'
                '\n'
                '[from_argument]\n'
                'aws_access_key_id=proa\n'
                'aws_secret_access_key=prob\n'
            )
            f.flush()
            # Now we set the current profile via env var:
            env_vars['AWS_PROFILE'] = 'from_env_var'
            # If we specify the --profile argument, that
            # value should win over the AWS_PROFILE env var.
            p = aws('configure list --profile from_argument',
                    env_vars=env_vars)
            # 1. We should see the profile name being set.
            self.assertIn('from_argument', p.stdout)
            # 2. The creds should be proa/prob, which come
            # from the "from_argument" profile.
            self.assertIn('proa', p.stdout)
            self.assertIn('prob', p.stdout)
530
531
532
# Allow running this test module directly (outside the test runner).
if __name__ == '__main__':
    unittest.main()
534
535