Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/s3/test_utils.py
1569 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
from awscli.testutils import mock, unittest, temporary_file
14
import argparse
15
import errno
16
import os
17
import tempfile
18
import shutil
19
import ntpath
20
import time
21
import datetime
22
23
import pytest
24
from dateutil.tz import tzlocal
25
from s3transfer.futures import TransferMeta, TransferFuture
26
from s3transfer.compat import seekable
27
from botocore.hooks import HierarchicalEmitter
28
29
from awscli.compat import queue
30
from awscli.compat import StringIO
31
from awscli.testutils import FileCreator
32
from awscli.customizations.s3.utils import (
33
find_bucket_key,
34
guess_content_type, relative_path, block_unsupported_resources,
35
StablePriorityQueue, BucketLister, get_file_stat, AppendFilter,
36
create_warning, human_readable_size, human_readable_to_bytes,
37
set_file_utime, SetFileUtimeError, RequestParamsMapper, StdoutBytesWriter,
38
ProvideSizeSubscriber, ProvideETagSubscriber, OnDoneFilteredSubscriber,
39
ProvideUploadContentTypeSubscriber, ProvideCopyContentTypeSubscriber,
40
ProvideLastModifiedTimeSubscriber, DirectoryCreatorSubscriber,
41
DeleteSourceObjectSubscriber, DeleteSourceFileSubscriber,
42
DeleteCopySourceObjectSubscriber, NonSeekableStream, CreateDirectoryError,
43
S3PathResolver)
44
from awscli.customizations.s3.results import WarningResult
45
from tests.unit.customizations.s3 import FakeTransferFuture
46
from tests.unit.customizations.s3 import FakeTransferFutureMeta
47
from tests.unit.customizations.s3 import FakeTransferFutureCallArgs
48
49
50
@pytest.fixture
51
def s3control_client():
52
client = mock.MagicMock()
53
client.get_access_point.return_value = {
54
"Bucket": "mybucket"
55
}
56
client.list_multi_region_access_points.return_value = {
57
"AccessPoints": [{
58
"Alias": "myalias.mrap",
59
"Regions": [{"Bucket": "mybucket"}]
60
}]
61
}
62
return client
63
64
65
@pytest.fixture
66
def sts_client():
67
client = mock.MagicMock()
68
client.get_caller_identity.return_value = {
69
"Account": "123456789012"
70
}
71
return client
72
73
74
@pytest.fixture
75
def s3_path_resolver(s3control_client, sts_client):
76
return S3PathResolver(s3control_client, sts_client)
77
78
79
@pytest.mark.parametrize(
80
'bytes_int, expected',
81
(
82
(1, '1 Byte'),
83
(10, '10 Bytes'),
84
(1000, '1000 Bytes'),
85
(1024, '1.0 KiB'),
86
(1024 ** 2, '1.0 MiB'),
87
(1024 ** 3, '1.0 GiB'),
88
(1024 ** 4, '1.0 TiB'),
89
(1024 ** 5, '1.0 PiB'),
90
(1024 ** 6, '1.0 EiB'),
91
(1024 ** 2 - 1, '1.0 MiB'),
92
(1024 ** 3 - 1, '1.0 GiB'),
93
)
94
)
95
def test_human_readable_size(bytes_int, expected):
96
assert human_readable_size(bytes_int) == expected
97
98
99
@pytest.mark.parametrize(
100
'size_str, expected',
101
(
102
("1", 1),
103
("1024", 1024),
104
("1KB", 1024),
105
("1kb", 1024),
106
("1MB", 1024 ** 2),
107
("1GB", 1024 ** 3),
108
("1TB", 1024 ** 4),
109
("1KiB", 1024),
110
("1kib", 1024),
111
("1MiB", 1024 ** 2),
112
("1GiB", 1024 ** 3),
113
("1TiB", 1024 ** 4),
114
)
115
)
116
def test_convert_human_readable_to_bytes(size_str, expected):
117
assert human_readable_to_bytes(size_str) == expected
118
119
120
class AppendFilterTest(unittest.TestCase):
121
def test_call(self):
122
parser = argparse.ArgumentParser()
123
124
parser.add_argument('--include', action=AppendFilter, nargs=1,
125
dest='path')
126
parser.add_argument('--exclude', action=AppendFilter, nargs=1,
127
dest='path')
128
parsed_args = parser.parse_args(['--include', 'a', '--exclude', 'b'])
129
self.assertEqual(parsed_args.path, [['--include', 'a'],
130
['--exclude', 'b']])
131
132
133
class TestFindBucketKey(unittest.TestCase):
134
def test_unicode(self):
135
s3_path = '\u1234' + u'/' + '\u5678'
136
bucket, key = find_bucket_key(s3_path)
137
self.assertEqual(bucket, '\u1234')
138
self.assertEqual(key, '\u5678')
139
140
def test_bucket(self):
141
bucket, key = find_bucket_key('bucket')
142
self.assertEqual(bucket, 'bucket')
143
self.assertEqual(key, '')
144
145
def test_bucket_with_slash(self):
146
bucket, key = find_bucket_key('bucket/')
147
self.assertEqual(bucket, 'bucket')
148
self.assertEqual(key, '')
149
150
def test_bucket_with_key(self):
151
bucket, key = find_bucket_key('bucket/key')
152
self.assertEqual(bucket, 'bucket')
153
self.assertEqual(key, 'key')
154
155
def test_bucket_with_key_and_prefix(self):
156
bucket, key = find_bucket_key('bucket/prefix/key')
157
self.assertEqual(bucket, 'bucket')
158
self.assertEqual(key, 'prefix/key')
159
160
def test_accesspoint_arn(self):
161
bucket, key = find_bucket_key(
162
'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint')
163
self.assertEqual(
164
bucket, 'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint')
165
self.assertEqual(key, '')
166
167
def test_accesspoint_arn_with_slash(self):
168
bucket, key = find_bucket_key(
169
'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint/')
170
self.assertEqual(
171
bucket, 'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint')
172
self.assertEqual(key, '')
173
174
def test_accesspoint_arn_with_key(self):
175
bucket, key = find_bucket_key(
176
'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint/key')
177
self.assertEqual(
178
bucket, 'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint')
179
self.assertEqual(key, 'key')
180
181
def test_accesspoint_arn_with_key_and_prefix(self):
182
bucket, key = find_bucket_key(
183
'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint/pre/key')
184
self.assertEqual(
185
bucket, 'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint')
186
self.assertEqual(key, 'pre/key')
187
188
def test_outpost_arn_with_colon(self):
189
bucket, key = find_bucket_key(
190
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
191
'accesspoint:my-accesspoint'
192
)
193
self.assertEqual(
194
bucket,
195
(
196
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
197
'accesspoint:my-accesspoint'
198
)
199
)
200
self.assertEqual(key, '')
201
202
def test_outpost_arn_with_colon_and_key(self):
203
bucket, key = find_bucket_key(
204
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
205
'accesspoint:my-accesspoint/key'
206
)
207
self.assertEqual(
208
bucket,
209
(
210
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
211
'accesspoint:my-accesspoint'
212
)
213
)
214
self.assertEqual(key, 'key')
215
216
def test_outpost_arn_with_colon_and_key_with_colon_in_name(self):
217
bucket, key = find_bucket_key(
218
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
219
'accesspoint:my-accesspoint/key:name'
220
)
221
self.assertEqual(
222
bucket,
223
(
224
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
225
'accesspoint:my-accesspoint'
226
)
227
)
228
self.assertEqual(key, 'key:name')
229
230
def test_outpost_arn_with_colon_and_key_with_slash_in_name(self):
231
bucket, key = find_bucket_key(
232
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
233
'accesspoint:my-accesspoint/key/name'
234
)
235
self.assertEqual(
236
bucket,
237
(
238
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
239
'accesspoint:my-accesspoint'
240
)
241
)
242
self.assertEqual(key, 'key/name')
243
244
def test_outpost_arn_with_colon_and_key_with_slash_and_colon_in_name(self):
245
bucket, key = find_bucket_key(
246
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
247
'accesspoint:my-accesspoint/prefix/key:name'
248
)
249
self.assertEqual(
250
bucket,
251
(
252
'arn:aws:s3-outposts:us-west-2:123456789012:outpost:op-12334:'
253
'accesspoint:my-accesspoint'
254
)
255
)
256
self.assertEqual(key, 'prefix/key:name')
257
258
def test_outpost_arn_with_slash(self):
259
bucket, key = find_bucket_key(
260
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
261
'accesspoint/my-accesspoint'
262
)
263
self.assertEqual(
264
bucket,
265
(
266
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
267
'accesspoint/my-accesspoint'
268
)
269
)
270
self.assertEqual(key, '')
271
272
def test_outpost_arn_with_slash_and_key(self):
273
bucket, key = find_bucket_key(
274
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
275
'accesspoint/my-accesspoint/key'
276
)
277
self.assertEqual(
278
bucket,
279
(
280
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
281
'accesspoint/my-accesspoint'
282
)
283
)
284
self.assertEqual(key, 'key')
285
286
def test_outpost_arn_with_slash_and_key_with_colon_in_name(self):
287
bucket, key = find_bucket_key(
288
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
289
'accesspoint/my-accesspoint/key:name'
290
)
291
self.assertEqual(
292
bucket,
293
(
294
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
295
'accesspoint/my-accesspoint'
296
)
297
)
298
self.assertEqual(key, 'key:name')
299
300
def test_outpost_arn_with_slash_and_key_with_slash_in_name(self):
301
bucket, key = find_bucket_key(
302
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
303
'accesspoint/my-accesspoint/key/name'
304
)
305
self.assertEqual(
306
bucket,
307
(
308
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
309
'accesspoint/my-accesspoint'
310
)
311
)
312
self.assertEqual(key, 'key/name')
313
314
def test_outpost_arn_with_slash_and_key_with_slash_and_colon_in_name(self):
315
bucket, key = find_bucket_key(
316
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
317
'accesspoint/my-accesspoint/prefix/key:name'
318
)
319
self.assertEqual(
320
bucket,
321
(
322
'arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-12334/'
323
'accesspoint/my-accesspoint'
324
)
325
)
326
self.assertEqual(key, 'prefix/key:name')
327
328
329
class TestBlockUnsupportedResources(unittest.TestCase):
330
def test_object_lambda_arn_with_colon_raises_exception(self):
331
with self.assertRaisesRegex(
332
ValueError, 'Use s3api commands instead'):
333
block_unsupported_resources(
334
'arn:aws:s3-object-lambda:us-west-2:123456789012:'
335
'accesspoint:my-accesspoint'
336
)
337
338
def test_object_lambda_arn_with_slash_raises_exception(self):
339
with self.assertRaisesRegex(
340
ValueError, 'Use s3api commands instead'):
341
block_unsupported_resources(
342
'arn:aws:s3-object-lambda:us-west-2:123456789012:'
343
'accesspoint/my-accesspoint'
344
)
345
346
def test_outpost_bucket_arn_with_colon_raises_exception(self):
347
with self.assertRaisesRegex(
348
ValueError, 'Use s3control commands instead'):
349
block_unsupported_resources(
350
'arn:aws:s3-outposts:us-west-2:123456789012:'
351
'outpost/op-0a12345678abcdefg:bucket/bucket-foo'
352
)
353
354
def test_outpost_bucket_arn_with_slash_raises_exception(self):
355
with self.assertRaisesRegex(
356
ValueError, 'Use s3control commands instead'):
357
block_unsupported_resources(
358
'arn:aws:s3-outposts:us-west-2:123456789012:'
359
'outpost/op-0a12345678abcdefg/bucket/bucket-foo'
360
)
361
362
363
class TestCreateWarning(unittest.TestCase):
364
def test_create_warning(self):
365
path = '/foo/'
366
error_message = 'There was an error'
367
warning_message = create_warning(path, error_message)
368
self.assertEqual(warning_message.message,
369
'warning: Skipping file /foo/. There was an error')
370
self.assertFalse(warning_message.error)
371
self.assertTrue(warning_message.warning)
372
373
374
class TestGuessContentType(unittest.TestCase):
375
def test_guess_content_type(self):
376
self.assertEqual(guess_content_type('foo.txt'), 'text/plain')
377
378
def test_guess_content_type_with_no_valid_matches(self):
379
self.assertEqual(guess_content_type('no-extension'), None)
380
381
def test_guess_content_type_with_unicode_error_returns_no_match(self):
382
with mock.patch('mimetypes.guess_type') as guess_type_patch:
383
# This should throw a UnicodeDecodeError.
384
guess_type_patch.side_effect = lambda x: b'\xe2'.decode('ascii')
385
self.assertEqual(guess_content_type('foo.txt'), None)
386
387
388
class TestRelativePath(unittest.TestCase):
389
def test_relpath_normal(self):
390
self.assertEqual(relative_path('/tmp/foo/bar', '/tmp/foo'),
391
'.' + os.sep + 'bar')
392
393
# We need to patch out relpath with the ntpath version so
394
# we can simulate testing drives on windows.
395
@mock.patch('os.path.relpath', ntpath.relpath)
396
def test_relpath_with_error(self):
397
# Just want to check we don't get an exception raised,
398
# which is what was happening previously.
399
self.assertIn(r'foo\bar', relative_path(r'c:\foo\bar'))
400
401
402
class TestStablePriorityQueue(unittest.TestCase):
403
def test_fifo_order_of_same_priorities(self):
404
a = mock.Mock()
405
a.PRIORITY = 5
406
b = mock.Mock()
407
b.PRIORITY = 5
408
c = mock.Mock()
409
c.PRIORITY = 1
410
411
q = StablePriorityQueue(maxsize=10, max_priority=20)
412
q.put(a)
413
q.put(b)
414
q.put(c)
415
416
# First we should get c because it's the lowest priority.
417
# We're using assertIs because we want the *exact* object.
418
self.assertIs(q.get(), c)
419
# Then a and b are the same priority, but we should get
420
# a first because it was inserted first.
421
self.assertIs(q.get(), a)
422
self.assertIs(q.get(), b)
423
424
def test_queue_length(self):
425
a = mock.Mock()
426
a.PRIORITY = 5
427
428
q = StablePriorityQueue(maxsize=10, max_priority=20)
429
self.assertEqual(q.qsize(), 0)
430
431
q.put(a)
432
self.assertEqual(q.qsize(), 1)
433
434
q.get()
435
self.assertEqual(q.qsize(), 0)
436
437
def test_insert_max_priority_capped(self):
438
q = StablePriorityQueue(maxsize=10, max_priority=20)
439
a = mock.Mock()
440
a.PRIORITY = 100
441
q.put(a)
442
443
self.assertIs(q.get(), a)
444
445
def test_priority_attr_is_missing(self):
446
# If priority attr is missing, we should add it
447
# to the lowest priority.
448
q = StablePriorityQueue(maxsize=10, max_priority=20)
449
a = object()
450
b = mock.Mock()
451
b.PRIORITY = 5
452
453
q.put(a)
454
q.put(b)
455
456
self.assertIs(q.get(), b)
457
self.assertIs(q.get(), a)
458
459
460
class TestBucketList(unittest.TestCase):
461
def setUp(self):
462
self.client = mock.Mock()
463
self.emitter = HierarchicalEmitter()
464
self.client.meta.events = self.emitter
465
self.date_parser = mock.Mock()
466
self.date_parser.return_value = mock.sentinel.now
467
self.responses = []
468
469
def fake_paginate(self, *args, **kwargs):
470
for response in self.responses:
471
self.emitter.emit('after-call.s3.ListObjectsV2', parsed=response)
472
return self.responses
473
474
def test_list_objects(self):
475
now = mock.sentinel.now
476
self.client.get_paginator.return_value.paginate = self.fake_paginate
477
individual_response_elements = [
478
{'LastModified': '2014-02-27T04:20:38.000Z',
479
'Key': 'a', 'Size': 1},
480
{'LastModified': '2014-02-27T04:20:38.000Z',
481
'Key': 'b', 'Size': 2},
482
{'LastModified': '2014-02-27T04:20:38.000Z',
483
'Key': 'c', 'Size': 3}
484
]
485
self.responses = [
486
{'Contents': individual_response_elements[0:2]},
487
{'Contents': [individual_response_elements[2]]}
488
]
489
lister = BucketLister(self.client, self.date_parser)
490
objects = list(lister.list_objects(bucket='foo'))
491
self.assertEqual(objects,
492
[('foo/a', individual_response_elements[0]),
493
('foo/b', individual_response_elements[1]),
494
('foo/c', individual_response_elements[2])])
495
for individual_response in individual_response_elements:
496
self.assertEqual(individual_response['LastModified'], now)
497
498
def test_list_objects_passes_in_extra_args(self):
499
self.client.get_paginator.return_value.paginate.return_value = [
500
{'Contents': [
501
{'LastModified': '2014-02-27T04:20:38.000Z',
502
'Key': 'mykey', 'Size': 3}
503
]}
504
]
505
lister = BucketLister(self.client, self.date_parser)
506
list(
507
lister.list_objects(
508
bucket='mybucket', extra_args={'RequestPayer': 'requester'}
509
)
510
)
511
self.client.get_paginator.return_value.paginate.assert_called_with(
512
Bucket='mybucket', PaginationConfig={'PageSize': None},
513
RequestPayer='requester'
514
)
515
516
517
class TestGetFileStat(unittest.TestCase):
518
519
def test_get_file_stat(self):
520
now = datetime.datetime.now(tzlocal())
521
epoch_now = time.mktime(now.timetuple())
522
with temporary_file('w') as f:
523
f.write('foo')
524
f.flush()
525
os.utime(f.name, (epoch_now, epoch_now))
526
size, update_time = get_file_stat(f.name)
527
self.assertEqual(size, 3)
528
self.assertEqual(time.mktime(update_time.timetuple()), epoch_now)
529
530
def test_error_message(self):
531
with mock.patch('os.stat', mock.Mock(side_effect=IOError('msg'))):
532
with self.assertRaisesRegex(ValueError, r'myfilename\.txt'):
533
get_file_stat('myfilename.txt')
534
535
def assert_handles_fromtimestamp_error(self, error):
536
patch_attribute = 'awscli.customizations.s3.utils.datetime'
537
with mock.patch(patch_attribute) as datetime_mock:
538
with temporary_file('w') as temp_file:
539
temp_file.write('foo')
540
temp_file.flush()
541
datetime_mock.fromtimestamp.side_effect = error
542
size, update_time = get_file_stat(temp_file.name)
543
self.assertIsNone(update_time)
544
545
def test_returns_epoch_on_invalid_timestamp(self):
546
self.assert_handles_fromtimestamp_error(ValueError())
547
548
def test_returns_epoch_on_invalid_timestamp_os_error(self):
549
self.assert_handles_fromtimestamp_error(OSError())
550
551
def test_returns_epoch_on_invalid_timestamp_overflow_error(self):
552
self.assert_handles_fromtimestamp_error(OverflowError())
553
554
555
class TestSetsFileUtime(unittest.TestCase):
556
557
def test_successfully_sets_utime(self):
558
now = datetime.datetime.now(tzlocal())
559
epoch_now = time.mktime(now.timetuple())
560
with temporary_file('w') as f:
561
set_file_utime(f.name, epoch_now)
562
_, update_time = get_file_stat(f.name)
563
self.assertEqual(time.mktime(update_time.timetuple()), epoch_now)
564
565
def test_throws_more_relevant_error_when_errno_1(self):
566
now = datetime.datetime.now(tzlocal())
567
epoch_now = time.mktime(now.timetuple())
568
with mock.patch('os.utime') as utime_mock:
569
utime_mock.side_effect = OSError(1, '')
570
with self.assertRaises(SetFileUtimeError):
571
set_file_utime('not_real_file', epoch_now)
572
573
def test_passes_through_other_os_errors(self):
574
now = datetime.datetime.now(tzlocal())
575
epoch_now = time.mktime(now.timetuple())
576
with mock.patch('os.utime') as utime_mock:
577
utime_mock.side_effect = OSError(2, '')
578
with self.assertRaises(OSError):
579
set_file_utime('not_real_file', epoch_now)
580
581
582
class TestRequestParamsMapperSSE(unittest.TestCase):
583
def setUp(self):
584
self.cli_params = {
585
'sse': 'AES256',
586
'sse_kms_key_id': 'my-kms-key',
587
'sse_c': 'AES256',
588
'sse_c_key': 'my-sse-c-key',
589
'sse_c_copy_source': 'AES256',
590
'sse_c_copy_source_key': 'my-sse-c-copy-source-key'
591
}
592
593
def test_head_object(self):
594
params = {}
595
RequestParamsMapper.map_head_object_params(params, self.cli_params)
596
self.assertEqual(
597
params,
598
{'SSECustomerAlgorithm': 'AES256',
599
'SSECustomerKey': 'my-sse-c-key'}
600
)
601
602
def test_put_object(self):
603
params = {}
604
RequestParamsMapper.map_put_object_params(params, self.cli_params)
605
self.assertEqual(
606
params,
607
{'SSECustomerAlgorithm': 'AES256',
608
'SSECustomerKey': 'my-sse-c-key',
609
'SSEKMSKeyId': 'my-kms-key',
610
'ServerSideEncryption': 'AES256'}
611
)
612
613
def test_get_object(self):
614
params = {}
615
RequestParamsMapper.map_get_object_params(params, self.cli_params)
616
self.assertEqual(
617
params,
618
{'SSECustomerAlgorithm': 'AES256',
619
'SSECustomerKey': 'my-sse-c-key'}
620
)
621
622
def test_copy_object(self):
623
params = {}
624
RequestParamsMapper.map_copy_object_params(params, self.cli_params)
625
self.assertEqual(
626
params,
627
{'CopySourceSSECustomerAlgorithm': 'AES256',
628
'CopySourceSSECustomerKey': 'my-sse-c-copy-source-key',
629
'SSECustomerAlgorithm': 'AES256',
630
'SSECustomerKey': 'my-sse-c-key',
631
'SSEKMSKeyId': 'my-kms-key',
632
'ServerSideEncryption': 'AES256'}
633
)
634
635
def test_create_multipart_upload(self):
636
params = {}
637
RequestParamsMapper.map_create_multipart_upload_params(
638
params, self.cli_params)
639
self.assertEqual(
640
params,
641
{'SSECustomerAlgorithm': 'AES256',
642
'SSECustomerKey': 'my-sse-c-key',
643
'SSEKMSKeyId': 'my-kms-key',
644
'ServerSideEncryption': 'AES256'}
645
)
646
647
def test_upload_part(self):
648
params = {}
649
RequestParamsMapper.map_upload_part_params(params, self.cli_params)
650
self.assertEqual(
651
params,
652
{'SSECustomerAlgorithm': 'AES256',
653
'SSECustomerKey': 'my-sse-c-key'}
654
)
655
656
def test_upload_part_copy(self):
657
params = {}
658
RequestParamsMapper.map_upload_part_copy_params(
659
params, self.cli_params)
660
self.assertEqual(
661
params,
662
{'CopySourceSSECustomerAlgorithm': 'AES256',
663
'CopySourceSSECustomerKey': 'my-sse-c-copy-source-key',
664
'SSECustomerAlgorithm': 'AES256',
665
'SSECustomerKey': 'my-sse-c-key'})
666
667
668
class TestRequestParamsMapperChecksumAlgorithm:
669
@pytest.fixture
670
def cli_params(self):
671
return {'checksum_algorithm': 'CRC32'}
672
673
@pytest.fixture
674
def cli_params_no_algorithm(self):
675
return {}
676
677
def test_put_object(self, cli_params):
678
request_params = {}
679
RequestParamsMapper.map_put_object_params(request_params, cli_params)
680
assert request_params == {'ChecksumAlgorithm': 'CRC32'}
681
682
def test_put_object_no_checksum(self, cli_params_no_algorithm):
683
request_params = {}
684
RequestParamsMapper.map_put_object_params(request_params, cli_params_no_algorithm)
685
assert 'ChecksumAlgorithm' not in request_params
686
687
def test_copy_object(self, cli_params):
688
request_params = {}
689
RequestParamsMapper.map_copy_object_params(request_params, cli_params)
690
assert request_params == {'ChecksumAlgorithm': 'CRC32'}
691
692
def test_copy_object_no_checksum(self, cli_params_no_algorithm):
693
request_params = {}
694
RequestParamsMapper.map_put_object_params(request_params, cli_params_no_algorithm)
695
assert 'ChecksumAlgorithm' not in request_params
696
697
698
class TestRequestParamsMapperChecksumMode:
699
@pytest.fixture
700
def cli_params(self):
701
return {'checksum_mode': 'ENABLED'}
702
703
@pytest.fixture
704
def cli_params_no_checksum(self):
705
return {}
706
707
def test_get_object(self, cli_params):
708
request_params = {}
709
RequestParamsMapper.map_get_object_params(request_params, cli_params)
710
assert request_params == {'ChecksumMode': 'ENABLED'}
711
712
def test_get_object_no_checksums(self, cli_params_no_checksum):
713
request_params = {}
714
RequestParamsMapper.map_get_object_params(request_params, cli_params_no_checksum)
715
assert 'ChecksumMode' not in request_params
716
717
718
class TestRequestParamsMapperRequestPayer(unittest.TestCase):
719
def setUp(self):
720
self.cli_params = {'request_payer': 'requester'}
721
722
def test_head_object(self):
723
params = {}
724
RequestParamsMapper.map_head_object_params(params, self.cli_params)
725
self.assertEqual(params, {'RequestPayer': 'requester'})
726
727
def test_put_object(self):
728
params = {}
729
RequestParamsMapper.map_put_object_params(params, self.cli_params)
730
self.assertEqual(params, {'RequestPayer': 'requester'})
731
732
def test_get_object(self):
733
params = {}
734
RequestParamsMapper.map_get_object_params(params, self.cli_params)
735
self.assertEqual(params, {'RequestPayer': 'requester'})
736
737
def test_copy_object(self):
738
params = {}
739
RequestParamsMapper.map_copy_object_params(params, self.cli_params)
740
self.assertEqual(params, {'RequestPayer': 'requester'})
741
742
def test_create_multipart_upload(self):
743
params = {}
744
RequestParamsMapper.map_create_multipart_upload_params(
745
params, self.cli_params)
746
self.assertEqual(params, {'RequestPayer': 'requester'})
747
748
def test_upload_part(self):
749
params = {}
750
RequestParamsMapper.map_upload_part_params(params, self.cli_params)
751
self.assertEqual(params, {'RequestPayer': 'requester'})
752
753
def test_upload_part_copy(self):
754
params = {}
755
RequestParamsMapper.map_upload_part_copy_params(
756
params, self.cli_params)
757
self.assertEqual(params, {'RequestPayer': 'requester'})
758
759
def test_delete_object(self):
760
params = {}
761
RequestParamsMapper.map_delete_object_params(
762
params, self.cli_params)
763
self.assertEqual(params, {'RequestPayer': 'requester'})
764
765
def test_list_objects(self):
766
params = {}
767
RequestParamsMapper.map_list_objects_v2_params(
768
params, self.cli_params)
769
self.assertEqual(params, {'RequestPayer': 'requester'})
770
771
772
class TestBytesPrint(unittest.TestCase):
773
def setUp(self):
774
self.stdout = mock.Mock()
775
self.stdout.buffer = self.stdout
776
777
def test_stdout_wrapper(self):
778
wrapper = StdoutBytesWriter(self.stdout)
779
wrapper.write(b'foo')
780
self.assertTrue(self.stdout.write.called)
781
self.assertEqual(self.stdout.write.call_args[0][0], b'foo')
782
783
784
class TestProvideSizeSubscriber(unittest.TestCase):
785
def setUp(self):
786
self.transfer_future = mock.Mock(spec=TransferFuture)
787
self.transfer_meta = TransferMeta()
788
self.transfer_future.meta = self.transfer_meta
789
790
def test_size_set(self):
791
self.transfer_meta.provide_transfer_size(5)
792
subscriber = ProvideSizeSubscriber(10)
793
subscriber.on_queued(self.transfer_future)
794
self.assertEqual(self.transfer_meta.size, 10)
795
796
797
class TestProvideEtagSubscriber:
798
def test_etag_set(self):
799
transfer_meta = TransferMeta()
800
transfer_future = mock.Mock(spec=TransferFuture)
801
transfer_future.meta = transfer_meta
802
etag = 'myetag'
803
804
transfer_meta.provide_object_etag('oldetag')
805
subscriber = ProvideETagSubscriber(etag)
806
subscriber.on_queued(transfer_future)
807
assert transfer_meta.etag == etag
808
809
810
class OnDoneFilteredRecordingSubscriber(OnDoneFilteredSubscriber):
811
def __init__(self):
812
self.on_success_calls = []
813
self.on_failure_calls = []
814
815
def _on_success(self, future):
816
self.on_success_calls.append(future)
817
818
def _on_failure(self, future, exception):
819
self.on_failure_calls.append((future, exception))
820
821
822
class TestOnDoneFilteredSubscriber(unittest.TestCase):
823
def test_on_success(self):
824
subscriber = OnDoneFilteredRecordingSubscriber()
825
future = FakeTransferFuture('return-value')
826
subscriber.on_done(future)
827
self.assertEqual(subscriber.on_success_calls, [future])
828
self.assertEqual(subscriber.on_failure_calls, [])
829
830
def test_on_failure(self):
831
subscriber = OnDoneFilteredRecordingSubscriber()
832
exception = Exception('my exception')
833
future = FakeTransferFuture(exception=exception)
834
subscriber.on_done(future)
835
self.assertEqual(subscriber.on_failure_calls, [(future, exception)])
836
self.assertEqual(subscriber.on_success_calls, [])
837
838
839
class TestProvideUploadContentTypeSubscriber(unittest.TestCase):
840
def setUp(self):
841
self.filename = 'myfile.txt'
842
self.extra_args = {}
843
self.future = self.set_future()
844
self.subscriber = ProvideUploadContentTypeSubscriber()
845
846
def set_future(self):
847
call_args = FakeTransferFutureCallArgs(
848
fileobj=self.filename, extra_args=self.extra_args)
849
meta = FakeTransferFutureMeta(call_args=call_args)
850
return FakeTransferFuture(meta=meta)
851
852
def test_on_queued_provides_content_type(self):
853
self.subscriber.on_queued(self.future)
854
self.assertEqual(self.extra_args, {'ContentType': 'text/plain'})
855
856
def test_on_queued_does_not_provide_content_type_when_unknown(self):
857
self.filename = 'file-with-no-extension'
858
self.future = self.set_future()
859
self.subscriber.on_queued(self.future)
860
self.assertEqual(self.extra_args, {})
861
862
863
class TestProvideCopyContentTypeSubscriber(
864
TestProvideUploadContentTypeSubscriber):
865
def setUp(self):
866
self.filename = 'myfile.txt'
867
self.extra_args = {}
868
self.future = self.set_future()
869
self.subscriber = ProvideCopyContentTypeSubscriber()
870
871
def set_future(self):
872
copy_source = {'Bucket': 'mybucket', 'Key': self.filename}
873
call_args = FakeTransferFutureCallArgs(
874
copy_source=copy_source, extra_args=self.extra_args)
875
meta = FakeTransferFutureMeta(call_args=call_args)
876
return FakeTransferFuture(meta=meta)
877
878
879
class BaseTestWithFileCreator(unittest.TestCase):
880
def setUp(self):
881
self.file_creator = FileCreator()
882
883
def tearDown(self):
884
self.file_creator.remove_all()
885
886
887
class TestProvideLastModifiedTimeSubscriber(BaseTestWithFileCreator):
888
def setUp(self):
889
super(TestProvideLastModifiedTimeSubscriber, self).setUp()
890
self.filename = self.file_creator.create_file('myfile', 'my contents')
891
self.desired_utime = datetime.datetime(
892
2016, 1, 18, 7, 0, 0, tzinfo=tzlocal())
893
self.result_queue = queue.Queue()
894
self.subscriber = ProvideLastModifiedTimeSubscriber(
895
self.desired_utime, self.result_queue)
896
897
call_args = FakeTransferFutureCallArgs(fileobj=self.filename)
898
meta = FakeTransferFutureMeta(call_args=call_args)
899
self.future = FakeTransferFuture(meta=meta)
900
901
def test_on_success_modifies_utime(self):
902
self.subscriber.on_done(self.future)
903
_, utime = get_file_stat(self.filename)
904
self.assertEqual(utime, self.desired_utime)
905
906
def test_on_success_failure_in_utime_mod_raises_warning(self):
907
self.subscriber = ProvideLastModifiedTimeSubscriber(
908
None, self.result_queue)
909
self.subscriber.on_done(self.future)
910
# Because the time to provide was None it will throw an exception
911
# which results in the a warning about the utime not being able
912
# to be set being placed in the result queue.
913
result = self.result_queue.get()
914
self.assertIsInstance(result, WarningResult)
915
self.assertIn(
916
'unable to update the last modified time', result.message)
917
918
919
class TestDirectoryCreatorSubscriber(BaseTestWithFileCreator):
920
def setUp(self):
921
super(TestDirectoryCreatorSubscriber, self).setUp()
922
self.directory_to_create = os.path.join(
923
self.file_creator.rootdir, 'new-directory')
924
self.filename = os.path.join(self.directory_to_create, 'myfile')
925
926
call_args = FakeTransferFutureCallArgs(fileobj=self.filename)
927
meta = FakeTransferFutureMeta(call_args=call_args)
928
self.future = FakeTransferFuture(meta=meta)
929
930
self.subscriber = DirectoryCreatorSubscriber()
931
932
def test_on_queued_creates_directories_if_do_not_exist(self):
933
self.subscriber.on_queued(self.future)
934
self.assertTrue(os.path.exists(self.directory_to_create))
935
936
def test_on_queued_does_not_create_directories_if_exist(self):
937
os.makedirs(self.directory_to_create)
938
# This should not cause any issues if the directory already exists
939
self.subscriber.on_queued(self.future)
940
# The directory should still exist
941
self.assertTrue(os.path.exists(self.directory_to_create))
942
943
def test_on_queued_failure_propagates_create_directory_error(self):
944
# If makedirs() raises an OSError of exception, we should
945
# propagate the exception with a better worded CreateDirectoryError.
946
with mock.patch('os.makedirs') as makedirs_patch:
947
makedirs_patch.side_effect = OSError()
948
with self.assertRaises(CreateDirectoryError):
949
self.subscriber.on_queued(self.future)
950
self.assertFalse(os.path.exists(self.directory_to_create))
951
952
def test_on_queued_failure_propagates_clear_error_message(self):
953
# If makedirs() raises an OSError of exception, we should
954
# propagate the exception.
955
with mock.patch('os.makedirs') as makedirs_patch:
956
os_error = OSError()
957
os_error.errno = errno.EEXIST
958
makedirs_patch.side_effect = os_error
959
# The on_queued should not raise an error if the directory
960
# already exists
961
try:
962
self.subscriber.on_queued(self.future)
963
except Exception as e:
964
self.fail(
965
'on_queued should not have raised an exception related '
966
'to directory creation especially if one already existed '
967
'but got %s' % e)
968
969
970
class TestDeleteSourceObjectSubscriber(unittest.TestCase):
971
def setUp(self):
972
self.client = mock.Mock()
973
self.bucket = 'mybucket'
974
self.key = 'mykey'
975
call_args = FakeTransferFutureCallArgs(
976
bucket=self.bucket, key=self.key, extra_args={})
977
meta = FakeTransferFutureMeta(call_args=call_args)
978
self.future = mock.Mock()
979
self.future.meta = meta
980
981
def test_deletes_object(self):
982
DeleteSourceObjectSubscriber(self.client).on_done(self.future)
983
self.client.delete_object.assert_called_once_with(
984
Bucket=self.bucket, Key=self.key)
985
self.future.set_exception.assert_not_called()
986
987
def test_sets_exception_on_error(self):
988
exception = ValueError()
989
self.client.delete_object.side_effect = exception
990
DeleteSourceObjectSubscriber(self.client).on_done(self.future)
991
self.client.delete_object.assert_called_once_with(
992
Bucket=self.bucket, Key=self.key)
993
self.future.set_exception.assert_called_once_with(exception)
994
995
def test_with_request_payer(self):
996
self.future.meta.call_args.extra_args = {'RequestPayer': 'requester'}
997
DeleteSourceObjectSubscriber(self.client).on_done(self.future)
998
self.client.delete_object.assert_called_once_with(
999
Bucket=self.bucket, Key=self.key, RequestPayer='requester')
1000
1001
1002
class TestDeleteCopySourceObjectSubscriber(unittest.TestCase):
1003
def setUp(self):
1004
self.client = mock.Mock()
1005
self.bucket = 'mybucket'
1006
self.key = 'mykey'
1007
copy_source = {'Bucket': self.bucket, 'Key': self.key}
1008
call_args = FakeTransferFutureCallArgs(
1009
copy_source=copy_source, extra_args={})
1010
meta = FakeTransferFutureMeta(call_args=call_args)
1011
self.future = mock.Mock()
1012
self.future.meta = meta
1013
1014
def test_deletes_object(self):
1015
DeleteCopySourceObjectSubscriber(self.client).on_done(self.future)
1016
self.client.delete_object.assert_called_once_with(
1017
Bucket=self.bucket, Key=self.key)
1018
self.future.set_exception.assert_not_called()
1019
1020
def test_sets_exception_on_error(self):
1021
exception = ValueError()
1022
self.client.delete_object.side_effect = exception
1023
DeleteCopySourceObjectSubscriber(self.client).on_done(self.future)
1024
self.client.delete_object.assert_called_once_with(
1025
Bucket=self.bucket, Key=self.key)
1026
self.future.set_exception.assert_called_once_with(exception)
1027
1028
def test_with_request_payer(self):
1029
self.future.meta.call_args.extra_args = {'RequestPayer': 'requester'}
1030
DeleteCopySourceObjectSubscriber(self.client).on_done(self.future)
1031
self.client.delete_object.assert_called_once_with(
1032
Bucket=self.bucket, Key=self.key, RequestPayer='requester')
1033
1034
1035
class TestDeleteSourceFileSubscriber(unittest.TestCase):
1036
def setUp(self):
1037
self.tempdir = tempfile.mkdtemp()
1038
self.filename = os.path.join(self.tempdir, 'myfile')
1039
call_args = FakeTransferFutureCallArgs(fileobj=self.filename)
1040
meta = FakeTransferFutureMeta(call_args=call_args)
1041
self.future = mock.Mock()
1042
self.future.meta = meta
1043
1044
def tearDown(self):
1045
shutil.rmtree(self.tempdir)
1046
1047
def test_deletes_file(self):
1048
with open(self.filename, 'w') as f:
1049
f.write('data')
1050
DeleteSourceFileSubscriber().on_done(self.future)
1051
self.assertFalse(os.path.exists(self.filename))
1052
self.future.set_exception.assert_not_called()
1053
1054
def test_sets_exception_on_error(self):
1055
DeleteSourceFileSubscriber().on_done(self.future)
1056
self.assertFalse(os.path.exists(self.filename))
1057
call_args = self.future.set_exception.call_args[0]
1058
self.assertIsInstance(call_args[0], EnvironmentError)
1059
1060
1061
class TestNonSeekableStream(unittest.TestCase):
1062
def test_can_make_stream_unseekable(self):
1063
fileobj = StringIO('foobar')
1064
self.assertTrue(seekable(fileobj))
1065
nonseekable_fileobj = NonSeekableStream(fileobj)
1066
self.assertFalse(seekable(nonseekable_fileobj))
1067
self.assertEqual(nonseekable_fileobj.read(), 'foobar')
1068
1069
def test_can_specify_amount_for_nonseekable_stream(self):
1070
nonseekable_fileobj = NonSeekableStream(StringIO('foobar'))
1071
self.assertEqual(nonseekable_fileobj.read(3), 'foo')
1072
1073
1074
class TestS3PathResolver:
1075
_BASE_ACCESSPOINT_ARN = (
1076
"s3://arn:aws:s3:us-west-2:123456789012:accesspoint/myaccesspoint")
1077
_BASE_OUTPOST_ACCESSPOINT_ARN = (
1078
"s3://arn:aws:s3-outposts:us-east-1:123456789012:outpost"
1079
"/op-foo/accesspoint/myaccesspoint")
1080
_BASE_ACCESSPOINT_ALIAS = "s3://myaccesspoint-foobar12345-s3alias"
1081
_BASE_OUTPOST_ACCESSPOINT_ALIAS = "s3://myaccesspoint-foobar12345--op-s3"
1082
_BASE_MRAP_ARN = "s3://arn:aws:s3::123456789012:accesspoint/myalias.mrap"
1083
1084
@pytest.mark.parametrize(
1085
"path,resolved",
1086
[(_BASE_ACCESSPOINT_ARN,"s3://mybucket/"),
1087
(f"{_BASE_ACCESSPOINT_ARN}/","s3://mybucket/"),
1088
(f"{_BASE_ACCESSPOINT_ARN}/mykey","s3://mybucket/mykey"),
1089
(f"{_BASE_ACCESSPOINT_ARN}/myprefix/","s3://mybucket/myprefix/"),
1090
(f"{_BASE_ACCESSPOINT_ARN}/myprefix/mykey",
1091
"s3://mybucket/myprefix/mykey")]
1092
)
1093
def test_resolves_accesspoint_arn(
1094
self, path, resolved, s3_path_resolver, s3control_client
1095
):
1096
resolved_paths = s3_path_resolver.resolve_underlying_s3_paths(path)
1097
assert resolved_paths == [resolved]
1098
s3control_client.get_access_point.assert_called_with(
1099
AccountId="123456789012",
1100
Name="myaccesspoint"
1101
)
1102
1103
@pytest.mark.parametrize(
1104
"path,resolved",
1105
[(_BASE_OUTPOST_ACCESSPOINT_ARN,"s3://mybucket/"),
1106
(f"{_BASE_OUTPOST_ACCESSPOINT_ARN}/","s3://mybucket/"),
1107
(f"{_BASE_OUTPOST_ACCESSPOINT_ARN}/mykey","s3://mybucket/mykey"),
1108
(f"{_BASE_OUTPOST_ACCESSPOINT_ARN}/myprefix/",
1109
"s3://mybucket/myprefix/"),
1110
(f"{_BASE_OUTPOST_ACCESSPOINT_ARN}/myprefix/mykey",
1111
"s3://mybucket/myprefix/mykey")]
1112
)
1113
def test_resolves_outpost_accesspoint_arn(
1114
self, path, resolved, s3_path_resolver, s3control_client
1115
):
1116
resolved_paths = s3_path_resolver.resolve_underlying_s3_paths(path)
1117
assert resolved_paths == [resolved]
1118
s3control_client.get_access_point.assert_called_with(
1119
AccountId="123456789012",
1120
Name=("arn:aws:s3-outposts:us-east-1:123456789012:outpost"
1121
"/op-foo/accesspoint/myaccesspoint")
1122
)
1123
1124
@pytest.mark.parametrize(
1125
"path,resolved",
1126
[(_BASE_ACCESSPOINT_ALIAS,"s3://mybucket/"),
1127
(f"{_BASE_ACCESSPOINT_ALIAS}/","s3://mybucket/"),
1128
(f"{_BASE_ACCESSPOINT_ALIAS}/mykey","s3://mybucket/mykey"),
1129
(f"{_BASE_ACCESSPOINT_ALIAS}/myprefix/","s3://mybucket/myprefix/"),
1130
(f"{_BASE_ACCESSPOINT_ALIAS}/myprefix/mykey",
1131
"s3://mybucket/myprefix/mykey")]
1132
)
1133
def test_resolves_accesspoint_alias(
1134
self, path, resolved, s3_path_resolver, s3control_client, sts_client
1135
):
1136
resolved_paths = s3_path_resolver.resolve_underlying_s3_paths(path)
1137
assert resolved_paths == [resolved]
1138
sts_client.get_caller_identity.assert_called_once()
1139
s3control_client.get_access_point.assert_called_with(
1140
AccountId="123456789012",
1141
Name="myaccesspoint-foobar12345-s3alias"
1142
)
1143
1144
@pytest.mark.parametrize(
1145
"path",
1146
[(_BASE_OUTPOST_ACCESSPOINT_ALIAS),
1147
(f"{_BASE_OUTPOST_ACCESSPOINT_ALIAS}/"),
1148
(f"{_BASE_OUTPOST_ACCESSPOINT_ALIAS}/mykey"),
1149
(f"{_BASE_OUTPOST_ACCESSPOINT_ALIAS}/myprefix/"),
1150
(f"{_BASE_OUTPOST_ACCESSPOINT_ALIAS}/myprefix/mykey")]
1151
)
1152
def test_outpost_accesspoint_alias_raises_exception(
1153
self, path, s3_path_resolver
1154
):
1155
with pytest.raises(ValueError) as e:
1156
s3_path_resolver.resolve_underlying_s3_paths(path)
1157
assert "Can't resolve underlying bucket name" in str(e.value)
1158
1159
@pytest.mark.parametrize(
1160
"path,resolved",
1161
[(_BASE_MRAP_ARN,"s3://mybucket/"),
1162
(f"{_BASE_MRAP_ARN}/","s3://mybucket/"),
1163
(f"{_BASE_MRAP_ARN}/mykey","s3://mybucket/mykey"),
1164
(f"{_BASE_MRAP_ARN}/myprefix/","s3://mybucket/myprefix/"),
1165
(f"{_BASE_MRAP_ARN}/myprefix/mykey","s3://mybucket/myprefix/mykey")]
1166
)
1167
def test_resolves_mrap_arn(
1168
self, path, resolved, s3_path_resolver, s3control_client
1169
):
1170
resolved_paths = s3_path_resolver.resolve_underlying_s3_paths(path)
1171
assert resolved_paths == [resolved]
1172
s3control_client.list_multi_region_access_points.assert_called_with(
1173
AccountId="123456789012"
1174
)
1175
1176
@pytest.mark.parametrize(
1177
"path,resolved,name",
1178
[(f"{_BASE_ACCESSPOINT_ARN}-s3alias/mykey","s3://mybucket/mykey",
1179
"myaccesspoint-s3alias"),
1180
(f"{_BASE_OUTPOST_ACCESSPOINT_ARN}--op-s3/mykey",
1181
"s3://mybucket/mykey",
1182
f"{_BASE_OUTPOST_ACCESSPOINT_ARN[5:]}--op-s3")]
1183
)
1184
def test_alias_suffixes_dont_match_accesspoint_arns(
1185
self, path, resolved, name, s3_path_resolver, s3control_client
1186
):
1187
resolved_paths = s3_path_resolver.resolve_underlying_s3_paths(path)
1188
assert resolved_paths == [resolved]
1189
s3control_client.get_access_point.assert_called_with(
1190
AccountId="123456789012",
1191
Name=name
1192
)
1193
1194
@pytest.mark.parametrize(
1195
"path,expected_has_underlying_s3_path",
1196
[(_BASE_ACCESSPOINT_ARN,True),
1197
(f"{_BASE_ACCESSPOINT_ARN}/mykey",True),
1198
(f"{_BASE_ACCESSPOINT_ARN}/myprefix/mykey",True),
1199
(_BASE_ACCESSPOINT_ALIAS,True),
1200
(_BASE_OUTPOST_ACCESSPOINT_ARN,True),
1201
(_BASE_OUTPOST_ACCESSPOINT_ALIAS,True),
1202
(_BASE_MRAP_ARN,True),
1203
("s3://mybucket/",False),
1204
("s3://mybucket/mykey",False),
1205
("s3://mybucket/myprefix/mykey",False)]
1206
)
1207
def test_has_underlying_s3_path(self, path, expected_has_underlying_s3_path):
1208
has_underlying_s3_path = S3PathResolver.has_underlying_s3_path(path)
1209
assert has_underlying_s3_path == expected_has_underlying_s3_path
1210
1211