Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/s3/test_s3handler.py
2620 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import os
14
15
from s3transfer.manager import TransferManager
16
17
from awscli.testutils import mock
18
from awscli.testutils import unittest
19
from awscli.testutils import FileCreator
20
from awscli.compat import queue
21
from awscli.customizations.s3.s3handler import S3TransferHandler
22
from awscli.customizations.s3.s3handler import S3TransferHandlerFactory
23
from awscli.customizations.s3.s3handler import UploadRequestSubmitter
24
from awscli.customizations.s3.s3handler import DownloadRequestSubmitter
25
from awscli.customizations.s3.s3handler import CopyRequestSubmitter
26
from awscli.customizations.s3.s3handler import UploadStreamRequestSubmitter
27
from awscli.customizations.s3.s3handler import DownloadStreamRequestSubmitter
28
from awscli.customizations.s3.s3handler import DeleteRequestSubmitter
29
from awscli.customizations.s3.s3handler import LocalDeleteRequestSubmitter
30
from awscli.customizations.s3.fileinfo import FileInfo
31
from awscli.customizations.s3.results import QueuedResult
32
from awscli.customizations.s3.results import SuccessResult
33
from awscli.customizations.s3.results import FailureResult
34
from awscli.customizations.s3.results import UploadResultSubscriber
35
from awscli.customizations.s3.results import DownloadResultSubscriber
36
from awscli.customizations.s3.results import CopyResultSubscriber
37
from awscli.customizations.s3.results import UploadStreamResultSubscriber
38
from awscli.customizations.s3.results import DownloadStreamResultSubscriber
39
from awscli.customizations.s3.results import DeleteResultSubscriber
40
from awscli.customizations.s3.results import ResultRecorder
41
from awscli.customizations.s3.results import ResultProcessor
42
from awscli.customizations.s3.results import CommandResultRecorder
43
from awscli.customizations.s3.results import DryRunResult
44
from awscli.customizations.s3.utils import MAX_UPLOAD_SIZE
45
from awscli.customizations.s3.utils import NonSeekableStream
46
from awscli.customizations.s3.utils import StdoutBytesWriter
47
from awscli.customizations.s3.utils import WarningResult
48
from awscli.customizations.s3.utils import ProvideSizeSubscriber
49
from awscli.customizations.s3.utils import ProvideETagSubscriber
50
from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
51
from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
52
from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
53
from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
54
from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
55
from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
56
from awscli.customizations.s3.transferconfig import RuntimeConfig
57
58
59
def runtime_config(**kwargs):
    """Build a runtime config, forwarding ``kwargs`` to ``build_config``.

    Returns whatever ``RuntimeConfig().build_config(**kwargs)`` returns.
    """
    config_builder = RuntimeConfig()
    return config_builder.build_config(**kwargs)
61
62
63
class TestS3TransferHandlerFactory(unittest.TestCase):
    """Checks that the factory produces ``S3TransferHandler`` objects."""

    def setUp(self):
        # Arguments for the factory constructor (cli_params, runtime_config)
        # and for the factory call itself (client, result_queue).
        self.result_queue = queue.Queue()
        self.client = mock.Mock()
        self.runtime_config = runtime_config()
        self.cli_params = {}

    def test_call(self):
        # Calling the factory with a client and a result queue must yield
        # an S3TransferHandler.
        factory = S3TransferHandlerFactory(
            self.cli_params, self.runtime_config)
        handler = factory(self.client, self.result_queue)
        self.assertIsInstance(handler, S3TransferHandler)
75
76
77
class TestS3TransferHandler(unittest.TestCase):
    """Tests ``S3TransferHandler.call`` with a mocked ``TransferManager``."""

    def setUp(self):
        self.result_queue = queue.Queue()
        self.result_recorder = ResultRecorder()
        # Every processed result is also appended to this list so that tests
        # can inspect what went through the result processor.
        self.processed_results = []
        self.result_processor = ResultProcessor(
            self.result_queue,
            [self.result_recorder, self.processed_results.append]
        )
        self.command_result_recorder = CommandResultRecorder(
            self.result_queue, self.result_recorder, self.result_processor)

        self.transfer_manager = mock.Mock(spec=TransferManager)
        # The context-manager hooks are attached explicitly so that tests can
        # configure them (see test_exception_when_enqueuing).
        self.transfer_manager.__enter__ = mock.Mock()
        self.transfer_manager.__exit__ = mock.Mock()
        self.parameters = {}
        self.s3_transfer_handler = S3TransferHandler(
            self.transfer_manager, self.parameters,
            self.command_result_recorder
        )

    def test_call_return_command_result(self):
        # The command result is the (failed, warned) pair read from the
        # result recorder.
        num_failures = 5
        num_warnings = 3
        self.result_recorder.files_failed = num_failures
        self.result_recorder.files_warned = num_warnings
        command_result = self.s3_transfer_handler.call([])
        self.assertEqual(command_result, (num_failures, num_warnings))

    def test_enqueue_uploads(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='filename', dest='bucket/key',
                     operation_name='upload')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.upload.call_count, num_transfers)

    def test_enqueue_downloads(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='bucket/key', dest='filename',
                     compare_key='key',
                     operation_name='download')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.download.call_count, num_transfers)

    def test_enqueue_copies(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='sourcebucket/sourcekey', dest='bucket/key',
                     compare_key='key',
                     operation_name='copy')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.copy.call_count, num_transfers)

    def test_exception_when_enqueuing(self):
        fileinfos = [
            FileInfo(src='filename', dest='bucket/key',
                     operation_name='upload')
        ]
        self.transfer_manager.__exit__.side_effect = Exception(
            'some exception')
        command_result = self.s3_transfer_handler.call(fileinfos)
        # Exception should have been raised causing the command result to
        # have failed results of one.
        self.assertEqual(command_result, (1, 0))

    def test_enqueue_upload_stream(self):
        self.parameters['is_stream'] = True
        self.s3_transfer_handler.call(
            [FileInfo(src='-', dest='bucket/key', operation_name='upload')])
        self.assertEqual(
            self.transfer_manager.upload.call_count, 1)
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertIsInstance(
            upload_call_kwargs['fileobj'], NonSeekableStream)

    # NOTE: the method name keeps its historical spelling ("dowload") so
    # that existing test selections by name keep working.
    def test_enqueue_dowload_stream(self):
        self.parameters['is_stream'] = True
        self.s3_transfer_handler.call(
            [FileInfo(src='bucket/key', dest='-',
                      compare_key='key',
                      operation_name='download')])
        self.assertEqual(
            self.transfer_manager.download.call_count, 1)
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertIsInstance(
            download_call_kwargs['fileobj'], StdoutBytesWriter)

    def test_enqueue_deletes(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='bucket/key', dest=None, operation_name='delete',
                     src_type='s3')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.delete.call_count, num_transfers)

    def test_enqueue_local_deletes(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='myfile', dest=None, operation_name='delete',
                     src_type='local')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        # The number of processed results will be equal to:
        # number_of_local_deletes * 2 + 1
        # The 2 represents the QueuedResult and SuccessResult/FailureResult
        # for each transfer
        # The 1 represents the TotalFinalSubmissionResult
        expected_num_results = num_transfers * 2 + 1
        self.assertEqual(len(self.processed_results), expected_num_results)

        # Make sure that the results are as expected by checking just one
        # of them
        first_submitted_result = self.processed_results[0]
        self.assertEqual(first_submitted_result.transfer_type, 'delete')
        self.assertTrue(first_submitted_result.src.endswith('myfile'))

        # Also make sure that transfer manager's delete() was never called
        self.assertEqual(self.transfer_manager.delete.call_count, 0)

    def test_notifies_total_submissions(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='bucket/key', dest='filename',
                     compare_key='key',
                     operation_name='download')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.result_recorder.final_expected_files_transferred,
            num_transfers
        )

    def test_notifies_total_submissions_accounts_for_skips(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='bucket/key', dest='filename',
                     compare_key='key',
                     operation_name='download')
            for _ in range(num_transfers)
        ]

        # Add a fileinfo that should get skipped. To skip, we do a glacier
        # download.
        fileinfos.append(FileInfo(
            src='bucket/key', dest='filename', operation_name='download',
            compare_key='key',
            associated_response_data={'StorageClass': 'GLACIER'}))
        self.s3_transfer_handler.call(fileinfos)
        # Since the last glacier download was skipped the final expected
        # total should be equal to the number of regular transfers built
        # above.
        self.assertEqual(
            self.result_recorder.final_expected_files_transferred,
            num_transfers
        )
253
254
255
class BaseTransferRequestSubmitterTest(unittest.TestCase):
    """Common fixtures for the request submitter test cases."""

    def setUp(self):
        # Local file name plus bucket/key names used to build FileInfo
        # objects in the tests.
        self.filename, self.bucket, self.key = 'myfile', 'mybucket', 'mykey'
        self.cli_params = {}
        self.result_queue = queue.Queue()
        # Mock restricted to the TransferManager interface.
        self.transfer_manager = mock.Mock(spec=TransferManager)
263
264
265
class TestUploadRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``UploadRequestSubmitter`` using a mocked transfer manager."""

    def setUp(self):
        super(TestUploadRequestSubmitter, self).setUp()
        self.transfer_request_submitter = UploadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def _assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        # The subscribers must match the reference classes in both number
        # and order.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual_subscriber, ref_class in zip(
                actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual_subscriber, ref_class)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key,
            operation_name='upload')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        future = self.transfer_request_submitter.submit(fileinfo)

        self.assertIs(self.transfer_manager.upload.return_value, future)
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(upload_call_kwargs['fileobj'], self.filename)
        self.assertEqual(upload_call_kwargs['bucket'], self.bucket)
        self.assertEqual(upload_call_kwargs['key'], self.key)
        self.assertEqual(upload_call_kwargs['extra_args'], {})

        # Make sure the subscriber applied are of the correct type and order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideUploadContentTypeSubscriber,
            UploadResultSubscriber
        ]
        self._assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        # Set some extra argument like storage_class to make sure cli
        # params get mapped to request parameters.
        self.cli_params['storage_class'] = 'STANDARD_IA'
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(
            upload_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})

    def test_submit_when_content_type_specified(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['content_type'] = 'text/plain'
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(
            upload_call_kwargs['extra_args'], {'ContentType': 'text/plain'})
        # No content-type subscriber is expected in this case.
        ref_subscribers = [
            ProvideSizeSubscriber,
            UploadResultSubscriber
        ]
        self._assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_when_no_guess_content_mime_type(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = False
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        # No content-type subscriber is expected in this case.
        ref_subscribers = [
            ProvideSizeSubscriber,
            UploadResultSubscriber
        ]
        self._assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_warn_on_too_large_transfer(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key,
            size=MAX_UPLOAD_SIZE+1)
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is too large.
        # get_nowait() makes a missing warning fail the test immediately
        # instead of blocking forever on an empty queue.
        warning_result = self.result_queue.get_nowait()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn('exceeds s3 upload limit', warning_result.message)

        # Make sure that the transfer was still attempted
        self.assertIs(self.transfer_manager.upload.return_value, future)
        self.assertEqual(len(self.transfer_manager.upload.call_args_list), 1)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = UploadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = FileInfo(
            src=self.filename, src_type='local', operation_name='upload',
            dest=self.bucket + '/' + self.key, dest_type='s3')
        self.transfer_request_submitter.submit(fileinfo)

        # Fail fast rather than hang if no dry-run result was queued.
        result = self.result_queue.get_nowait()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'upload')
        self.assertTrue(result.src.endswith(self.filename))
        self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideUploadContentTypeSubscriber,
            DeleteSourceFileSubscriber,
            UploadResultSubscriber,
        ]
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self._assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)
398
399
400
class TestDownloadRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``DownloadRequestSubmitter`` using a mocked manager."""

    def setUp(self):
        super(TestDownloadRequestSubmitter, self).setUp()
        self.transfer_request_submitter = DownloadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def _assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        # The subscribers must match the reference classes in both number
        # and order.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual_subscriber, ref_class in zip(
                actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual_subscriber, ref_class)

    def assert_no_downloads_happened(self):
        # The mocked transfer manager must not have received any download
        # call.
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 0)

    def create_file_info(self, key, associated_response_data=None):
        # Build an s3 -> local download FileInfo for ``key`` in the test
        # bucket; ``associated_response_data`` is only passed through when
        # it is provided.
        kwargs = {
            'src': self.bucket + '/' + key,
            'src_type': 's3',
            'dest': self.filename,
            'dest_type': 'local',
            'operation_name': 'download',
            'compare_key': key,
        }
        if associated_response_data is not None:
            kwargs['associated_response_data'] = associated_response_data
        return FileInfo(**kwargs)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = self.create_file_info(self.key)
        future = self.transfer_request_submitter.submit(fileinfo)

        self.assertIs(self.transfer_manager.download.return_value, future)
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertEqual(download_call_kwargs['fileobj'], self.filename)
        self.assertEqual(download_call_kwargs['bucket'], self.bucket)
        self.assertEqual(download_call_kwargs['key'], self.key)
        self.assertEqual(download_call_kwargs['extra_args'], {})

        # Make sure the subscriber applied are of the correct type and order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            DirectoryCreatorSubscriber,
            ProvideLastModifiedTimeSubscriber,
            DownloadResultSubscriber
        ]
        self._assert_subscriber_types(
            download_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = self.create_file_info(self.key)
        # Set some extra argument like sse_c to make sure cli
        # params get mapped to request parameters.
        self.cli_params['sse_c'] = 'AES256'
        self.cli_params['sse_c_key'] = 'mykey'
        self.transfer_request_submitter.submit(fileinfo)

        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertEqual(
            download_call_kwargs['extra_args'],
            {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': 'mykey'}
        )

    def test_warn_glacier_for_incompatible(self):
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is a non-restored
        # glacier object. get_nowait() makes a missing warning fail the
        # test immediately instead of blocking on an empty queue.
        warning_result = self.result_queue.get_nowait()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn(
            'Unable to perform download operations on GLACIER objects',
            warning_result.message)

        # The transfer should have been skipped.
        self.assertIsNone(future)
        self.assert_no_downloads_happened()

    def test_not_warn_glacier_for_compatible(self):
        fileinfo = self.create_file_info(
            self.key, associated_response_data={
                'StorageClass': 'GLACIER',
                'Restore': 'ongoing-request="false"'
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because it is a restored
        # glacier object.
        self.assertTrue(self.result_queue.empty())

        # And the transfer should not have been skipped.
        self.assertIs(self.transfer_manager.download.return_value, future)
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)

    def test_warn_glacier_force_glacier(self):
        self.cli_params['force_glacier_transfer'] = True
        fileinfo = self.create_file_info(
            self.key,
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because glacier
        # transfers were forced.
        self.assertTrue(self.result_queue.empty())
        self.assertIs(self.transfer_manager.download.return_value, future)
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)

    def test_warn_glacier_ignore_glacier_warnings(self):
        self.cli_params['ignore_glacier_warnings'] = True
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because it was specified
        # to ignore glacier warnings.
        self.assertTrue(self.result_queue.empty())
        # But the transfer still should have been skipped.
        self.assertIsNone(future)
        self.assert_no_downloads_happened()

    def test_warn_and_ignore_on_parent_dir_reference(self):
        fileinfo = self.create_file_info('../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        # Fail fast rather than hang if no warning was queued.
        warning_result = self.result_queue.get_nowait()
        self.assertIsInstance(warning_result, WarningResult)
        self.assert_no_downloads_happened()

    def test_warn_and_ignore_with_leading_chars(self):
        fileinfo = self.create_file_info('a/../../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        # Fail fast rather than hang if no warning was queued.
        warning_result = self.result_queue.get_nowait()
        self.assertIsInstance(warning_result, WarningResult)
        self.assert_no_downloads_happened()

    def test_allow_double_dots_that_dont_escape_cwd(self):
        self.cli_params['dryrun'] = True
        # This is fine because it's 'foo.txt'.
        fileinfo = self.create_file_info('a/../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        self.assertIsInstance(self.result_queue.get_nowait(), DryRunResult)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DownloadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = self.create_file_info(self.key)
        self.transfer_request_submitter.submit(fileinfo)

        # Fail fast rather than hang if no dry-run result was queued.
        result = self.result_queue.get_nowait()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'download')
        self.assertTrue(result.dest.endswith(self.filename))
        self.assertEqual(result.src, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        fileinfo = self.create_file_info(self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            DirectoryCreatorSubscriber,
            ProvideLastModifiedTimeSubscriber,
            DeleteSourceObjectSubscriber,
            DownloadResultSubscriber,
        ]
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self._assert_subscriber_types(
            download_call_kwargs['subscribers'], ref_subscribers)
595
596
597
class TestCopyRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``CopyRequestSubmitter`` using a mocked transfer manager."""

    def setUp(self):
        super(TestCopyRequestSubmitter, self).setUp()
        # The copy source; the destination reuses the base bucket and key.
        self.source_bucket = 'mysourcebucket'
        self.source_key = 'mysourcekey'
        self.transfer_request_submitter = CopyRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def _assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        # The subscribers must match the reference classes in both number
        # and order.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual_subscriber, ref_class in zip(
                actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual_subscriber, ref_class)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key, operation_name='copy')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)
        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['copy_source'],
            {'Bucket': self.source_bucket, 'Key': self.source_key})
        self.assertEqual(copy_call_kwargs['bucket'], self.bucket)
        self.assertEqual(copy_call_kwargs['key'], self.key)
        self.assertEqual(copy_call_kwargs['extra_args'], {})

        # Make sure the subscriber applied are of the correct type and order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            ProvideCopyContentTypeSubscriber,
            CopyResultSubscriber
        ]
        self._assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key)
        # Set some extra argument like storage_class to make sure cli
        # params get mapped to request parameters.
        self.cli_params['storage_class'] = 'STANDARD_IA'
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})

    def test_submit_when_content_type_specified(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key)
        self.cli_params['content_type'] = 'text/plain'
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['extra_args'], {'ContentType': 'text/plain'})
        # No content-type subscriber is expected in this case.
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            CopyResultSubscriber
        ]
        self._assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_when_no_guess_content_mime_type(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = False
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        # No content-type subscriber is expected in this case.
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            CopyResultSubscriber
        ]
        self._assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_warn_glacier_for_incompatible(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key,
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is a non-restored
        # glacier object. get_nowait() makes a missing warning fail the
        # test immediately instead of blocking on an empty queue.
        warning_result = self.result_queue.get_nowait()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn(
            'Unable to perform copy operations on GLACIER objects',
            warning_result.message)

        # The transfer request should have never been sent therefore return
        # no future.
        self.assertIsNone(future)
        # The transfer should have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)

    def test_not_warn_glacier_for_compatible(self):
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key,
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
                'Restore': 'ongoing-request="false"'
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)

        # A warning should have not been submitted because it is a restored
        # glacier object.
        self.assertTrue(self.result_queue.empty())

        # And the transfer should not have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)

    def test_warn_glacier_force_glacier(self):
        self.cli_params['force_glacier_transfer'] = True
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key,
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)

        # A warning should have not been submitted because glacier
        # transfers were forced.
        self.assertTrue(self.result_queue.empty())
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)

    def test_warn_glacier_ignore_glacier_warnings(self):
        self.cli_params['ignore_glacier_warnings'] = True
        fileinfo = FileInfo(
            src=self.source_bucket+'/'+self.source_key,
            dest=self.bucket+'/'+self.key,
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # The transfer request should have never been sent therefore return
        # no future.
        self.assertIsNone(future)
        # A warning should have not been submitted because it was specified
        # to ignore glacier warnings.
        self.assertTrue(self.result_queue.empty())
        # But the transfer still should have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = CopyRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = FileInfo(
            src=self.source_bucket + '/' + self.source_key, src_type='s3',
            dest=self.bucket + '/' + self.key, dest_type='s3',
            operation_name='copy')
        self.transfer_request_submitter.submit(fileinfo)

        # Fail fast rather than hang if no dry-run result was queued.
        result = self.result_queue.get_nowait()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'copy')
        source = 's3://' + self.source_bucket + '/' + self.source_key
        self.assertEqual(result.src, source)
        self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        # src is the source bucket/key and dest the target bucket/key, as in
        # the other tests of this class (they were previously swapped here;
        # the assertions below only look at the subscriber list).
        fileinfo = FileInfo(
            src=self.source_bucket + '/' + self.source_key,
            dest=self.bucket + '/' + self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            ProvideCopyContentTypeSubscriber,
            DeleteSourceObjectSubscriber,
            CopyResultSubscriber,
        ]
        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self._assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)
813
814
815
class TestUploadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Exercises UploadStreamRequestSubmitter with '-' as the source of
    # the upload and the is_stream cli parameter enabled.

    def setUp(self):
        super(TestUploadStreamRequestSubmitter, self).setUp()
        self.filename = '-'
        self.cli_params['is_stream'] = True
        self.transfer_request_submitter = UploadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        stream_upload = FileInfo(
            src=self.filename,
            dest=self.bucket + '/' + self.key,
            operation_name='upload',
        )
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(stream_upload))
        # The same upload must be rejected once is_stream is turned off.
        self.cli_params['is_stream'] = False
        self.assertFalse(submitter.can_submit(stream_upload))

    def test_submit(self):
        stream_upload = FileInfo(
            src=self.filename,
            dest=self.bucket + '/' + self.key,
        )
        returned_future = self.transfer_request_submitter.submit(
            stream_upload)
        self.assertIs(
            self.transfer_manager.upload.return_value, returned_future)

        # Inspect the keyword arguments of the upload() call.
        upload_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertIsInstance(upload_kwargs['fileobj'], NonSeekableStream)
        self.assertEqual(upload_kwargs['bucket'], self.bucket)
        self.assertEqual(upload_kwargs['key'], self.key)
        self.assertEqual(upload_kwargs['extra_args'], {})

        expected_types = [UploadStreamResultSubscriber]
        submitted_subscribers = upload_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted_subscribers))
        for subscriber, expected_type in zip(
                submitted_subscribers, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_submit_with_expected_size_provided(self):
        provided_size = 100
        self.cli_params['expected_size'] = provided_size
        stream_upload = FileInfo(
            src=self.filename,
            dest=self.bucket + '/' + self.key,
        )
        self.transfer_request_submitter.submit(stream_upload)

        submitted_subscribers = (
            self.transfer_manager.upload.call_args[1]['subscribers'])
        expected_types = [
            ProvideSizeSubscriber,
            UploadStreamResultSubscriber,
        ]
        self.assertEqual(len(expected_types), len(submitted_subscribers))
        for subscriber, expected_type in zip(
                submitted_subscribers, expected_types):
            self.assertIsInstance(subscriber, expected_type)
        # The ProvideSizeSubscriber should be providing the correct size
        size_subscriber = submitted_subscribers[0]
        self.assertEqual(size_subscriber.size, provided_size)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = UploadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        dest_path = self.bucket + '/' + self.key
        stream_upload = FileInfo(
            src=self.filename,
            src_type='local',
            dest=dest_path,
            dest_type='s3',
            operation_name='upload',
        )
        self.transfer_request_submitter.submit(stream_upload)

        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'upload')
        self.assertEqual(dry_run_result.dest, 's3://' + dest_path)
        # The stream source is reported as the bare '-' placeholder.
        self.assertEqual(dry_run_result.src, '-')
887
888
889
class TestDownloadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Exercises DownloadStreamRequestSubmitter with '-' as the
    # destination of the download and the is_stream cli parameter enabled.

    def setUp(self):
        super(TestDownloadStreamRequestSubmitter, self).setUp()
        self.filename = '-'
        self.cli_params['is_stream'] = True
        self.transfer_request_submitter = DownloadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        stream_download = FileInfo(
            src=self.bucket + '/' + self.key,
            dest=self.filename,
            operation_name='download',
        )
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(stream_download))
        # The same download must be rejected once is_stream is turned off.
        self.cli_params['is_stream'] = False
        self.assertFalse(submitter.can_submit(stream_download))

    def test_submit(self):
        stream_download = FileInfo(
            src=self.bucket + '/' + self.key,
            dest=self.filename,
            compare_key=self.key,
        )
        returned_future = self.transfer_request_submitter.submit(
            stream_download)
        self.assertIs(
            self.transfer_manager.download.return_value, returned_future)

        # Inspect the keyword arguments of the download() call.
        download_kwargs = self.transfer_manager.download.call_args[1]
        self.assertIsInstance(
            download_kwargs['fileobj'], StdoutBytesWriter)
        self.assertEqual(download_kwargs['bucket'], self.bucket)
        self.assertEqual(download_kwargs['key'], self.key)
        self.assertEqual(download_kwargs['extra_args'], {})

        expected_types = [DownloadStreamResultSubscriber]
        submitted_subscribers = download_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted_subscribers))
        for subscriber, expected_type in zip(
                submitted_subscribers, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DownloadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        source_path = self.bucket + '/' + self.key
        stream_download = FileInfo(
            src=source_path,
            src_type='s3',
            dest=self.filename,
            dest_type='local',
            operation_name='download',
            compare_key=self.key,
        )
        self.transfer_request_submitter.submit(stream_download)

        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'download')
        self.assertEqual(dry_run_result.src, 's3://' + source_path)
        # The stream destination is reported as the bare '-' placeholder.
        self.assertEqual(dry_run_result.dest, '-')
944
945
946
class TestDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Exercises DeleteRequestSubmitter, which these tests expect to accept
    # only 'delete' operations whose source type is 's3'.

    def setUp(self):
        super(TestDeleteRequestSubmitter, self).setUp()
        self.transfer_request_submitter = DeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        s3_delete = FileInfo(
            src=self.bucket + '/' + self.key,
            src_type='s3',
            dest=None,
            operation_name='delete',
        )
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(s3_delete))
        # Any operation other than 'delete' must be rejected.
        s3_delete.operation_name = 'foo'
        self.assertFalse(submitter.can_submit(s3_delete))

    def test_cannot_submit_local_deletes(self):
        local_delete = FileInfo(
            src=self.bucket + '/' + self.key,
            src_type='local',
            dest=None,
            operation_name='delete',
        )
        self.assertFalse(
            self.transfer_request_submitter.can_submit(local_delete))

    def test_submit(self):
        s3_delete = FileInfo(
            src=self.bucket + '/' + self.key,
            dest=None,
            operation_name='delete',
        )
        returned_future = self.transfer_request_submitter.submit(s3_delete)
        self.assertIs(
            self.transfer_manager.delete.return_value, returned_future)

        # Inspect the keyword arguments of the delete() call.
        delete_kwargs = self.transfer_manager.delete.call_args[1]
        self.assertEqual(delete_kwargs['bucket'], self.bucket)
        self.assertEqual(delete_kwargs['key'], self.key)
        self.assertEqual(delete_kwargs['extra_args'], {})

        expected_types = [DeleteResultSubscriber]
        submitted_subscribers = delete_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted_subscribers))
        for subscriber, expected_type in zip(
                submitted_subscribers, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        s3_path = self.bucket + '/' + self.key
        s3_delete = FileInfo(
            src=s3_path,
            src_type='s3',
            dest=s3_path,
            dest_type='s3',
            operation_name='delete',
        )
        self.transfer_request_submitter.submit(s3_delete)

        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'delete')
        self.assertEqual(dry_run_result.src, 's3://' + s3_path)
        # A dest was given on the FileInfo, but the reported result is
        # expected to carry none.
        self.assertIsNone(dry_run_result.dest)
1003
1004
1005
class TestLocalDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Exercises LocalDeleteRequestSubmitter, which these tests expect to
    # accept only 'delete' operations whose source type is 'local'.

    def setUp(self):
        super(TestLocalDeleteRequestSubmitter, self).setUp()
        self.file_creator = FileCreator()
        self.transfer_request_submitter = LocalDeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def tearDown(self):
        super(TestLocalDeleteRequestSubmitter, self).tearDown()
        self.file_creator.remove_all()

    def test_can_submit(self):
        local_delete = FileInfo(
            src=self.filename,
            src_type='local',
            dest=None,
            operation_name='delete',
        )
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(local_delete))
        # Any operation other than 'delete' must be rejected.
        local_delete.operation_name = 'foo'
        self.assertFalse(submitter.can_submit(local_delete))

    def test_cannot_submit_remote_deletes(self):
        remote_delete = FileInfo(
            src=self.filename,
            src_type='s3',
            dest=None,
            operation_name='delete',
        )
        self.assertFalse(
            self.transfer_request_submitter.can_submit(remote_delete))

    def test_submit(self):
        full_filename = self.file_creator.create_file(
            self.filename, 'content')
        local_delete = FileInfo(
            src=full_filename,
            src_type='local',
            dest=None,
            operation_name='delete',
        )
        submitted = self.transfer_request_submitter.submit(local_delete)
        self.assertTrue(submitted)

        # First the delete is announced as queued ...
        queued_result = self.result_queue.get()
        self.assertIsInstance(queued_result, QueuedResult)
        self.assertEqual(queued_result.transfer_type, 'delete')
        self.assertTrue(queued_result.src.endswith(self.filename))
        self.assertIsNone(queued_result.dest)
        self.assertEqual(queued_result.total_transfer_size, 0)

        # ... then it is reported as having succeeded.
        success_result = self.result_queue.get()
        self.assertIsInstance(success_result, SuccessResult)
        self.assertEqual(success_result.transfer_type, 'delete')
        self.assertTrue(success_result.src.endswith(self.filename))
        self.assertIsNone(success_result.dest)

        # The file itself must be gone from disk.
        self.assertFalse(os.path.exists(full_filename))

    def test_submit_with_exception(self):
        # The file is never created, so attempting to delete it in the
        # submitter should trigger an exception.
        missing_file_delete = FileInfo(
            src=self.filename,
            src_type='local',
            dest=None,
            operation_name='delete',
        )
        submitted = self.transfer_request_submitter.submit(
            missing_file_delete)
        self.assertTrue(submitted)

        # The delete is still announced as queued ...
        queued_result = self.result_queue.get()
        self.assertIsInstance(queued_result, QueuedResult)
        self.assertEqual(queued_result.transfer_type, 'delete')
        self.assertTrue(queued_result.src.endswith(self.filename))
        self.assertIsNone(queued_result.dest)
        self.assertEqual(queued_result.total_transfer_size, 0)

        # ... but it is followed by a failure instead of a success.
        failure_result = self.result_queue.get()
        self.assertIsInstance(failure_result, FailureResult)
        self.assertEqual(failure_result.transfer_type, 'delete')
        self.assertTrue(failure_result.src.endswith(self.filename))
        self.assertIsNone(failure_result.dest)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        local_delete = FileInfo(
            src=self.filename,
            src_type='local',
            dest=self.filename,
            dest_type='local',
            operation_name='delete',
        )
        self.transfer_request_submitter.submit(local_delete)

        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'delete')
        self.assertTrue(dry_run_result.src.endswith(self.filename))
        # A dest was given on the FileInfo, but the reported result is
        # expected to carry none.
        self.assertIsNone(dry_run_result.dest)
1091
1092