GitHub Repository: aws/aws-cli
Path: blob/develop/tests/integration/customizations/s3/test_plugin.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License"). You
5
# may not use this file except in compliance with the License. A copy of
6
# the License is located at
7
#
8
# http://aws.amazon.com/apache2.0/
9
#
10
# or in the "license" file accompanying this file. This file is
11
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12
# ANY KIND, either express or implied. See the License for the specific
13
# language governing permissions and limitations under the License.
14
15
# The following tests are performed to ensure that the commands work.
16
# They do not check every possible parameter that can be passed, as
17
# those cases are covered by tests in other classes.
18
import os
19
import platform
20
import contextlib
21
import time
22
import stat
23
import signal
24
import string
25
import socket
26
import tempfile
27
import shutil
28
import copy
29
import logging
30
31
import pytest
32
33
from awscli.compat import BytesIO, urlopen
34
import botocore.session
35
36
from awscli.testutils import unittest, get_stdout_encoding
37
from awscli.testutils import skip_if_windows
38
from awscli.testutils import aws as _aws
39
from awscli.testutils import BaseS3CLICommand
40
from awscli.testutils import random_chars, random_bucket_name
41
from awscli.customizations.s3.transferconfig import DEFAULTS
42
from awscli.customizations.scalarparse import add_scalar_parsers, identity
43
44
45
# Using the same log name as testutils.py
46
LOG = logging.getLogger('awscli.tests.integration')
47
_SHARED_BUCKET = random_bucket_name()
48
_NON_EXISTENT_BUCKET = random_bucket_name()
49
_DEFAULT_REGION = 'us-west-2'
50
_DEFAULT_AZ = 'usw2-az1'
51
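# Directory (S3 Express One Zone) bucket names must use the
# '<name>--<az-id>--x-s3' suffix format.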
_SHARED_DIR_BUCKET = f'{random_bucket_name()}--{_DEFAULT_AZ}--x-s3'
52
53
54
def setup_module():
55
s3 = botocore.session.get_session().create_client('s3')
56
waiter = s3.get_waiter('bucket_exists')
57
params = {
58
'Bucket': _SHARED_BUCKET,
59
'CreateBucketConfiguration': {
60
'LocationConstraint': _DEFAULT_REGION,
61
},
62
'ObjectOwnership': 'ObjectWriter'
63
}
64
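# Directory buckets are pinned to a single Availability Zone and use
# single-AZ data redundancy, so the create call needs an explicit
# location and bucket configuration.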
dir_bucket_params = {
65
'Bucket': _SHARED_DIR_BUCKET,
66
'CreateBucketConfiguration': {
67
'Location': {
68
'Type': 'AvailabilityZone',
69
'Name': _DEFAULT_AZ
70
},
71
'Bucket': {
72
'Type': 'Directory',
73
'DataRedundancy': 'SingleAvailabilityZone'
74
}
75
}
76
}
77
try:
78
s3.create_bucket(**params)
79
s3.create_bucket(**dir_bucket_params)
80
except Exception as e:
81
# A create_bucket can fail for a number of reasons.
82
# We're going to defer to the waiter below to make the
83
# final call as to whether or not the bucket exists.
84
LOG.debug("create_bucket() raised an exception: %s", e, exc_info=True)
85
waiter.wait(Bucket=_SHARED_BUCKET)
86
waiter.wait(Bucket=_SHARED_DIR_BUCKET)
87
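# New buckets block all public access by default; remove the block here,
# presumably so tests that set ACLs or public grants on the shared
# bucket succeed.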
s3.delete_public_access_block(
88
Bucket=_SHARED_BUCKET
89
)
90
91
# Validate that "_NON_EXISTENT_BUCKET" doesn't exist.
92
waiter = s3.get_waiter('bucket_not_exists')
93
try:
94
waiter.wait(Bucket=_NON_EXISTENT_BUCKET)
95
except Exception as e:
96
LOG.debug(
97
"The following bucket was unexpectedly discovered: %s",
98
_NON_EXISTENT_BUCKET,
99
exc_info=True,
100
)
101
102
103
def clear_out_bucket(bucket, delete_bucket=False):
104
s3 = botocore.session.get_session().create_client(
105
's3', region_name=_DEFAULT_REGION)
106
paginator = s3.get_paginator('list_objects_v2')
107
# Use pages paired with batch delete_objects().
108
for page in paginator.paginate(Bucket=bucket):
109
keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
110
if keys:
111
s3.delete_objects(Bucket=bucket, Delete={'Objects': keys})
112
if delete_bucket:
113
try:
114
s3.delete_bucket(Bucket=bucket)
115
except Exception as e:
116
# We can sometimes get exceptions when trying to
117
# delete a bucket. We'll let the waiter make
118
# the final call as to whether the bucket was able
119
# to be deleted.
120
LOG.debug("delete_bucket() raised an exception: %s",
121
e, exc_info=True)
122
waiter = s3.get_waiter('bucket_not_exists')
123
waiter.wait(Bucket=bucket)
124
125
126
def teardown_module():
127
clear_out_bucket(_SHARED_BUCKET, delete_bucket=True)
128
clear_out_bucket(_SHARED_DIR_BUCKET, delete_bucket=True)
129
130
131
@contextlib.contextmanager
132
def cd(directory):
133
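# Temporarily switch the working directory for the duration of the with block.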
original = os.getcwd()
134
try:
135
os.chdir(directory)
136
yield
137
finally:
138
os.chdir(original)
139
140
141
def aws(command, collect_memory=False, env_vars=None, wait_for_finish=True,
142
input_data=None, input_file=None):
143
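# Thin wrapper around awscli.testutils.aws that pins AWS_DEFAULT_REGION
# for the child process unless the caller provides its own environment.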
if not env_vars:
144
env_vars = os.environ.copy()
145
env_vars['AWS_DEFAULT_REGION'] = "us-west-2"
146
return _aws(command, collect_memory=collect_memory, env_vars=env_vars,
147
wait_for_finish=wait_for_finish, input_data=input_data,
148
input_file=input_file)
149
150
151
def wait_for_process_exit(process, timeout=60):
152
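# Poll the CLI subprocess until it exits; kill it and fail the test if it
# is still running after the timeout.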
deadline = time.time() + timeout
153
while time.time() < deadline:
154
rc = process.poll()
155
if rc is not None:
156
break
157
time.sleep(1)
158
else:
159
process.kill()
160
raise AssertionError("CLI did not exist within %s seconds of "
161
"receiving a Ctrl+C" % timeout)
162
163
164
def _running_on_rhel():
165
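# platform.linux_distribution() was removed in Python 3.8, hence the
# hasattr() guard before calling it.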
return (
166
hasattr(platform, 'linux_distribution') and
167
platform.linux_distribution()[0] == 'Red Hat Enterprise Linux Server')
168
169
170
class BaseS3IntegrationTest(BaseS3CLICommand):
171
172
def setUp(self):
173
clear_out_bucket(_SHARED_BUCKET)
174
clear_out_bucket(_SHARED_DIR_BUCKET)
175
super(BaseS3IntegrationTest, self).setUp()
176
177
178
class TestMoveCommand(BaseS3IntegrationTest):
179
def assert_mv_local_to_s3(self, bucket_name):
180
full_path = self.files.create_file('foo.txt', 'this is foo.txt')
181
p = aws('s3 mv %s s3://%s/foo.txt' % (full_path,
182
bucket_name))
183
self.assert_no_errors(p)
184
# When we move an object, the local file is gone:
185
self.assertTrue(not os.path.exists(full_path))
186
# And now resides in s3.
187
self.assert_key_contents_equal(bucket_name, 'foo.txt',
188
'this is foo.txt')
189
190
def assert_mv_s3_to_local(self, bucket_name):
191
self.put_object(bucket_name, 'foo.txt', 'this is foo.txt')
192
full_path = self.files.full_path('foo.txt')
193
self.assertTrue(self.key_exists(bucket_name, key_name='foo.txt'))
194
p = aws('s3 mv s3://%s/foo.txt %s' % (bucket_name, full_path))
195
self.assert_no_errors(p)
196
self.assertTrue(os.path.exists(full_path))
197
with open(full_path, 'r') as f:
198
self.assertEqual(f.read(), 'this is foo.txt')
199
# The s3 file should not be there anymore.
200
self.assertTrue(self.key_not_exists(bucket_name, key_name='foo.txt'))
201
202
def assert_mv_s3_to_s3(self, from_bucket, create_bucket_call):
203
to_bucket = create_bucket_call()
204
self.put_object(from_bucket, 'foo.txt', 'this is foo.txt')
205
206
p = aws('s3 mv s3://%s/foo.txt s3://%s/foo.txt' % (from_bucket,
207
to_bucket))
208
self.assert_no_errors(p)
209
contents = self.get_key_contents(to_bucket, 'foo.txt')
210
self.assertEqual(contents, 'this is foo.txt')
211
# And verify that the object no longer exists in the from_bucket.
212
self.assertTrue(self.key_not_exists(from_bucket, key_name='foo.txt'))
213
214
def test_mv_local_to_s3(self):
215
self.assert_mv_local_to_s3(_SHARED_BUCKET)
216
217
def test_mv_local_to_s3_express(self):
218
self.assert_mv_local_to_s3(_SHARED_DIR_BUCKET)
219
220
def test_mv_s3_to_local(self):
221
self.assert_mv_s3_to_local(_SHARED_BUCKET)
222
223
def test_mv_s3_express_to_local(self):
224
self.assert_mv_s3_to_local(_SHARED_DIR_BUCKET)
225
226
def test_mv_s3_to_s3(self):
227
self.assert_mv_s3_to_s3(_SHARED_BUCKET, self.create_bucket)
228
229
def test_mv_s3_to_s3_express(self):
230
self.assert_mv_s3_to_s3(_SHARED_BUCKET, self.create_dir_bucket)
231
232
def test_mv_s3_express_to_s3_express(self):
233
self.assert_mv_s3_to_s3(_SHARED_DIR_BUCKET, self.create_dir_bucket)
234
235
def test_mv_s3_express_to_s3(self):
236
self.assert_mv_s3_to_s3(_SHARED_DIR_BUCKET, self.create_bucket)
237
238
@pytest.mark.slow
239
def test_mv_s3_to_s3_multipart(self):
240
from_bucket = _SHARED_BUCKET
241
to_bucket = self.create_bucket()
242
file_contents = BytesIO(b'abcd' * (1024 * 1024 * 10))
243
self.put_object(from_bucket, 'foo.txt', file_contents)
244
245
p = aws('s3 mv s3://%s/foo.txt s3://%s/foo.txt' % (from_bucket,
246
to_bucket))
247
self.assert_no_errors(p)
248
self.assert_key_contents_equal(to_bucket, 'foo.txt', file_contents)
249
# And verify that the object no longer exists in the from_bucket.
250
self.assertTrue(self.key_not_exists(from_bucket, key_name='foo.txt'))
251
252
def test_mv_s3_to_s3_multipart_recursive(self):
253
from_bucket = _SHARED_BUCKET
254
to_bucket = self.create_bucket()
255
256
large_file_contents = BytesIO(b'abcd' * (1024 * 1024 * 10))
257
small_file_contents = 'small file contents'
258
self.put_object(from_bucket, 'largefile', large_file_contents)
259
self.put_object(from_bucket, 'smallfile', small_file_contents)
260
261
p = aws('s3 mv s3://%s/ s3://%s/ --recursive' % (from_bucket,
262
to_bucket))
263
self.assert_no_errors(p)
264
# Nothing's in the from_bucket.
265
self.assertTrue(self.key_not_exists(from_bucket,
266
key_name='largefile'))
267
self.assertTrue(self.key_not_exists(from_bucket,
268
key_name='smallfile'))
269
270
# And both files are in the to_bucket.
271
self.assertTrue(self.key_exists(to_bucket, key_name='largefile'))
272
self.assertTrue(self.key_exists(to_bucket, key_name='smallfile'))
273
274
# And the contents are what we expect.
275
self.assert_key_contents_equal(to_bucket, 'smallfile',
276
small_file_contents)
277
self.assert_key_contents_equal(to_bucket, 'largefile',
278
large_file_contents)
279
280
def test_mv_s3_to_s3_with_sig4(self):
281
to_region = 'eu-central-1'
282
from_region = 'us-west-2'
283
284
from_bucket = self.create_bucket(region=from_region)
285
to_bucket = self.create_bucket(region=to_region)
286
287
file_name = 'hello.txt'
288
file_contents = 'hello'
289
self.put_object(from_bucket, file_name, file_contents)
290
291
p = aws('s3 mv s3://{0}/{4} s3://{1}/{4} '
292
'--source-region {2} --region {3}'
293
.format(from_bucket, to_bucket, from_region, to_region,
294
file_name))
295
self.assert_no_errors(p)
296
297
self.assertTrue(self.key_not_exists(from_bucket, file_name))
298
self.assertTrue(self.key_exists(to_bucket, file_name))
299
300
@pytest.mark.slow
301
def test_mv_with_large_file(self):
302
bucket_name = _SHARED_BUCKET
303
# 40MB will force a multipart upload.
304
file_contents = BytesIO(b'abcd' * (1024 * 1024 * 10))
305
foo_txt = self.files.create_file(
306
'foo.txt', file_contents.getvalue().decode('utf-8'))
307
p = aws('s3 mv %s s3://%s/foo.txt' % (foo_txt, bucket_name))
308
self.assert_no_errors(p)
309
# When we move an object, the local file is gone:
310
self.assertTrue(not os.path.exists(foo_txt))
311
# And now resides in s3.
312
self.assert_key_contents_equal(bucket_name, 'foo.txt', file_contents)
313
314
# Now verify we can download this file.
315
p = aws('s3 mv s3://%s/foo.txt %s' % (bucket_name, foo_txt))
316
self.assert_no_errors(p)
317
self.assertTrue(os.path.exists(foo_txt))
318
self.assertEqual(os.path.getsize(foo_txt),
319
len(file_contents.getvalue()))
320
321
def test_mv_to_nonexistent_bucket(self):
322
full_path = self.files.create_file('foo.txt', 'this is foo.txt')
323
p = aws(f's3 mv {full_path} s3://{_NON_EXISTENT_BUCKET}/foo.txt')
324
self.assertEqual(p.rc, 1)
325
326
def test_cant_move_file_onto_itself_small_file(self):
327
# We don't even need a remote file in this case. We can
328
# immediately validate that we can't move a file onto itself.
329
bucket_name = _SHARED_BUCKET
330
self.put_object(bucket_name, key_name='key.txt', contents='foo')
331
p = aws('s3 mv s3://%s/key.txt s3://%s/key.txt' %
332
(bucket_name, bucket_name))
333
self.assertEqual(p.rc, 255)
334
self.assertIn('Cannot mv a file onto itself', p.stderr)
335
336
def test_cant_move_large_file_onto_itself(self):
337
# At the API level, you can multipart copy an object onto itself,
338
# but a mv command doesn't make sense because a mv is just a
339
# cp + an rm of the src file. We should be consistent and
340
# not allow large files to be mv'd onto themselves.
341
file_contents = BytesIO(b'a' * (1024 * 1024 * 10))
342
bucket_name = _SHARED_BUCKET
343
self.put_object(bucket_name, key_name='key.txt',
344
contents=file_contents)
345
p = aws('s3 mv s3://%s/key.txt s3://%s/key.txt' %
346
(bucket_name, bucket_name))
347
self.assertEqual(p.rc, 255)
348
self.assertIn('Cannot mv a file onto itself', p.stderr)
349
350
351
class TestRm(BaseS3IntegrationTest):
352
def assert_rm_with_page_size(self, bucket_name):
353
self.put_object(bucket_name, 'foo.txt', contents='hello world')
354
self.put_object(bucket_name, 'bar.txt', contents='hello world2')
355
p = aws('s3 rm s3://%s/ --recursive --page-size 1' % bucket_name)
356
self.assert_no_errors(p)
357
358
self.assertTrue(self.key_not_exists(bucket_name, key_name='foo.txt'))
359
self.assertTrue(self.key_not_exists(bucket_name, key_name='bar.txt'))
360
@skip_if_windows('Newline in filename test not valid on windows.')
361
# Windows won't let you do this. You'll get:
362
# [Errno 22] invalid mode ('w') or filename:
363
# 'c:\\windows\\temp\\tmp0fv8uu\\foo\r.txt'
364
def test_rm_with_newlines(self):
365
bucket_name = _SHARED_BUCKET
366
367
# Note the carriage return in the key name.
368
foo_txt = self.files.create_file('foo\r.txt', 'this is foo.txt')
369
p = aws('s3 cp %s s3://%s/foo\r.txt' % (foo_txt, bucket_name))
370
self.assert_no_errors(p)
371
372
# Make sure object is in bucket.
373
self.assertTrue(self.key_exists(bucket_name, key_name='foo\r.txt'))
374
375
# Then delete the file.
376
p = aws('s3 rm s3://%s/ --recursive' % (bucket_name,))
377
378
# And verify it's gone.
379
self.assertTrue(self.key_not_exists(bucket_name, key_name='foo\r.txt'))
380
381
def test_rm_with_page_size(self):
382
self.assert_rm_with_page_size(_SHARED_BUCKET)
383
384
def test_s3_express_rm_with_page_size(self):
385
self.assert_rm_with_page_size(_SHARED_DIR_BUCKET)
386
387
388
class TestCp(BaseS3IntegrationTest):
389
390
def assert_cp_to_and_from_s3(self, bucket_name):
391
# This tests the ability to put a single file in s3,
392
# verify its contents and content type,
393
# and then download the file locally.
394
395
# copy file into bucket.
396
foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
397
p = aws('s3 cp %s s3://%s/foo.txt' % (foo_txt, bucket_name))
398
self.assert_no_errors(p)
399
400
# Make sure object is in bucket.
401
self.assertTrue(self.key_exists(bucket_name, key_name='foo.txt'))
402
self.assertEqual(
403
self.get_key_contents(bucket_name, key_name='foo.txt'),
404
'this is foo.txt')
405
406
self.assertEqual(
407
self.content_type_for_key(bucket_name, key_name='foo.txt'),
408
'text/plain')
409
410
# Make a new name for the file and copy it locally.
411
full_path = self.files.full_path('bar.txt')
412
p = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, full_path))
413
self.assert_no_errors(p)
414
415
with open(full_path, 'r') as f:
416
self.assertEqual(f.read(), 'this is foo.txt')
417
418
def test_cp_to_and_from_s3(self):
419
self.assert_cp_to_and_from_s3(_SHARED_BUCKET)
420
421
def test_cp_to_and_from_s3_express(self):
422
self.assert_cp_to_and_from_s3(_SHARED_DIR_BUCKET)
423
424
def test_cp_without_trailing_slash(self):
425
# There's a unit test for this, but we still want to verify this
426
# with an integration test.
427
bucket_name = _SHARED_BUCKET
428
429
# copy file into bucket.
430
foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
431
# Note that the destination has no trailing slash.
432
p = aws('s3 cp %s s3://%s' % (foo_txt, bucket_name))
433
self.assert_no_errors(p)
434
435
# Make sure object is in bucket.
436
self.assertTrue(self.key_exists(bucket_name, key_name='foo.txt'))
437
self.assertEqual(
438
self.get_key_contents(bucket_name, key_name='foo.txt'),
439
'this is foo.txt')
440
441
@pytest.mark.slow
442
def test_cp_s3_s3_multipart(self):
443
from_bucket = _SHARED_BUCKET
444
to_bucket = self.create_bucket()
445
file_contents = BytesIO(b'abcd' * (1024 * 1024 * 10))
446
self.put_object(from_bucket, 'foo.txt', file_contents)
447
448
p = aws('s3 cp s3://%s/foo.txt s3://%s/foo.txt' %
449
(from_bucket, to_bucket))
450
self.assert_no_errors(p)
451
self.assert_key_contents_equal(to_bucket, 'foo.txt', file_contents)
452
self.assertTrue(self.key_exists(from_bucket, key_name='foo.txt'))
453
454
def test_guess_mime_type(self):
455
bucket_name = _SHARED_BUCKET
456
bar_jpeg = self.files.create_file('bar.jpeg', 'fake jpeg image')
457
p = aws('s3 cp %s s3://%s/bar.jpeg' % (bar_jpeg, bucket_name))
458
self.assert_no_errors(p)
459
460
# We should have correctly guessed the content type based on the
461
# filename extension.
462
self.assertEqual(
463
self.content_type_for_key(bucket_name, key_name='bar.jpeg'),
464
'image/jpeg')
465
466
@pytest.mark.slow
467
def test_download_large_file(self):
468
# This will force a multipart download.
469
bucket_name = _SHARED_BUCKET
470
foo_contents = BytesIO(b'abcd' * (1024 * 1024 * 10))
471
self.put_object(bucket_name, key_name='foo.txt',
472
contents=foo_contents)
473
local_foo_txt = self.files.full_path('foo.txt')
474
p = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, local_foo_txt))
475
self.assert_no_errors(p)
476
self.assertEqual(os.path.getsize(local_foo_txt),
477
len(foo_contents.getvalue()))
478
479
@pytest.mark.slow
480
@skip_if_windows('SIGINT not supported on Windows.')
481
def test_download_ctrl_c_does_not_hang(self):
482
bucket_name = _SHARED_BUCKET
483
foo_contents = BytesIO(b'abcd' * (1024 * 1024 * 40))
484
self.put_object(bucket_name, key_name='foo.txt',
485
contents=foo_contents)
486
local_foo_txt = self.files.full_path('foo.txt')
487
# --quiet is added to make sure too much output is not communicated
488
# to the PIPE, causing a deadlock when not consumed.
489
process = aws('s3 cp s3://%s/foo.txt %s --quiet' %
490
(bucket_name, local_foo_txt), wait_for_finish=False)
491
# Give it some time to start up and enter its main task loop.
492
time.sleep(3)
493
# The process has 60 seconds to finish after being sent a Ctrl+C,
494
# otherwise the test fails.
495
process.send_signal(signal.SIGINT)
496
wait_for_process_exit(process, timeout=60)
497
# A Ctrl+C should have a non-zero RC.
498
# We either caught the process in
499
# its main polling loop (rc=1), or it was successfully terminated by
500
# the SIGINT (rc=-2).
501
#
502
# There is also the chance the interrupt happened before the transfer
503
# process started or even after transfer process finished. So the
504
# signal may have never been encountered, resulting in an rc of 0.
505
# Therefore, it is acceptable to have an rc of 0 as the important part
506
# about this test is that it does not hang.
507
self.assertIn(process.returncode, [0, 1, -2])
508
509
@pytest.mark.slow
510
@skip_if_windows('SIGINT not supported on Windows.')
511
def test_cleans_up_aborted_uploads(self):
512
bucket_name = _SHARED_BUCKET
513
foo_txt = self.files.create_file('foo.txt', '')
514
with open(foo_txt, 'wb') as f:
515
for i in range(20):
516
f.write(b'a' * 1024 * 1024)
517
# --quiet is added to make sure too much output is not communicated
518
# to the PIPE, causing a deadlock when not consumed.
519
process = aws('s3 cp %s s3://%s/ --quiet' % (foo_txt, bucket_name),
520
wait_for_finish=False)
521
time.sleep(3)
522
# The process has 60 seconds to finish after being sent a Ctrl+C,
523
# otherwise the test fails.
524
process.send_signal(signal.SIGINT)
525
wait_for_process_exit(process, timeout=60)
526
uploads_after = self.client.list_multipart_uploads(
527
Bucket=bucket_name).get('Uploads', [])
528
self.assertEqual(uploads_after, [],
529
"Not all multipart uploads were properly "
530
"aborted after receiving Ctrl-C: %s" % uploads_after)
531
532
def test_cp_to_nonexistent_bucket(self):
533
foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
534
p = aws(f's3 cp {foo_txt} s3://{_NON_EXISTENT_BUCKET}/foo.txt')
535
self.assertEqual(p.rc, 1)
536
537
def test_cp_empty_file(self):
538
bucket_name = _SHARED_BUCKET
539
foo_txt = self.files.create_file('foo.txt', contents='')
540
p = aws('s3 cp %s s3://%s/' % (foo_txt, bucket_name))
541
self.assertEqual(p.rc, 0)
542
self.assertNotIn('failed', p.stderr)
543
self.assertTrue(self.key_exists(bucket_name, 'foo.txt'))
544
545
def test_download_non_existent_key(self):
546
p = aws(f's3 cp s3://{_NON_EXISTENT_BUCKET}/foo.txt foo.txt')
547
self.assertEqual(p.rc, 1)
548
expected_err_msg = (
549
'An error occurred (404) when calling the '
550
'HeadObject operation: Key "foo.txt" does not exist')
551
self.assertIn(expected_err_msg, p.stderr)
552
553
def test_download_encrypted_kms_object(self):
554
bucket_name = self.create_bucket(region='eu-central-1')
555
extra_args = {
556
'ServerSideEncryption': 'aws:kms',
557
'SSEKMSKeyId': 'alias/aws/s3'
558
}
559
object_name = 'foo.txt'
560
contents = 'this is foo.txt'
561
self.put_object(bucket_name, object_name, contents,
562
extra_args=extra_args)
563
local_filename = self.files.full_path('foo.txt')
564
p = aws('s3 cp s3://%s/%s %s --region eu-central-1' %
565
(bucket_name, object_name, local_filename))
566
self.assertEqual(p.rc, 0)
567
# Assert that the file was downloaded properly.
568
with open(local_filename, 'r') as f:
569
self.assertEqual(f.read(), contents)
570
571
def test_download_empty_object(self):
572
bucket_name = _SHARED_BUCKET
573
object_name = 'empty-object'
574
self.put_object(bucket_name, object_name, '')
575
local_filename = self.files.full_path('empty.txt')
576
p = aws('s3 cp s3://%s/%s %s' % (
577
bucket_name, object_name, local_filename))
578
self.assertEqual(p.rc, 0)
579
# Assert that the file was downloaded and has no content.
580
with open(local_filename, 'r') as f:
581
self.assertEqual(f.read(), '')
582
583
def test_website_redirect_ignore_paramfile(self):
584
bucket_name = _SHARED_BUCKET
585
foo_txt = self.files.create_file('foo.txt', 'bar')
586
website_redirect = 'http://someserver'
587
p = aws('s3 cp %s s3://%s/foo.txt --website-redirect %s' %
588
(foo_txt, bucket_name, website_redirect))
589
self.assert_no_errors(p)
590
591
# Ensure that the redirect location is set to the web address itself,
592
# not to the contents fetched from that address. We can check via a head object.
593
response = self.head_object(bucket_name, 'foo.txt')
594
self.assertEqual(response['WebsiteRedirectLocation'], website_redirect)
595
596
@pytest.mark.slow
597
def test_copy_large_file_signature_v4(self):
598
# Just verify that we can upload a large file to a region
599
# that uses signature version 4.
600
bucket_name = self.create_bucket(region='eu-central-1')
601
num_mb = 200
602
foo_txt = self.files.create_file('foo.txt', '')
603
with open(foo_txt, 'wb') as f:
604
for i in range(num_mb):
605
f.write(b'a' * 1024 * 1024)
606
607
p = aws('s3 cp %s s3://%s/ --region eu-central-1' % (
608
foo_txt, bucket_name))
609
self.assert_no_errors(p)
610
self.assertTrue(self.key_exists(bucket_name, key_name='foo.txt'))
611
612
def test_copy_metadata(self):
613
# Copy the same style of parsing as the CLI session. This is needed
614
# for comparing the Expires timestamp.
615
add_scalar_parsers(self.session)
616
bucket_name = _SHARED_BUCKET
617
key = random_chars(6)
618
filename = self.files.create_file(key, contents='')
619
p = aws('s3 cp %s s3://%s/%s --metadata keyname=value' %
620
(filename, bucket_name, key))
621
self.assert_no_errors(p)
622
response = self.head_object(bucket_name, key)
623
# The uploaded object should have the metadata provided via --metadata.
624
self.assertEqual(response['Metadata'].get('keyname'), 'value')
625
626
def test_copy_metadata_directive(self):
627
# Copy the same style of parsing as the CLI session. This is needed
628
# for comparing the Expires timestamp.
629
self.override_parser(timestamp_parser=identity)
630
bucket_name = _SHARED_BUCKET
631
original_key = '%s-a' % random_chars(6)
632
new_key = '%s-b' % random_chars(6)
633
metadata = {
634
'ContentType': 'foo',
635
'ContentDisposition': 'foo',
636
'ContentEncoding': 'foo',
637
'ContentLanguage': 'foo',
638
'CacheControl': '90',
639
'Expires': '0'
640
}
641
self.put_object(bucket_name, original_key, contents='foo',
642
extra_args=metadata)
643
p = aws('s3 cp s3://%s/%s s3://%s/%s' %
644
(bucket_name, original_key, bucket_name, new_key))
645
self.assert_no_errors(p)
646
response = self.head_object(bucket_name, new_key)
647
# These values should have the metadata of the source object
648
metadata_ref = copy.copy(metadata)
649
metadata_ref['Expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
650
for name, value in metadata_ref.items():
651
self.assertEqual(response[name], value)
652
653
# Use REPLACE to wipe out all of the metadata when copying to a new
654
# key.
655
new_key = '%s-c' % random_chars(6)
656
p = aws('s3 cp s3://%s/%s s3://%s/%s --metadata-directive REPLACE' %
657
(bucket_name, original_key, bucket_name, new_key))
658
self.assert_no_errors(p)
659
response = self.head_object(bucket_name, new_key)
660
# Make sure all of the original metadata is gone.
661
for name, value in metadata_ref.items():
662
self.assertNotEqual(response.get(name), value)
663
664
# Use REPLACE to wipe out all of the metadata but include a new
665
# metadata value.
666
new_key = '%s-d' % random_chars(6)
667
p = aws('s3 cp s3://%s/%s s3://%s/%s --metadata-directive REPLACE '
668
'--content-type bar' %
669
(bucket_name, original_key, bucket_name, new_key))
670
self.assert_no_errors(p)
671
response = self.head_object(bucket_name, new_key)
672
# Make sure the content type metadata is included
673
self.assertEqual(response['ContentType'], 'bar')
674
# Make sure all of the original metadata is gone.
675
for name, value in metadata_ref.items():
676
self.assertNotEqual(response.get(name), value)
677
678
def test_cp_with_request_payer(self):
679
bucket_name = _SHARED_BUCKET
680
681
foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
682
p = aws('s3 cp %s s3://%s/mykey --request-payer' % (
683
foo_txt, bucket_name))
684
685
# From the S3 API, the only way to know for sure that request payer is
686
# working is to set up a bucket with request payer and have another
687
# account with permissions make a request to that bucket. If they
688
# do not include request payer, they will get an access denied error.
689
# Setting this up for an integration test would be tricky as it
690
# requires having/creating another account outside of the one running
691
# the integration tests. So instead at the very least we want to
692
# make sure we can use the parameter, have the command run
693
# successfully, and correctly upload the key to S3.
694
self.assert_no_errors(p)
695
self.assertTrue(self.key_exists(bucket_name, key_name='mykey'))
696
self.assertEqual(
697
self.get_key_contents(bucket_name, key_name='mykey'),
698
'this is foo.txt')
699
700
701
class TestSync(BaseS3IntegrationTest):
702
def test_sync_with_plus_chars_paginate(self):
703
# This test ensures pagination tokens are url decoded.
704
# 1. Create > 2 files with '+' in the filename.
705
# 2. Sync up to s3 while the page size is 2.
706
# 3. Sync up to s3 while the page size is 2.
707
# 4. Verify nothing was synced up to s3 in step 3.
708
bucket_name = _SHARED_BUCKET
709
filenames = []
710
for i in range(4):
711
# Create a file with a space char and a '+' char in the filename.
712
# We're interested in testing the filename comparisons, not the
713
# mtime comparisons so we're setting the mtime to some time
714
# in the past to avoid mtime comparisons interfering with
715
# test results.
716
mtime = time.time() - 300
717
filenames.append(
718
self.files.create_file('foo +%06d' % i,
719
contents='',
720
mtime=mtime))
721
p = aws('s3 sync %s s3://%s/ --page-size 2' %
722
(self.files.rootdir, bucket_name))
723
self.assert_no_errors(p)
724
time.sleep(1)
725
p2 = aws('s3 sync %s s3://%s/ --page-size 2'
726
% (self.files.rootdir, bucket_name))
727
self.assertNotIn('upload:', p2.stdout)
728
self.assertEqual('', p2.stdout)
729
730
def test_s3_to_s3_sync_with_plus_char_paginate(self):
731
keynames = []
732
for i in range(4):
733
keyname = 'foo+%d' % i
734
keynames.append(keyname)
735
self.files.create_file(keyname, contents='')
736
737
bucket_name = _SHARED_BUCKET
738
bucket_name_2 = self.create_bucket()
739
740
p = aws('s3 sync %s s3://%s' % (self.files.rootdir, bucket_name))
741
self.assert_no_errors(p)
742
for key in keynames:
743
self.assertTrue(self.key_exists(bucket_name, key))
744
745
p = aws('s3 sync s3://%s/ s3://%s/ --page-size 2' %
746
(bucket_name, bucket_name_2))
747
self.assert_no_errors(p)
748
for key in keynames:
749
self.assertTrue(self.key_exists(bucket_name_2, key))
750
751
p2 = aws('s3 sync s3://%s/ s3://%s/ --page-size 2' %
752
(bucket_name, bucket_name_2))
753
self.assertNotIn('copy:', p2.stdout)
754
self.assertEqual('', p2.stdout)
755
756
def test_sync_no_resync(self):
757
self.files.create_file('xyz123456789', contents='test1')
758
self.files.create_file(os.path.join('xyz1', 'test'), contents='test2')
759
self.files.create_file(os.path.join('xyz', 'test'), contents='test3')
760
bucket_name = _SHARED_BUCKET
761
762
p = aws('s3 sync %s s3://%s' % (self.files.rootdir, bucket_name))
763
self.assert_no_errors(p)
764
time.sleep(2)
765
self.assertTrue(self.key_exists(bucket_name, 'xyz123456789'))
766
self.assertTrue(self.key_exists(bucket_name, 'xyz1/test'))
767
self.assertTrue(self.key_exists(bucket_name, 'xyz/test'))
768
769
p2 = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
770
self.assertNotIn('upload:', p2.stdout)
771
self.assertEqual('', p2.stdout)
772
773
def test_sync_to_from_s3(self):
774
bucket_name = _SHARED_BUCKET
775
foo_txt = self.files.create_file('foo.txt', 'foo contents')
776
bar_txt = self.files.create_file('bar.txt', 'bar contents')
777
778
# Sync the directory and the bucket.
779
p = aws('s3 sync %s s3://%s' % (self.files.rootdir, bucket_name))
780
self.assert_no_errors(p)
781
782
# Ensure both files are in the bucket.
783
self.assertTrue(self.key_exists(bucket_name, 'foo.txt'))
784
self.assertTrue(self.key_exists(bucket_name, 'bar.txt'))
785
786
# Sync back down. First remove the local files.
787
os.remove(foo_txt)
788
os.remove(bar_txt)
789
p = aws('s3 sync s3://%s %s' % (bucket_name, self.files.rootdir))
790
# The files should be back now.
791
self.assertTrue(os.path.isfile(foo_txt))
792
self.assertTrue(os.path.isfile(bar_txt))
793
with open(foo_txt, 'r') as f:
794
self.assertEqual(f.read(), 'foo contents')
795
with open(bar_txt, 'r') as f:
796
self.assertEqual(f.read(), 'bar contents')
797
798
def test_sync_to_nonexistent_bucket(self):
799
self.files.create_file('foo.txt', 'foo contents')
800
self.files.create_file('bar.txt', 'bar contents')
801
802
# Sync the directory and the bucket.
803
p = aws('s3 sync %s s3://noexist-bkt-nme-1412' % (self.files.rootdir,))
804
self.assertEqual(p.rc, 1)
805
806
def test_sync_with_empty_files(self):
807
self.files.create_file('foo.txt', 'foo contents')
808
self.files.create_file('bar.txt', contents='')
809
bucket_name = _SHARED_BUCKET
810
p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
811
self.assertEqual(p.rc, 0)
812
self.assertNotIn('failed', p.stderr)
813
self.assertTrue(
814
self.key_exists(bucket_name=bucket_name, key_name='bar.txt'))
815
816
def test_sync_with_delete_option_with_same_prefix(self):
817
# Test for issue 440 (https://github.com/aws/aws-cli/issues/440)
818
# First, we need to create a directory structure that has a dir with
819
# the same prefix as some of the files:
820
#
821
# test/foo.txt
822
# test-123.txt
823
# test-321.txt
824
# test.txt
825
bucket_name = _SHARED_BUCKET
826
# create test/foo.txt
827
nested_dir = os.path.join(self.files.rootdir, 'test')
828
os.mkdir(nested_dir)
829
self.files.create_file(os.path.join(nested_dir, 'foo.txt'),
830
contents='foo.txt contents')
831
# Then create test-123.txt, test-321.txt, test.txt.
832
self.files.create_file('test-123.txt', 'test-123.txt contents')
833
self.files.create_file('test-321.txt', 'test-321.txt contents')
834
self.files.create_file('test.txt', 'test.txt contents')
835
836
# Now sync this content up to s3.
837
# Allow settling time so that we have a different time between
838
# source and destination.
839
time.sleep(2)
840
p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
841
self.assert_no_errors(p)
842
843
# Now here's the issue. If we try to sync the contents down
844
# with the --delete flag we should *not* see any output, the
845
# sync operation should determine that nothing is different and
846
# therefore do nothing. We can just use --dryrun to show the issue.
847
p = aws('s3 sync s3://%s/ %s --dryrun --delete' % (
848
bucket_name, self.files.rootdir))
849
self.assert_no_errors(p)
850
# These assertion methods will give better error messages than just
851
# checking if the output is empty.
852
self.assertNotIn('download:', p.stdout)
853
self.assertNotIn('delete:', p.stdout)
854
self.assertEqual('', p.stdout)
855
856
def test_sync_with_delete_across_sig4_regions(self):
857
src_region = 'us-west-2'
858
dst_region = 'eu-central-1'
859
860
src_bucket = self.create_bucket(region=src_region)
861
dst_bucket = self.create_bucket(region=dst_region)
862
863
src_key_name = 'hello.txt'
864
self.files.create_file(src_key_name, contents='hello')
865
866
p = aws('s3 sync %s s3://%s --region %s' %
867
(self.files.rootdir, src_bucket, src_region))
868
self.assert_no_errors(p)
869
self.assertTrue(self.key_exists(src_bucket, src_key_name))
870
871
self.files.remove_all()
872
873
dst_key_name = 'goodbye.txt'
874
self.files.create_file(dst_key_name, contents='goodbye')
875
876
p = aws('s3 sync %s s3://%s --region %s' %
877
(self.files.rootdir, dst_bucket, dst_region))
878
self.assert_no_errors(p)
879
self.assertTrue(self.key_exists(dst_bucket, dst_key_name))
880
self.assertTrue(self.key_not_exists(dst_bucket, src_key_name))
881
882
p = aws('s3 sync --delete s3://%s s3://%s '
883
'--source-region %s --region %s' %
884
(src_bucket, dst_bucket, src_region, dst_region))
885
self.assert_no_errors(p)
886
887
self.assertTrue(self.key_exists(src_bucket, src_key_name))
888
self.assertTrue(self.key_exists(dst_bucket, src_key_name))
889
self.assertTrue(self.key_not_exists(src_bucket, dst_key_name))
890
self.assertTrue(self.key_not_exists(dst_bucket, dst_key_name))
891
892
def test_sync_delete_locally(self):
893
bucket_name = _SHARED_BUCKET
894
file_to_delete = self.files.create_file(
895
'foo.txt', contents='foo contents')
896
self.put_object(bucket_name, 'bar.txt', contents='bar contents')
897
898
p = aws('s3 sync s3://%s/ %s --delete' % (
899
bucket_name, self.files.rootdir))
900
self.assert_no_errors(p)
901
902
# Make sure the uploaded file got downloaded and the previously
903
# existing local file got deleted
904
self.assertTrue(os.path.exists(
905
os.path.join(self.files.rootdir, 'bar.txt')))
906
self.assertFalse(os.path.exists(file_to_delete))
907
908
909
class TestSourceRegion(BaseS3IntegrationTest):
910
def extra_setup(self):
911
name_comp = []
912
# This creates a non DNS compatible bucket name by making two random
913
# sequences of characters and joining them with a period and
914
# adding a .com at the end.
915
for i in range(2):
916
name_comp.append(random_chars(10))
917
self.src_name = '.'.join(name_comp + ['com'])
918
name_comp = []
919
for i in range(2):
920
name_comp.append(random_chars(10))
921
self.dest_name = '.'.join(name_comp + ['com'])
922
self.src_region = 'us-west-1'
923
self.dest_region = 'us-east-1'
924
self.src_bucket = self.create_bucket(self.src_name, self.src_region)
925
self.dest_bucket = self.create_bucket(self.dest_name, self.dest_region)
926
927
def test_cp_region(self):
928
self.files.create_file('foo.txt', 'foo')
929
p = aws('s3 sync %s s3://%s/ --region %s' %
930
(self.files.rootdir, self.src_bucket, self.src_region))
931
self.assert_no_errors(p)
932
p2 = aws('s3 cp s3://%s/ s3://%s/ --region %s --source-region %s '
933
'--recursive' %
934
(self.src_bucket, self.dest_bucket, self.dest_region,
935
self.src_region))
936
self.assertEqual(p2.rc, 0, p2.stdout)
937
self.assertTrue(
938
self.key_exists(bucket_name=self.dest_bucket, key_name='foo.txt'))
939
940
def test_sync_region(self):
941
self.files.create_file('foo.txt', 'foo')
942
p = aws('s3 sync %s s3://%s/ --region %s' %
943
(self.files.rootdir, self.src_bucket, self.src_region))
944
self.assert_no_errors(p)
945
p2 = aws('s3 sync s3://%s/ s3://%s/ --region %s --source-region %s ' %
946
(self.src_bucket, self.dest_bucket, self.dest_region,
947
self.src_region))
948
self.assertEqual(p2.rc, 0, p2.stdout)
949
self.assertTrue(
950
self.key_exists(bucket_name=self.dest_bucket, key_name='foo.txt'))
951
952
def test_mv_region(self):
953
self.files.create_file('foo.txt', 'foo')
954
p = aws('s3 sync %s s3://%s/ --region %s' %
955
(self.files.rootdir, self.src_bucket, self.src_region))
956
self.assert_no_errors(p)
957
p2 = aws('s3 mv s3://%s/ s3://%s/ --region %s --source-region %s '
958
'--recursive' %
959
(self.src_bucket, self.dest_bucket, self.dest_region,
960
self.src_region))
961
self.assertEqual(p2.rc, 0, p2.stdout)
962
self.assertTrue(
963
self.key_exists(bucket_name=self.dest_bucket, key_name='foo.txt'))
964
self.assertTrue(
965
self.key_not_exists(
966
bucket_name=self.src_bucket, key_name='foo.txt'))
967
968
@pytest.mark.slow
969
def test_mv_large_file_region(self):
970
foo_txt = self.files.create_file('foo.txt', 'a' * 1024 * 1024 * 10)
971
p = aws('s3 cp %s s3://%s/foo.txt --region %s' %
972
(foo_txt, self.src_bucket, self.src_region))
973
self.assert_no_errors(p)
974
975
p2 = aws(
976
's3 mv s3://%s/foo.txt s3://%s/ --region %s --source-region %s ' %
977
(self.src_bucket, self.dest_bucket, self.dest_region,
978
self.src_region)
979
)
980
self.assert_no_errors(p2)
981
self.assertTrue(
982
self.key_exists(bucket_name=self.dest_bucket, key_name='foo.txt'))
983
self.assertTrue(
984
self.key_not_exists(
985
bucket_name=self.src_bucket, key_name='foo.txt'))
986
987
988
class TestWarnings(BaseS3IntegrationTest):
989
def test_no_exist(self):
990
bucket_name = _SHARED_BUCKET
991
filename = os.path.join(self.files.rootdir, "no-exists-file")
992
p = aws('s3 cp %s s3://%s/' % (filename, bucket_name))
993
# If the local path provided by the user is nonexistent for an
994
# upload, this should error out.
995
self.assertEqual(p.rc, 255, p.stderr)
996
self.assertIn('The user-provided path %s does not exist.' %
997
filename, p.stderr)
998
999
@skip_if_windows('Read permissions tests only supported on mac/linux')
1000
def test_no_read_access(self):
1001
if os.geteuid() == 0:
1002
self.skipTest('Cannot completely remove read access as root user.')
1003
bucket_name = _SHARED_BUCKET
1004
self.files.create_file('foo.txt', 'foo')
1005
filename = os.path.join(self.files.rootdir, 'foo.txt')
1006
permissions = stat.S_IMODE(os.stat(filename).st_mode)
1007
# Remove read permissions
1008
permissions = permissions ^ stat.S_IREAD
1009
os.chmod(filename, permissions)
1010
p = aws('s3 cp %s s3://%s/' % (filename, bucket_name))
1011
self.assertEqual(p.rc, 2, p.stderr)
1012
self.assertIn('warning: Skipping file %s. File/Directory is '
1013
'not readable.' % filename, p.stderr)
1014
1015
@skip_if_windows('Special files only supported on mac/linux')
1016
def test_is_special_file(self):
1017
bucket_name = _SHARED_BUCKET
1018
file_path = os.path.join(self.files.rootdir, 'foo')
1019
# Use socket for special file.
1020
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1021
sock.bind(file_path)
1022
p = aws('s3 cp %s s3://%s/' % (file_path, bucket_name))
1023
self.assertEqual(p.rc, 2, p.stderr)
1024
self.assertIn(("warning: Skipping file %s. File is character "
1025
"special device, block special device, FIFO, or "
1026
"socket." % file_path), p.stderr)
1027
1028
1029
class TestUnableToWriteToFile(BaseS3IntegrationTest):
1030
1031
@skip_if_windows('Write permissions tests only supported on mac/linux')
1032
def test_no_write_access_small_file(self):
1033
bucket_name = _SHARED_BUCKET
1034
if os.geteuid() == 0:
1035
self.skipTest(
1036
'Cannot completely remove write access as root user.')
1037
os.chmod(self.files.rootdir, 0o444)
1038
self.put_object(bucket_name, 'foo.txt',
1039
contents='Hello world')
1040
p = aws('s3 cp s3://%s/foo.txt %s' % (
1041
bucket_name, os.path.join(self.files.rootdir, 'foo.txt')))
1042
self.assertEqual(p.rc, 1)
1043
self.assertIn('download failed', p.stderr)
1044
1045
@skip_if_windows('Write permissions tests only supported on mac/linux')
1046
def test_no_write_access_large_file(self):
1047
if os.geteuid() == 0:
1048
self.skipTest(
1049
'Cannot completely remove write access as root user.')
1050
bucket_name = _SHARED_BUCKET
1051
# We have to use a file like object because using a string
1052
# would result in the header + body sent as a single packet
1053
# which effectively disables the expect 100 continue logic.
1054
# This will result in a test error because we won't follow
1055
# the temporary redirect for the newly created bucket.
1056
contents = BytesIO(b'a' * 10 * 1024 * 1024)
1057
self.put_object(bucket_name, 'foo.txt',
1058
contents=contents)
1059
os.chmod(self.files.rootdir, 0o444)
1060
p = aws('s3 cp s3://%s/foo.txt %s' % (
1061
bucket_name, os.path.join(self.files.rootdir, 'foo.txt')))
1062
self.assertEqual(p.rc, 1)
1063
self.assertIn('download failed', p.stderr)
1064
1065
1066
@skip_if_windows('Symlink tests only supported on mac/linux')
1067
class TestSymlinks(BaseS3IntegrationTest):
1068
"""
1069
This class tests the ability to follow or not follow symlinks.
1070
"""
1071
def extra_setup(self):
1072
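# Resulting layout under rootdir:
#   realfiles/foo.txt (real file)
#   a-goodsymlink  -> realfiles/foo.txt
#   b-badsymlink   -> non-existent-file
#   c-goodsymlink  -> realfiles/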
self.bucket_name = _SHARED_BUCKET
1073
self.nested_dir = os.path.join(self.files.rootdir, 'realfiles')
1074
os.mkdir(self.nested_dir)
1075
self.sample_file = \
1076
self.files.create_file(os.path.join(self.nested_dir, 'foo.txt'),
1077
contents='foo.txt contents')
1078
# Create a symlink to foo.txt.
1079
os.symlink(self.sample_file, os.path.join(self.files.rootdir,
1080
'a-goodsymlink'))
1081
# Create a bad symlink.
1082
os.symlink('non-existent-file', os.path.join(self.files.rootdir,
1083
'b-badsymlink'))
1084
# Create a symlink to directory where foo.txt is.
1085
os.symlink(self.nested_dir, os.path.join(self.files.rootdir,
1086
'c-goodsymlink'))
1087
1088
def test_no_follow_symlinks(self):
1089
p = aws('s3 sync %s s3://%s/ --no-follow-symlinks' % (
1090
self.files.rootdir, self.bucket_name))
1091
self.assert_no_errors(p)
1092
self.assertTrue(self.key_not_exists(self.bucket_name,
1093
'a-goodsymlink'))
1094
self.assertTrue(self.key_not_exists(self.bucket_name,
1095
'b-badsymlink'))
1096
self.assertTrue(self.key_not_exists(self.bucket_name,
1097
'c-goodsymlink/foo.txt'))
1098
self.assertEqual(self.get_key_contents(self.bucket_name,
1099
key_name='realfiles/foo.txt'),
1100
'foo.txt contents')
1101
1102
def test_follow_symlinks(self):
1103
# Get rid of the bad symlink first.
1104
os.remove(os.path.join(self.files.rootdir, 'b-badsymlink'))
1105
p = aws('s3 sync %s s3://%s/ --follow-symlinks' %
1106
(self.files.rootdir, self.bucket_name))
1107
self.assert_no_errors(p)
1108
self.assertEqual(self.get_key_contents(self.bucket_name,
1109
key_name='a-goodsymlink'),
1110
'foo.txt contents')
1111
self.assertTrue(self.key_not_exists(self.bucket_name,
1112
'b-badsymlink'))
1113
self.assertEqual(
1114
self.get_key_contents(self.bucket_name,
1115
key_name='c-goodsymlink/foo.txt'),
1116
'foo.txt contents')
1117
self.assertEqual(self.get_key_contents(self.bucket_name,
1118
key_name='realfiles/foo.txt'),
1119
'foo.txt contents')
1120
1121
def test_follow_symlinks_default(self):
1122
# Get rid of the bad symlink first.
1123
os.remove(os.path.join(self.files.rootdir, 'b-badsymlink'))
1124
p = aws('s3 sync %s s3://%s/' %
1125
(self.files.rootdir, self.bucket_name))
1126
self.assert_no_errors(p)
1127
self.assertEqual(self.get_key_contents(self.bucket_name,
1128
key_name='a-goodsymlink'),
1129
'foo.txt contents')
1130
self.assertTrue(self.key_not_exists(self.bucket_name,
1131
'b-badsymlink'))
1132
self.assertEqual(
1133
self.get_key_contents(self.bucket_name,
1134
key_name='c-goodsymlink/foo.txt'),
1135
'foo.txt contents')
1136
self.assertEqual(self.get_key_contents(self.bucket_name,
1137
key_name='realfiles/foo.txt'),
1138
'foo.txt contents')
1139
1140
def test_bad_symlink(self):
1141
p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, self.bucket_name))
1142
self.assertEqual(p.rc, 2, p.stderr)
1143
self.assertIn('warning: Skipping file %s. File does not exist.' %
1144
os.path.join(self.files.rootdir, 'b-badsymlink'),
1145
p.stderr)
1146
1147
1148
class TestUnicode(BaseS3IntegrationTest):
1149
"""
1150
The purpose of these tests is to ensure that the commands can handle
1151
unicode characters in both key names and local filenames when
1152
uploading and downloading files.
1153
"""
1154
def test_cp(self):
1155
bucket_name = _SHARED_BUCKET
1156
local_example1_txt = \
1157
self.files.create_file(u'\u00e9xample.txt', 'example1 contents')
1158
s3_example1_txt = 's3://%s/%s' % (bucket_name,
1159
os.path.basename(local_example1_txt))
1160
local_example2_txt = self.files.full_path(u'\u00e9xample2.txt')
1161
1162
p = aws('s3 cp %s %s' % (local_example1_txt, s3_example1_txt))
1163
self.assert_no_errors(p)
1164
1165
# Download the file to the second example2.txt filename.
1166
p = aws('s3 cp %s %s --quiet' % (s3_example1_txt, local_example2_txt))
1167
self.assert_no_errors(p)
1168
with open(local_example2_txt, 'rb') as f:
1169
self.assertEqual(f.read(), b'example1 contents')
1170
1171
def test_recursive_cp(self):
1172
bucket_name = _SHARED_BUCKET
1173
local_example1_txt = self.files.create_file(u'\u00e9xample.txt',
1174
'example1 contents')
1175
local_example2_txt = self.files.create_file(u'\u00e9xample2.txt',
1176
'example2 contents')
1177
p = aws('s3 cp %s s3://%s --recursive --quiet' % (
1178
self.files.rootdir, bucket_name))
1179
self.assert_no_errors(p)
1180
1181
os.remove(local_example1_txt)
1182
os.remove(local_example2_txt)
1183
1184
p = aws('s3 cp s3://%s %s --recursive --quiet' % (
1185
bucket_name, self.files.rootdir))
1186
self.assert_no_errors(p)
1187
self.assertEqual(open(local_example1_txt).read(), 'example1 contents')
1188
self.assertEqual(open(local_example2_txt).read(), 'example2 contents')
1189
1190
1191
class TestLs(BaseS3IntegrationTest):
1192
"""
1193
These tests exercise the ``ls`` command.
1194
"""
1195
1196
def assert_ls_with_prefix(self, bucket_name):
1197
self.put_object(bucket_name, 'foo.txt', 'contents')
1198
self.put_object(bucket_name, 'foo', 'contents')
1199
self.put_object(bucket_name, 'bar.txt', 'contents')
1200
self.put_object(bucket_name, 'subdir/foo.txt', 'contents')
1201
p = aws('s3 ls s3://%s' % bucket_name)
1202
self.assertIn('PRE subdir/', p.stdout)
1203
self.assertIn('8 foo.txt', p.stdout)
1204
self.assertIn('8 foo', p.stdout)
1205
self.assertIn('8 bar.txt', p.stdout)
1206
1207
def assert_ls_recursive(self, bucket_name):
1208
self.put_object(bucket_name, 'foo.txt', 'contents')
1209
self.put_object(bucket_name, 'foo', 'contents')
1210
self.put_object(bucket_name, 'bar.txt', 'contents')
1211
self.put_object(bucket_name, 'subdir/foo.txt', 'contents')
1212
p = aws('s3 ls s3://%s --recursive' % bucket_name)
1213
self.assertIn('8 foo.txt', p.stdout)
1214
self.assertIn('8 foo', p.stdout)
1215
self.assertIn('8 bar.txt', p.stdout)
1216
self.assertIn('8 subdir/foo.txt', p.stdout)
1217
1218
def test_ls_bucket(self):
1219
p = aws('s3 ls')
1220
self.assert_no_errors(p)
1221
1222
def test_ls_with_no_env_vars(self):
1223
# By default, the aws() function injects
1224
# an AWS_DEFAULT_REGION into the env var of the
1225
# process. We're verifying that a region does *not*
1226
# need to be set anywhere. If we provide our
1227
# own environ dict, then the aws() function won't
1228
# inject a region.
1229
env = os.environ.copy()
1230
p = aws('s3 ls', env_vars=env)
1231
self.assert_no_errors(p)
1232
1233
def test_ls_bucket_with_s3_prefix(self):
1234
p = aws('s3 ls s3://')
1235
self.assert_no_errors(p)
1236
1237
def test_ls_non_existent_bucket(self):
1238
p = aws(f's3 ls s3://{_NON_EXISTENT_BUCKET}')
1239
self.assertEqual(p.rc, 255)
1240
self.assertIn(
1241
('An error occurred (NoSuchBucket) when calling the '
1242
'ListObjectsV2 operation: The specified bucket does not exist'),
1243
p.stderr)
1244
# There should be no stdout if we can't find the bucket.
1245
self.assertEqual(p.stdout, '')
1246
1247
def test_ls_with_prefix(self):
1248
self.assert_ls_with_prefix(_SHARED_BUCKET)
1249
1250
def test_s3_express_ls_with_prefix(self):
1251
self.assert_ls_with_prefix(_SHARED_DIR_BUCKET)
1252
1253
def test_ls_recursive(self):
1254
self.assert_ls_recursive(_SHARED_BUCKET)
1255
1256
def test_s3_express_ls_recursive(self):
1257
self.assert_ls_recursive(_SHARED_DIR_BUCKET)
1258
1259
def test_ls_without_prefix(self):
1260
# The ls command does not require an s3:// prefix,
1261
# we're always listing s3 contents.
1262
bucket_name = _SHARED_BUCKET
1263
self.put_object(bucket_name, 'foo.txt', 'contents')
1264
p = aws('s3 ls %s' % bucket_name)
1265
self.assertEqual(p.rc, 0)
1266
self.assertIn('foo.txt', p.stdout)
1267
1268
def test_only_prefix(self):
1269
bucket_name = _SHARED_BUCKET
1270
self.put_object(bucket_name, 'temp/foo.txt', 'contents')
1271
p = aws('s3 ls s3://%s/temp/foo.txt' % bucket_name)
1272
self.assertEqual(p.rc, 0)
1273
self.assertIn('foo.txt', p.stdout)
1274
1275
def test_ls_empty_bucket(self):
1276
bucket_name = _SHARED_BUCKET
1277
p = aws('s3 ls %s' % bucket_name)
1278
# There should not be an error thrown for checking the contents of
1279
# an empty bucket because no key was specified.
1280
self.assertEqual(p.rc, 0)
1281
1282
def test_ls_fail(self):
1283
bucket_name = _SHARED_BUCKET
1284
p = aws('s3 ls s3://%s/foo' % bucket_name)
1285
self.assertEqual(p.rc, 1)
1286
1287
def test_ls_fail_recursive(self):
1288
bucket_name = _SHARED_BUCKET
1289
p = aws('s3 ls s3://%s/bar --recursive' % bucket_name)
1290
self.assertEqual(p.rc, 1)
1291
1292
1293
class TestMbRb(BaseS3IntegrationTest):
1294
"""
1295
Tests primarily exercising the ``mb`` and ``rb`` commands.
1296
"""
1297
def extra_setup(self):
1298
self.bucket_name = random_bucket_name()
1299
1300
def test_mb_rb(self):
1301
p = aws('s3 mb s3://%s' % self.bucket_name)
1302
self.assert_no_errors(p)
1303
1304
# Give the bucket time to form.
1305
time.sleep(1)
1306
response = self.list_buckets()
1307
self.assertIn(self.bucket_name, [b['Name'] for b in response])
1308
1309
p = aws('s3 rb s3://%s' % self.bucket_name)
1310
self.assert_no_errors(p)
1311
1312
def test_fail_mb_rb(self):
1313
# Choose a bucket name that already exists.
1314
p = aws('s3 mb s3://mybucket')
1315
self.assertIn("BucketAlreadyExists", p.stderr)
1316
self.assertEqual(p.rc, 1)
1317
1318
1319
class TestOutput(BaseS3IntegrationTest):
1320
"""
1321
This ensures that arguments that affect output, i.e. ``--quiet`` and
1322
``--only-show-errors``, behave as expected.
1323
"""
1324
def test_normal_output(self):
1325
bucket_name = _SHARED_BUCKET
1326
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1327
1328
# Copy file into bucket.
1329
p = aws('s3 cp %s s3://%s/' % (foo_txt, bucket_name))
1330
self.assertEqual(p.rc, 0)
1331
# Check that there were no errors and that parts of the expected
1332
# progress message are written to stdout.
1333
self.assert_no_errors(p)
1334
self.assertIn('upload', p.stdout)
1335
self.assertIn('s3://%s/foo.txt' % bucket_name, p.stdout)
1336
1337
def test_normal_output_quiet(self):
1338
bucket_name = _SHARED_BUCKET
1339
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1340
1341
# Copy file into bucket.
1342
p = aws('s3 cp %s s3://%s/ --quiet' % (foo_txt, bucket_name))
1343
self.assertEqual(p.rc, 0)
1344
# Check that nothing was printed to stdout.
1345
self.assertEqual('', p.stdout)
1346
1347
def test_normal_output_only_show_errors(self):
1348
bucket_name = _SHARED_BUCKET
1349
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1350
1351
# Copy file into bucket.
1352
p = aws('s3 cp %s s3://%s/ --only-show-errors' % (foo_txt,
1353
bucket_name))
1354
self.assertEqual(p.rc, 0)
1355
# Check that nothing was printed to stdout.
1356
self.assertEqual('', p.stdout)
1357
1358
def test_normal_output_no_progress(self):
1359
bucket_name = _SHARED_BUCKET
1360
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1361
1362
# Copy file into bucket.
1363
p = aws('s3 cp %s s3://%s/ --no-progress' % (foo_txt, bucket_name))
1364
self.assertEqual(p.rc, 0)
1365
# Ensure success message was printed
1366
self.assertIn('upload', p.stdout)
1367
self.assertIn('s3://%s/foo.txt' % bucket_name, p.stdout)
1368
self.assertNotIn('Completed ', p.stdout)
1369
self.assertNotIn('calculating', p.stdout)
1370
1371
def test_error_output(self):
1372
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1373
1374
# Copy file into bucket.
1375
p = aws(f's3 cp {foo_txt} s3://{_NON_EXISTENT_BUCKET}/')
1376
# Check that there were errors and that the error was printed to stderr.
1377
self.assertEqual(p.rc, 1)
1378
self.assertIn('upload failed', p.stderr)
1379
1380
def test_error_output_quiet(self):
1381
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1382
1383
# Copy file into bucket.
1384
p = aws(f's3 cp {foo_txt} s3://{_NON_EXISTENT_BUCKET}/ --quiet')
1385
# Check that there were errors and that the error was not
1386
# printed to stderr.
1387
self.assertEqual(p.rc, 1)
1388
self.assertEqual('', p.stderr)
1389
1390
def test_error_output_only_show_errors(self):
1391
foo_txt = self.files.create_file('foo.txt', 'foo contents')
1392
1393
# Copy file into bucket.
1394
p = aws(f's3 cp {foo_txt} s3://{_NON_EXISTENT_BUCKET}/ --only-show-errors')
1395
# Check that there were errors and that the error was printed to stderr.
1396
self.assertEqual(p.rc, 1)
1397
self.assertIn('upload failed', p.stderr)
1398
1399
def test_error_and_success_output_only_show_errors(self):
1400
# Make a bucket.
1401
bucket_name = _SHARED_BUCKET
1402
1403
# Create one file.
1404
self.files.create_file('f', 'foo contents')
1405
1406
# Create another file that has a slightly longer name than the first.
1407
self.files.create_file('bar.txt', 'bar contents')
1408
1409
# Create a prefix that will cause the second created file to have a key
1410
# longer than 1024 bytes which is not allowed in s3.
1411
long_prefix = 'd' * 1022
1412
1413
p = aws('s3 cp %s s3://%s/%s/ --only-show-errors --recursive'
1414
% (self.files.rootdir, bucket_name, long_prefix))
1415
1416
# Check that there was at least one error.
1417
self.assertEqual(p.rc, 1)
1418
1419
# Check that there was nothing written to stdout for successful upload.
1420
self.assertEqual('', p.stdout)
1421
1422
# Check that the failed message showed up in stderr.
1423
self.assertIn('upload failed', p.stderr)
1424
1425
# Ensure the expected successful key exists in the bucket.
1426
self.assertTrue(self.key_exists(bucket_name, long_prefix + '/f'))


class TestDryrun(BaseS3IntegrationTest):
    """
    This ensures that dryrun works.
    """
    def test_dryrun(self):
        bucket_name = _SHARED_BUCKET
        foo_txt = self.files.create_file('foo.txt', 'foo contents')

        # Copy file into bucket.
        p = aws('s3 cp %s s3://%s/ --dryrun' % (foo_txt, bucket_name))
        self.assertEqual(p.rc, 0)
        self.assert_no_errors(p)
        self.assertTrue(self.key_not_exists(bucket_name, 'foo.txt'))

    def test_dryrun_large_files(self):
        bucket_name = _SHARED_BUCKET
        foo_txt = self.files.create_file('foo.txt', 'a' * 1024 * 1024 * 10)

        # Copy file into bucket.
        p = aws('s3 cp %s s3://%s/ --dryrun' % (foo_txt, bucket_name))
        self.assertEqual(p.rc, 0)
        self.assert_no_errors(p)
        self.assertTrue(
            self.key_not_exists(bucket_name, 'foo.txt'),
            "The key 'foo.txt' exists in S3. It looks like the --dryrun "
            "argument was not obeyed.")

    def test_dryrun_download_large_file(self):
        bucket_name = _SHARED_BUCKET
        full_path = self.files.create_file('largefile', 'a' * 1024 * 1024 * 10)
        with open(full_path, 'rb') as body:
            self.put_object(bucket_name, 'foo.txt', body)

        foo_txt = self.files.full_path('foo.txt')
        p = aws('s3 cp s3://%s/foo.txt %s --dryrun' % (bucket_name, foo_txt))
        self.assertEqual(p.rc, 0)
        self.assert_no_errors(p)
        self.assertFalse(
            os.path.exists(foo_txt),
            "The file 'foo.txt' exists locally. It looks like the --dryrun "
            "argument was not obeyed.")


@skip_if_windows('Memory tests only supported on mac/linux')
class TestMemoryUtilization(BaseS3IntegrationTest):
    # These tests verify the memory utilization and growth are what we expect.
    def extra_setup(self):
        self.num_threads = DEFAULTS['max_concurrent_requests']
        self.chunk_size = DEFAULTS['multipart_chunksize']
        expected_memory_usage = self.num_threads * self.chunk_size
        # margin for things like python VM overhead, botocore service
        # objects, etc. 1.5 is really generous, perhaps over time this can be
        # lowered.
        runtime_margin = 1.5
        self.max_mem_allowed = runtime_margin * expected_memory_usage
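        # For reference: assuming the stock transfer defaults of 10 concurrent
        # requests and an 8 MB multipart chunksize, this works out to roughly
        # 1.5 * 10 * 8 MB = 120 MB of allowed peak memory.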

    def assert_max_memory_used(self, process, max_mem_allowed, full_command):
        peak_memory = max(process.memory_usage)
        if peak_memory > max_mem_allowed:
            failure_message = (
                'Exceeded max memory allowed (%s MB) for command '
                '"%s": %s MB' % (max_mem_allowed / 1024.0 / 1024.0,
                                 full_command,
                                 peak_memory / 1024.0 / 1024.0))
            self.fail(failure_message)

    @pytest.mark.slow
    def test_transfer_single_large_file(self):
        # The 80MB file created here will force a multipart upload.
        bucket_name = _SHARED_BUCKET
        file_contents = 'abcdabcd' * (1024 * 1024 * 10)
        foo_txt = self.files.create_file('foo.txt', file_contents)
        full_command = 's3 mv %s s3://%s/foo.txt' % (foo_txt, bucket_name)
        p = aws(full_command, collect_memory=True)
        self.assert_no_errors(p)
        self.assert_max_memory_used(p, self.max_mem_allowed, full_command)

        # Verify downloading it back down obeys memory utilization.
        download_full_command = 's3 mv s3://%s/foo.txt %s' % (
            bucket_name, foo_txt)
        p = aws(download_full_command, collect_memory=True)
        self.assert_no_errors(p)
        self.assert_max_memory_used(p, self.max_mem_allowed,
                                    download_full_command)

    # Some versions of RHEL allocate memory in a way where free'd memory isn't
    # given back to the OS. We haven't seen behavior as bad as RHEL's to the
    # point where this test fails on other distros, so for now we're disabling
    # the test on RHEL until we come up with a better way to collect
    # memory usage.
    @pytest.mark.slow
    @unittest.skipIf(_running_on_rhel(),
                     'Streaming memory tests not supported on RHEL.')
    def test_stream_large_file(self):
        """
        This test ensures that streaming files for both uploads and
        downloads does not use too much memory. Note that streaming uploads
        will use slightly more memory than usual but should not put the
        entire file into memory.
        """
        bucket_name = _SHARED_BUCKET

        # Create a 200 MB file that will be streamed.
        num_mb = 200
        foo_txt = self.files.create_file('foo.txt', '')
        with open(foo_txt, 'wb') as f:
            for i in range(num_mb):
                f.write(b'a' * 1024 * 1024)

        # The current memory threshold is set at about the peak amount for
        # performing a streaming upload of a file larger than 100 MB. So
        # this maximum needs to be bumped up. The maximum memory allowance
        # is increased by two chunksizes because that is the maximum
        # amount of chunks that will be queued while not being operated on
        # by a thread when performing a streaming multipart upload.
        max_mem_allowed = self.max_mem_allowed + 2 * self.chunk_size
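        # Again assuming the default 8 MB chunksize, this bump adds roughly
        # 16 MB on top of the base allowance computed in extra_setup().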

        full_command = 's3 cp - s3://%s/foo.txt' % bucket_name
        with open(foo_txt, 'rb') as f:
            p = aws(full_command, input_file=f, collect_memory=True)
            self.assert_no_errors(p)
            self.assert_max_memory_used(p, max_mem_allowed, full_command)

        # Now perform a streaming download of the file.
        full_command = 's3 cp s3://%s/foo.txt - > %s' % (bucket_name, foo_txt)
        p = aws(full_command, collect_memory=True)
        self.assert_no_errors(p)
        # Use the usual bar for maximum memory usage since a streaming
        # download's memory usage should be comparable to non-streaming
        # transfers.
        self.assert_max_memory_used(p, self.max_mem_allowed, full_command)


class TestWebsiteConfiguration(BaseS3IntegrationTest):
    def test_create_website_index_configuration(self):
        bucket_name = self.create_bucket()
        # Supply only --index-document argument.
        full_command = 's3 website %s --index-document index.html' % \
            (bucket_name)
        p = aws(full_command)
        self.assertEqual(p.rc, 0)
        self.assert_no_errors(p)
        # Verify we have a bucket website configured.
        parsed = self.client.get_bucket_website(Bucket=bucket_name)
        self.assertEqual(parsed['IndexDocument']['Suffix'], 'index.html')
        self.assertNotIn('ErrorDocument', parsed)
        self.assertNotIn('RoutingRules', parsed)
        self.assertNotIn('RedirectAllRequestsTo', parsed)

    def test_create_website_index_and_error_configuration(self):
        bucket_name = self.create_bucket()
        # Supply both --index-document and --error-document arguments.
        p = aws('s3 website %s --index-document index.html '
                '--error-document error.html' % bucket_name)
        self.assertEqual(p.rc, 0)
        self.assert_no_errors(p)
        # Verify we have a bucket website configured.
        parsed = self.client.get_bucket_website(Bucket=bucket_name)
        self.assertEqual(parsed['IndexDocument']['Suffix'], 'index.html')
        self.assertEqual(parsed['ErrorDocument']['Key'], 'error.html')
        self.assertNotIn('RoutingRules', parsed)
        self.assertNotIn('RedirectAllRequestsTo', parsed)


class TestIncludeExcludeFilters(BaseS3IntegrationTest):
    def assert_no_files_would_be_uploaded(self, p):
        self.assert_no_errors(p)
        # There should be no output.
        self.assertEqual(p.stdout, '')
        self.assertEqual(p.stderr, '')

    def test_basic_exclude_filter_for_single_file(self):
        full_path = self.files.create_file('foo.txt', 'this is foo.txt')
        # With no exclude we should upload the file.
        p = aws('s3 cp %s s3://random-bucket-name/ --dryrun' % full_path)
        self.assert_no_errors(p)
        self.assertIn('(dryrun) upload:', p.stdout)

        p2 = aws("s3 cp %s s3://random-bucket-name/ --dryrun --exclude '*'"
                 % full_path)
        self.assert_no_files_would_be_uploaded(p2)

    def test_explicitly_exclude_single_file(self):
        full_path = self.files.create_file('foo.txt', 'this is foo.txt')
        p = aws('s3 cp %s s3://random-bucket-name/'
                ' --dryrun --exclude foo.txt'
                % full_path)
        self.assert_no_files_would_be_uploaded(p)

    def test_cwd_doesnt_matter(self):
        full_path = self.files.create_file('foo.txt', 'this is foo.txt')
        tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tempdir)
        with cd(tempdir):
            p = aws("s3 cp %s s3://random-bucket-name/ --dryrun --exclude '*'"
                    % full_path)
        self.assert_no_files_would_be_uploaded(p)

    def test_recursive_exclude(self):
        # create test/foo.txt
        nested_dir = os.path.join(self.files.rootdir, 'test')
        os.mkdir(nested_dir)
        self.files.create_file(os.path.join(nested_dir, 'foo.txt'),
                               contents='foo.txt contents')
        # Then create test-123.txt, test-321.txt, test.txt.
        self.files.create_file('test-123.txt', 'test-123.txt contents')
        self.files.create_file('test-321.txt', 'test-321.txt contents')
        self.files.create_file('test.txt', 'test.txt contents')
        # An --exclude test* should exclude everything here.
        p = aws("s3 cp %s s3://random-bucket-name/ --dryrun --exclude '*' "
                "--recursive" % self.files.rootdir)
        self.assert_no_files_would_be_uploaded(p)

        # We can include the test directory though.
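        # Filters are evaluated in the order they are given and the last
        # matching filter wins, so --exclude '*' followed by --include
        # 'test/*' re-includes only the keys under the test/ directory.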
        p = aws("s3 cp %s s3://random-bucket-name/ --dryrun "
                "--exclude '*' --include 'test/*' --recursive"
                % self.files.rootdir)
        self.assert_no_errors(p)
        self.assertRegex(p.stdout, r'\(dryrun\) upload:.*test/foo.txt.*')

    def test_s3_filtering(self):
        # Should behave the same as local file filtering.
        bucket_name = _SHARED_BUCKET
        self.put_object(bucket_name, key_name='foo.txt')
        self.put_object(bucket_name, key_name='bar.txt')
        self.put_object(bucket_name, key_name='baz.jpg')
        p = aws("s3 rm s3://%s/ --dryrun --exclude '*' --recursive"
                % bucket_name)
        self.assert_no_files_would_be_uploaded(p)

        p = aws(
            "s3 rm s3://%s/ --dryrun --exclude '*.jpg' --exclude '*.txt' "
            "--recursive" % bucket_name)
        self.assert_no_files_would_be_uploaded(p)

        p = aws("s3 rm s3://%s/ --dryrun --exclude '*.txt' --recursive"
                % bucket_name)
        self.assert_no_errors(p)
        self.assertRegex(p.stdout, r'\(dryrun\) delete:.*baz.jpg.*')
        self.assertNotIn('bar.txt', p.stdout)
        self.assertNotIn('foo.txt', p.stdout)

    def test_exclude_filter_with_delete(self):
        # Test for: https://github.com/aws/aws-cli/issues/778
        bucket_name = _SHARED_BUCKET
        self.files.create_file('foo.txt', 'contents')
        second = self.files.create_file('bar.py', 'contents')
        p = aws("s3 sync %s s3://%s/" % (self.files.rootdir, bucket_name))
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, key_name='bar.py'))
        os.remove(second)
        # We now have the same state as specified in the bug:
        # local           remote
        # -----           ------
        #
        # foo.txt         foo.txt
        #                 bar.py
        #
        # If we now run --exclude '*.py' --delete, then we should *not*
        # delete bar.py on the remote side.
        p = aws("s3 sync %s s3://%s/ --exclude '*.py' --delete" % (
            self.files.rootdir, bucket_name))
        self.assert_no_errors(p)
        self.assertTrue(
            self.key_exists(bucket_name, key_name='bar.py'),
            ("The --delete flag was not applied to the receiving "
             "end, the 'bar.py' file was deleted even though it"
             " was excluded."))

    def test_exclude_filter_with_relative_path(self):
        # Same test as test_exclude_filter_with_delete, except we don't
        # use an absolute path on the source dir.
        bucket_name = _SHARED_BUCKET
        self.files.create_file('foo.txt', 'contents')
        second = self.files.create_file('bar.py', 'contents')
        p = aws("s3 sync %s s3://%s/" % (self.files.rootdir, bucket_name))
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, key_name='bar.py'))
        os.remove(second)
        cwd = os.getcwd()
        try:
            os.chdir(self.files.rootdir)
            # Note how we're using "." for the source directory.
            p = aws("s3 sync . s3://%s/ --exclude '*.py' --delete"
                    % bucket_name)
        finally:
            os.chdir(cwd)
        self.assert_no_errors(p)
        self.assertTrue(
            self.key_exists(bucket_name, key_name='bar.py'),
            ("The --delete flag was not applied to the receiving "
             "end, the 'bar.py' file was deleted even though"
             " it was excluded."))

    def test_filter_s3_with_prefix(self):
        bucket_name = _SHARED_BUCKET
        self.put_object(bucket_name, key_name='temp/test')
        p = aws('s3 cp s3://%s/temp/ %s --recursive --exclude test --dryrun'
                % (bucket_name, self.files.rootdir))
        self.assert_no_files_would_be_uploaded(p)

    def test_filter_no_resync(self):
        # This specifically tests for the issue described here:
        # https://github.com/aws/aws-cli/issues/794
        bucket_name = _SHARED_BUCKET
        dir_name = os.path.join(self.files.rootdir, 'temp')
        self.files.create_file(os.path.join(dir_name, 'test.txt'),
                               contents='foo')
        # Sync a local directory to an s3 prefix.
        p = aws('s3 sync %s s3://%s/temp' % (dir_name, bucket_name))
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, key_name='temp/test.txt'))

        # Nothing should be synced down if filters are used.
        p = aws("s3 sync s3://%s/temp %s --exclude '*' --include test.txt"
                % (bucket_name, dir_name))
        self.assert_no_files_would_be_uploaded(p)


class TestFileWithSpaces(BaseS3IntegrationTest):
    def test_upload_download_file_with_spaces(self):
        bucket_name = _SHARED_BUCKET
        filename = self.files.create_file('with space.txt', 'contents')
        p = aws('s3 cp %s s3://%s/ --recursive' % (self.files.rootdir,
                                                   bucket_name))
        self.assert_no_errors(p)
        os.remove(filename)
        # Now download the file back down locally.
        p = aws('s3 cp s3://%s/ %s --recursive' % (bucket_name,
                                                   self.files.rootdir))
        self.assert_no_errors(p)
        self.assertEqual(os.listdir(self.files.rootdir)[0], 'with space.txt')

    def test_sync_file_with_spaces(self):
        bucket_name = _SHARED_BUCKET
        self.files.create_file('with space.txt',
                               'contents', mtime=time.time() - 300)
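        # The mtime is backdated so that, after the first sync, the local file
        # is unambiguously older than the uploaded object; s3 sync compares
        # file size and last-modified time, so the second sync below should
        # find nothing to upload.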
        p = aws('s3 sync %s s3://%s/' % (self.files.rootdir,
                                         bucket_name))
        self.assert_no_errors(p)
        time.sleep(1)
        # Now syncing again should *not* trigger any uploads (i.e. we should
        # get nothing on stdout).
        p2 = aws('s3 sync %s s3://%s/' % (self.files.rootdir,
                                          bucket_name))
        self.assertEqual(p2.stdout, '')
        self.assertEqual(p2.stderr, '')
        self.assertEqual(p2.rc, 0)


class TestStreams(BaseS3IntegrationTest):
    def test_upload(self):
        """
        This tests uploading a small stream from stdin.
        """
        bucket_name = _SHARED_BUCKET
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=b'This is a test')
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, 'stream'))
        self.assertEqual(self.get_key_contents(bucket_name, 'stream'),
                         'This is a test')

    def test_unicode_upload(self):
        """
        This tests being able to upload unicode from stdin.
        """
        unicode_str = u'\u00e9 This is a test'
        byte_str = unicode_str.encode('utf-8')
        bucket_name = _SHARED_BUCKET
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=byte_str)
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, 'stream'))
        self.assertEqual(self.get_key_contents(bucket_name, 'stream'),
                         unicode_str)

    @pytest.mark.slow
    def test_multipart_upload(self):
        """
        This tests the ability to multipart upload streams from stdin.
        The data has some unicode in it to avoid having to do a separate
        multipart upload test just for unicode.
        """
        bucket_name = _SHARED_BUCKET
        data = u'\u00e9bcd' * (1024 * 1024 * 10)
        data_encoded = data.encode('utf-8')
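        # '\u00e9' is two bytes in UTF-8, so the encoded payload is roughly
        # 5 bytes * 10M = 50 MB, which is well above the (assumed 8 MB)
        # multipart threshold and forces a multipart streaming upload.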
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=data_encoded)
        self.assert_no_errors(p)
        self.assertTrue(self.key_exists(bucket_name, 'stream'))
        self.assert_key_contents_equal(bucket_name, 'stream', data)

    def test_download(self):
        """
        This tests downloading a small stream to stdout.
        """
        bucket_name = _SHARED_BUCKET
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=b'This is a test')
        self.assert_no_errors(p)

        p = aws('s3 cp s3://%s/stream -' % bucket_name)
        self.assert_no_errors(p)
        self.assertEqual(p.stdout, 'This is a test')

    def test_unicode_download(self):
        """
        This tests downloading a small unicode stream to stdout.
        """
        bucket_name = _SHARED_BUCKET

        data = u'\u00e9 This is a test'
        data_encoded = data.encode('utf-8')
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=data_encoded)
        self.assert_no_errors(p)

        # Download the unicode stream to standard out.
        p = aws('s3 cp s3://%s/stream -' % bucket_name)
        self.assert_no_errors(p)
        self.assertEqual(p.stdout, data_encoded.decode(get_stdout_encoding()))

    @pytest.mark.slow
    def test_multipart_download(self):
        """
        This tests the ability to multipart download streams to stdout.
        The data has some unicode in it to avoid having to do a separate
        multipart download test just for unicode.
        """
        bucket_name = _SHARED_BUCKET

        # First let's upload some data via streaming since
        # it's faster and we do not have to write to a file!
        data = u'\u00e9bcd' * (1024 * 1024 * 10)
        data_encoded = data.encode('utf-8')
        p = aws('s3 cp - s3://%s/stream' % bucket_name,
                input_data=data_encoded)

        # Download the unicode stream to standard out.
        p = aws('s3 cp s3://%s/stream -' % bucket_name)
        self.assert_no_errors(p)
        self.assertEqual(p.stdout, data_encoded.decode(get_stdout_encoding()))


class TestLSWithProfile(BaseS3IntegrationTest):
    def extra_setup(self):
        self.config_file = os.path.join(self.files.rootdir, 'tmpconfig')
        with open(self.config_file, 'w') as f:
            creds = self.session.get_credentials()
            f.write(
                "[profile testprofile]\n"
                "aws_access_key_id=%s\n"
                "aws_secret_access_key=%s\n" % (
                    creds.access_key,
                    creds.secret_key)
            )
            if creds.token is not None:
                f.write("aws_session_token=%s\n" % creds.token)

    def test_can_ls_with_profile(self):
        env_vars = os.environ.copy()
        env_vars['AWS_CONFIG_FILE'] = self.config_file
        p = aws('s3 ls s3:// --profile testprofile', env_vars=env_vars)
        self.assert_no_errors(p)


class TestNoSignRequests(BaseS3IntegrationTest):
    def test_no_sign_request(self):
        bucket_name = _SHARED_BUCKET
        self.put_object(bucket_name, 'foo', contents='bar',
                        extra_args={'ACL': 'public-read-write'})
        env_vars = os.environ.copy()
        env_vars['AWS_ACCESS_KEY_ID'] = 'foo'
        env_vars['AWS_SECRET_ACCESS_KEY'] = 'bar'
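        # The bogus credentials above guarantee that the signed request below
        # is rejected; --no-sign-request then skips SigV4 signing entirely,
        # which works because the object was made publicly readable.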
        p = aws('s3 cp s3://%s/foo %s/ --region %s' %
                (bucket_name, self.files.rootdir, self.region),
                env_vars=env_vars)
        # Should have credential issues
        self.assertEqual(p.rc, 1)

        p = aws('s3 cp s3://%s/foo %s/ --region %s --no-sign-request' %
                (bucket_name, self.files.rootdir, self.region),
                env_vars=env_vars)
        # Should be able to download the file when not signing the request.
        self.assert_no_errors(p)


class TestHonorsEndpointUrl(BaseS3IntegrationTest):
    def test_verify_endpoint_url_is_used(self):
        # We're going to verify this indirectly by looking at the
        # debug logs. The endpoint url we specify should be in the
        # debug logs, and the endpoint url that botocore would have
        # used if we didn't provide the endpoint-url should not
        # be in the debug logs. The other alternative is to actually
        # watch what connections are made in the process, which is not
        # easy.
        p = aws('s3 ls s3://dnscompat/ '
                '--endpoint-url http://localhost:51515 '
                '--debug')
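        # Nothing is expected to be listening on localhost:51515, so the
        # command itself will most likely fail; the return code is ignored
        # here and only the --debug output on stderr is inspected.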
        debug_logs = p.stderr
        original_hostname = 'dnscompat.s3.amazonaws.com'
        expected = 'localhost'
        self.assertNotIn(original_hostname, debug_logs,
                         '--endpoint-url is being ignored in s3 commands.')
        self.assertIn(expected, debug_logs)


class TestSSERelatedParams(BaseS3IntegrationTest):
    def download_and_assert_kms_object_integrity(self, bucket, key, contents):
        self.wait_until_key_exists(bucket, key)
        # Ensure the KMS object can be read back by downloading it with
        # --sse aws:kms, which ensures SigV4 is used on the download, as
        # that is required for KMS.
        download_filename = os.path.join(self.files.rootdir, 'tmp', key)
        p = aws('s3 cp s3://%s/%s %s --sse aws:kms' % (
            bucket, key, download_filename))
        self.assert_no_errors(p)

        self.assertTrue(os.path.isfile(download_filename))
        with open(download_filename, 'r') as f:
            self.assertEqual(f.read(), contents)

    def test_sse_upload(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload the file using AES256
        p = aws('s3 cp %s s3://%s/%s --sse AES256' % (file_name, bucket, key))
        self.assert_no_errors(p)

        # Ensure the file was uploaded correctly
        self.assert_key_contents_equal(bucket, key, contents)

    def test_large_file_sse_upload(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'a' * (10 * (1024 * 1024))
        file_name = self.files.create_file(key, contents)

        # Upload the file using AES256
        p = aws('s3 cp %s s3://%s/%s --sse AES256' % (file_name, bucket, key))
        self.assert_no_errors(p)

        # Ensure the file was uploaded correctly
        self.assert_key_contents_equal(bucket, key, contents)

    def test_sse_with_kms_upload(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload the file using KMS
        p = aws('s3 cp %s s3://%s/%s --sse aws:kms' % (file_name, bucket, key))
        self.assert_no_errors(p)

        self.download_and_assert_kms_object_integrity(bucket, key, contents)

    def test_large_file_sse_kms_upload(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'a' * (10 * (1024 * 1024))
        file_name = self.files.create_file(key, contents)

        # Upload the file using KMS
        p = aws('s3 cp %s s3://%s/%s --sse aws:kms' % (file_name, bucket, key))
        self.assert_no_errors(p)

        self.download_and_assert_kms_object_integrity(bucket, key, contents)

    def test_sse_copy(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'contents'
        self.put_object(bucket, key, contents)

        # Copy the file using AES256
        p = aws('s3 cp s3://%s/%s s3://%s/%s --sse AES256' % (
            bucket, key, bucket, new_key))
        self.assert_no_errors(p)

        # Ensure the file was copied correctly
        self.assert_key_contents_equal(bucket, new_key, contents)

    def test_large_file_sse_copy(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'a' * (10 * (1024 * 1024))

        # This is a little faster and more efficient than
        # calling self.put_object()
        file_name = self.files.create_file(key, contents)
        p = aws('s3 cp %s s3://%s/%s' % (file_name, bucket, key))
        self.assert_no_errors(p)

        # Copy the file using AES256
        p = aws('s3 cp s3://%s/%s s3://%s/%s --sse AES256' % (
            bucket, key, bucket, new_key))
        self.assert_no_errors(p)

        # Ensure the file was copied correctly
        self.assert_key_contents_equal(bucket, new_key, contents)

    def test_sse_kms_copy(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'contents'
        self.put_object(bucket, key, contents)

        # Copy the file using KMS
        p = aws('s3 cp s3://%s/%s s3://%s/%s --sse aws:kms' % (
            bucket, key, bucket, new_key))
        self.assert_no_errors(p)
        self.download_and_assert_kms_object_integrity(
            bucket, new_key, contents)

    def test_large_file_sse_kms_copy(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'a' * (10 * (1024 * 1024))

        # This is a little faster and more efficient than
        # calling self.put_object()
        file_name = self.files.create_file(key, contents)
        p = aws('s3 cp %s s3://%s/%s' % (file_name, bucket, key))
        self.assert_no_errors(p)

        # Copy the file using KMS
        p = aws('s3 cp s3://%s/%s s3://%s/%s --sse aws:kms' % (
            bucket, key, bucket, new_key))
        self.assert_no_errors(p)
        self.download_and_assert_kms_object_integrity(
            bucket, new_key, contents)

    def test_smoke_sync_sse(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload sync
        p = aws('s3 sync %s s3://%s/foo/ --sse AES256' % (
            self.files.rootdir, bucket))
        self.assert_no_errors(p)
        self.wait_until_key_exists(bucket, 'foo/foo.txt')

        # Copy sync
        p = aws('s3 sync s3://%s/foo/ s3://%s/bar/ --sse AES256' % (
            bucket, bucket))
        self.assert_no_errors(p)
        self.wait_until_key_exists(bucket, 'bar/foo.txt')

        # Remove the original file
        os.remove(file_name)

        # Download sync
        p = aws('s3 sync s3://%s/bar/ %s --sse AES256' % (
            bucket, self.files.rootdir))
        self.assert_no_errors(p)

        self.assertTrue(os.path.isfile(file_name))
        with open(file_name, 'r') as f:
            self.assertEqual(f.read(), contents)

    def test_smoke_sync_sse_kms(self):
        bucket = _SHARED_BUCKET
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload sync
        p = aws('s3 sync %s s3://%s/foo/ --sse aws:kms' % (
            self.files.rootdir, bucket))
        self.assert_no_errors(p)

        # Copy sync
        p = aws('s3 sync s3://%s/foo/ s3://%s/bar/ --sse aws:kms' % (
            bucket, bucket))
        self.assert_no_errors(p)

        # Remove the original file
        os.remove(file_name)

        # Download sync
        p = aws('s3 sync s3://%s/bar/ %s --sse aws:kms' % (
            bucket, self.files.rootdir))
        self.assert_no_errors(p)

        self.assertTrue(os.path.isfile(file_name))
        with open(file_name, 'r') as f:
            self.assertEqual(f.read(), contents)


class TestSSECRelatedParams(BaseS3IntegrationTest):
    def setUp(self):
        super(TestSSECRelatedParams, self).setUp()
        self.encrypt_key = 'a' * 32
        self.other_encrypt_key = 'b' * 32
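        # SSE-C with AES256 requires a 256-bit customer-provided key, which
        # is why both test keys are exactly 32 bytes long.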
        self.bucket = _SHARED_BUCKET

    def download_and_assert_sse_c_object_integrity(
            self, bucket, key, encrypt_key, contents):
        self.wait_until_key_exists(bucket, key,
                                   {'SSECustomerKey': encrypt_key,
                                    'SSECustomerAlgorithm': 'AES256'})
        download_filename = os.path.join(self.files.rootdir, 'tmp', key)
        p = aws('s3 cp s3://%s/%s %s --sse-c AES256 --sse-c-key %s' % (
            bucket, key, download_filename, encrypt_key))
        self.assert_no_errors(p)

        self.assertTrue(os.path.isfile(download_filename))
        with open(download_filename, 'r') as f:
            self.assertEqual(f.read(), contents)

    def test_sse_c_upload_and_download(self):
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload the file using SSE-C
        p = aws('s3 cp %s s3://%s --sse-c AES256 --sse-c-key %s' % (
            file_name, self.bucket, self.encrypt_key))
        self.assert_no_errors(p)

        self.download_and_assert_sse_c_object_integrity(
            self.bucket, key, self.encrypt_key, contents)

    def test_can_delete_single_sse_c_object(self):
        key = 'foo.txt'
        contents = 'contents'
        self.put_object(
            self.bucket, key, contents,
            extra_args={
                'SSECustomerKey': self.encrypt_key,
                'SSECustomerAlgorithm': 'AES256'
            }
        )
        p = aws('s3 rm s3://%s/%s' % (self.bucket, key))
        self.assert_no_errors(p)
        self.assertFalse(self.key_exists(self.bucket, key))

    def test_sse_c_upload_and_download_large_file(self):
        key = 'foo.txt'
        contents = 'a' * (10 * (1024 * 1024))
        file_name = self.files.create_file(key, contents)

        # Upload the file using SSE-C
        p = aws('s3 cp %s s3://%s --sse-c AES256 --sse-c-key %s' % (
            file_name, self.bucket, self.encrypt_key))
        self.assert_no_errors(p)

        self.download_and_assert_sse_c_object_integrity(
            self.bucket, key, self.encrypt_key, contents)

    def test_sse_c_copy(self):
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload the file using SSE-C
        p = aws('s3 cp %s s3://%s --sse-c AES256 --sse-c-key %s' % (
            file_name, self.bucket, self.encrypt_key))
        self.assert_no_errors(p)

        # Copy the file using SSE-C and a new encryption key
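        # A server-side copy of an SSE-C object needs both keys: the source
        # object's key via --sse-c-copy-source-key to read it, and the new
        # (here different) key via --sse-c-key to re-encrypt the destination.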
        p = aws(
            's3 cp s3://%s/%s s3://%s/%s --sse-c AES256 --sse-c-key %s '
            '--sse-c-copy-source AES256 --sse-c-copy-source-key %s' % (
                self.bucket, key, self.bucket, new_key, self.other_encrypt_key,
                self.encrypt_key))
        self.assert_no_errors(p)
        self.download_and_assert_sse_c_object_integrity(
            self.bucket, new_key, self.other_encrypt_key, contents)

    def test_sse_c_copy_large_file(self):
        key = 'foo.txt'
        new_key = 'bar.txt'
        contents = 'a' * (10 * (1024 * 1024))
        file_name = self.files.create_file(key, contents)

        # Upload the file using SSE-C
        p = aws('s3 cp %s s3://%s --sse-c AES256 --sse-c-key %s' % (
            file_name, self.bucket, self.encrypt_key))
        self.assert_no_errors(p)

        # Copy the file using SSE-C and a new encryption key
        p = aws(
            's3 cp s3://%s/%s s3://%s/%s --sse-c AES256 --sse-c-key %s '
            '--sse-c-copy-source AES256 --sse-c-copy-source-key %s' % (
                self.bucket, key, self.bucket, new_key, self.other_encrypt_key,
                self.encrypt_key))
        self.assert_no_errors(p)
        self.download_and_assert_sse_c_object_integrity(
            self.bucket, new_key, self.other_encrypt_key, contents)

    def test_smoke_sync_sse_c(self):
        key = 'foo.txt'
        contents = 'contents'
        file_name = self.files.create_file(key, contents)

        # Upload sync
        p = aws('s3 sync %s s3://%s/foo/ --sse-c AES256 --sse-c-key %s' % (
            self.files.rootdir, self.bucket, self.encrypt_key))
        self.assert_no_errors(p)

        # Copy sync
        p = aws('s3 sync s3://%s/foo/ s3://%s/bar/ --sse-c AES256 '
                '--sse-c-key %s --sse-c-copy-source AES256 '
                '--sse-c-copy-source-key %s' % (
                    self.bucket, self.bucket, self.other_encrypt_key,
                    self.encrypt_key))
        self.assert_no_errors(p)

        # Remove the original file
        os.remove(file_name)

        # Download sync
        p = aws('s3 sync s3://%s/bar/ %s --sse-c AES256 --sse-c-key %s' % (
            self.bucket, self.files.rootdir, self.other_encrypt_key))
        self.assert_no_errors(p)

        self.assertTrue(os.path.isfile(file_name))
        with open(file_name, 'r') as f:
            self.assertEqual(f.read(), contents)


class TestPresignCommand(BaseS3IntegrationTest):

    def test_can_retrieve_presigned_url(self):
        bucket_name = _SHARED_BUCKET
        original_contents = b'this is foo.txt'
        self.put_object(bucket_name, 'foo.txt', original_contents)
        p = aws('s3 presign s3://%s/foo.txt' % (bucket_name,))
        self.assert_no_errors(p)
        url = p.stdout.strip()
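        # The presigned URL carries the signature in its query string, so a
        # plain urlopen() can fetch the object without any AWS credentials
        # (until the URL expires; the default expiry should be one hour).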
        contents = urlopen(url).read()
        self.assertEqual(contents, original_contents)