Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/s3/test_s3handler.py
1569 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import os
14
15
from s3transfer.manager import TransferManager
16
17
from awscli.testutils import mock
18
from awscli.testutils import unittest
19
from awscli.testutils import FileCreator
20
from awscli.compat import queue
21
from awscli.customizations.s3.s3handler import S3TransferHandler
22
from awscli.customizations.s3.s3handler import S3TransferHandlerFactory
23
from awscli.customizations.s3.s3handler import UploadRequestSubmitter
24
from awscli.customizations.s3.s3handler import DownloadRequestSubmitter
25
from awscli.customizations.s3.s3handler import CopyRequestSubmitter
26
from awscli.customizations.s3.s3handler import UploadStreamRequestSubmitter
27
from awscli.customizations.s3.s3handler import DownloadStreamRequestSubmitter
28
from awscli.customizations.s3.s3handler import DeleteRequestSubmitter
29
from awscli.customizations.s3.s3handler import LocalDeleteRequestSubmitter
30
from awscli.customizations.s3.fileinfo import FileInfo
31
from awscli.customizations.s3.results import QueuedResult
32
from awscli.customizations.s3.results import SuccessResult
33
from awscli.customizations.s3.results import FailureResult
34
from awscli.customizations.s3.results import UploadResultSubscriber
35
from awscli.customizations.s3.results import DownloadResultSubscriber
36
from awscli.customizations.s3.results import CopyResultSubscriber
37
from awscli.customizations.s3.results import UploadStreamResultSubscriber
38
from awscli.customizations.s3.results import DownloadStreamResultSubscriber
39
from awscli.customizations.s3.results import DeleteResultSubscriber
40
from awscli.customizations.s3.results import ResultRecorder
41
from awscli.customizations.s3.results import ResultProcessor
42
from awscli.customizations.s3.results import CommandResultRecorder
43
from awscli.customizations.s3.results import DryRunResult
44
from awscli.customizations.s3.utils import MAX_UPLOAD_SIZE
45
from awscli.customizations.s3.utils import NonSeekableStream
46
from awscli.customizations.s3.utils import StdoutBytesWriter
47
from awscli.customizations.s3.utils import WarningResult
48
from awscli.customizations.s3.utils import ProvideSizeSubscriber
49
from awscli.customizations.s3.utils import ProvideETagSubscriber
50
from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
51
from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
52
from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
53
from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
54
from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
55
from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
56
from awscli.customizations.s3.transferconfig import RuntimeConfig
57
58
59
def runtime_config(**kwargs):
    """Build a runtime configuration for use in the tests below.

    All keyword arguments are forwarded unchanged to
    ``RuntimeConfig.build_config`` and its result is returned as-is.
    """
    config_builder = RuntimeConfig()
    return config_builder.build_config(**kwargs)
61
62
63
class TestS3TransferHandlerFactory(unittest.TestCase):
    """Checks that the factory hands back ``S3TransferHandler`` objects."""

    def setUp(self):
        # Minimal collaborators: empty CLI parameters, a default runtime
        # configuration, a mocked client and a fresh result queue.
        self.cli_params = {}
        self.runtime_config = runtime_config()
        self.client = mock.Mock()
        self.result_queue = queue.Queue()

    def test_call(self):
        # Calling the factory with a client and a result queue must
        # produce a transfer handler instance.
        handler_factory = S3TransferHandlerFactory(
            self.cli_params, self.runtime_config)
        handler = handler_factory(self.client, self.result_queue)
        self.assertIsInstance(handler, S3TransferHandler)
75
76
77
class TestS3TransferHandler(unittest.TestCase):
    """Exercise ``S3TransferHandler.call`` against a mocked TransferManager.

    Results are routed through a ``ResultProcessor`` whose handlers are the
    ``ResultRecorder`` and ``self.processed_results.append``, so tests can
    inspect both the recorder's attributes and the raw processed results.
    """

    def setUp(self):
        self.result_queue = queue.Queue()
        self.result_recorder = ResultRecorder()
        self.processed_results = []
        self.result_processor = ResultProcessor(
            self.result_queue,
            [self.result_recorder, self.processed_results.append]
        )
        self.command_result_recorder = CommandResultRecorder(
            self.result_queue, self.result_recorder, self.result_processor)

        # The handler uses the transfer manager as a context manager, so
        # the spec'd mock needs explicit __enter__/__exit__ attributes.
        self.transfer_manager = mock.Mock(spec=TransferManager)
        self.transfer_manager.__enter__ = mock.Mock()
        self.transfer_manager.__exit__ = mock.Mock()
        self.parameters = {}
        self.s3_transfer_handler = S3TransferHandler(
            self.transfer_manager, self.parameters,
            self.command_result_recorder
        )

    def _download_fileinfos(self, count):
        """Return a list of ``count`` distinct download ``FileInfo`` objects."""
        return [
            FileInfo(src='bucket/key', dest='filename',
                     compare_key='key',
                     operation_name='download')
            for _ in range(count)
        ]

    def test_call_return_command_result(self):
        num_failures = 5
        num_warnings = 3
        self.result_recorder.files_failed = num_failures
        self.result_recorder.files_warned = num_warnings
        command_result = self.s3_transfer_handler.call([])
        self.assertEqual(command_result, (num_failures, num_warnings))

    def test_enqueue_uploads(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='filename', dest='bucket/key',
                     operation_name='upload')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.upload.call_count, num_transfers)

    def test_enqueue_downloads(self):
        num_transfers = 5
        fileinfos = self._download_fileinfos(num_transfers)

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.download.call_count, num_transfers)

    def test_enqueue_copies(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='sourcebucket/sourcekey', dest='bucket/key',
                     compare_key='key',
                     operation_name='copy')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.copy.call_count, num_transfers)

    def test_exception_when_enqueuing(self):
        fileinfos = [
            FileInfo(src='filename', dest='bucket/key',
                     operation_name='upload')
        ]
        self.transfer_manager.__exit__.side_effect = Exception(
            'some exception')
        command_result = self.s3_transfer_handler.call(fileinfos)
        # Exception should have been raised causing the command result to
        # have failed results of one.
        self.assertEqual(command_result, (1, 0))

    def test_enqueue_upload_stream(self):
        self.parameters['is_stream'] = True
        self.s3_transfer_handler.call(
            [FileInfo(src='-', dest='bucket/key', operation_name='upload')])
        self.assertEqual(
            self.transfer_manager.upload.call_count, 1)
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertIsInstance(
            upload_call_kwargs['fileobj'], NonSeekableStream)

    def test_enqueue_download_stream(self):
        self.parameters['is_stream'] = True
        self.s3_transfer_handler.call(
            [FileInfo(src='bucket/key', dest='-',
                      compare_key='key',
                      operation_name='download')])
        self.assertEqual(
            self.transfer_manager.download.call_count, 1)
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertIsInstance(
            download_call_kwargs['fileobj'], StdoutBytesWriter)

    def test_enqueue_deletes(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='bucket/key', dest=None, operation_name='delete',
                     src_type='s3')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.transfer_manager.delete.call_count, num_transfers)

    def test_enqueue_local_deletes(self):
        num_transfers = 5
        fileinfos = [
            FileInfo(src='myfile', dest=None, operation_name='delete',
                     src_type='local')
            for _ in range(num_transfers)
        ]

        self.s3_transfer_handler.call(fileinfos)
        # The number of processed results will be equal to:
        # number_of_local_deletes * 2 + 1
        # The 2 represents the QueuedResult and SuccessResult/FailureResult
        # for each transfer
        # The 1 represents the TotalFinalSubmissionResult
        expected_num_results = num_transfers * 2 + 1
        self.assertEqual(len(self.processed_results), expected_num_results)

        # Make sure that the results are as expected by checking just one
        # of them
        first_submitted_result = self.processed_results[0]
        self.assertEqual(first_submitted_result.transfer_type, 'delete')
        self.assertTrue(first_submitted_result.src.endswith('myfile'))

        # Also make sure that transfer manager's delete() was never called
        self.assertEqual(self.transfer_manager.delete.call_count, 0)

    def test_notifies_total_submissions(self):
        num_transfers = 5
        fileinfos = self._download_fileinfos(num_transfers)

        self.s3_transfer_handler.call(fileinfos)
        self.assertEqual(
            self.result_recorder.final_expected_files_transferred,
            num_transfers
        )

    def test_notifies_total_submissions_accounts_for_skips(self):
        num_transfers = 5
        fileinfos = self._download_fileinfos(num_transfers)

        # Add a fileinfo that should get skipped. To skip, we do a glacier
        # download.
        fileinfos.append(FileInfo(
            src='bucket/key', dest='filename', operation_name='download',
            compare_key='key',
            associated_response_data={'StorageClass': 'GLACIER'}))
        self.s3_transfer_handler.call(fileinfos)
        # Since the last glacier download was skipped the final expected
        # total should be equal to the number of regular downloads built
        # above.
        self.assertEqual(
            self.result_recorder.final_expected_files_transferred,
            num_transfers
        )
253
254
255
class BaseTransferRequestSubmitterTest(unittest.TestCase):
    """Common fixtures shared by the request submitter test cases."""

    def setUp(self):
        # Names used to build source/destination paths in subclasses.
        self.filename = 'myfile'
        self.bucket = 'mybucket'
        self.key = 'mykey'

        # Collaborators handed to the submitter under test.
        self.cli_params = {}
        self.result_queue = queue.Queue()
        self.transfer_manager = mock.Mock(spec=TransferManager)
263
264
265
class TestUploadRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``UploadRequestSubmitter`` with a mocked transfer manager."""

    def setUp(self):
        super(TestUploadRequestSubmitter, self).setUp()
        self.transfer_request_submitter = UploadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        """Assert the subscribers are instances of the given types, in order.

        :param actual_subscribers: Subscriber objects passed to the
            transfer manager.
        :param ref_subscribers: Expected subscriber classes, in the
            expected order.
        """
        # Check the lengths first so that zip() cannot hide a missing or
        # an extra subscriber.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual, expected_type in zip(actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual, expected_type)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key,
            operation_name='upload')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        future = self.transfer_request_submitter.submit(fileinfo)

        self.assertIs(self.transfer_manager.upload.return_value, future)
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(upload_call_kwargs['fileobj'], self.filename)
        self.assertEqual(upload_call_kwargs['bucket'], self.bucket)
        self.assertEqual(upload_call_kwargs['key'], self.key)
        self.assertEqual(upload_call_kwargs['extra_args'], {})

        # Make sure the subscribers applied are of the correct type and
        # order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideUploadContentTypeSubscriber,
            UploadResultSubscriber
        ]
        self.assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        # Set some extra argument like storage_class to make sure cli
        # params get mapped to request parameters.
        self.cli_params['storage_class'] = 'STANDARD_IA'
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(
            upload_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})

    def test_submit_when_content_type_specified(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['content_type'] = 'text/plain'
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertEqual(
            upload_call_kwargs['extra_args'], {'ContentType': 'text/plain'})
        # No content-type subscriber is expected when the content type
        # was given explicitly.
        ref_subscribers = [
            ProvideSizeSubscriber,
            UploadResultSubscriber
        ]
        self.assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_when_no_guess_content_mime_type(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = False
        self.transfer_request_submitter.submit(fileinfo)

        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        ref_subscribers = [
            ProvideSizeSubscriber,
            UploadResultSubscriber
        ]
        self.assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)

    def test_warn_on_too_large_transfer(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key,
            size=MAX_UPLOAD_SIZE+1)
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is too large.
        warning_result = self.result_queue.get()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn('exceeds s3 upload limit', warning_result.message)

        # Make sure that the transfer was still attempted
        self.assertIs(self.transfer_manager.upload.return_value, future)
        self.assertEqual(len(self.transfer_manager.upload.call_args_list), 1)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = UploadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = FileInfo(
            src=self.filename, src_type='local', operation_name='upload',
            dest=self.bucket + '/' + self.key, dest_type='s3')
        self.transfer_request_submitter.submit(fileinfo)

        result = self.result_queue.get()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'upload')
        self.assertTrue(result.src.endswith(self.filename))
        self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        fileinfo = FileInfo(
            src=self.filename, dest=self.bucket+'/'+self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideUploadContentTypeSubscriber,
            DeleteSourceFileSubscriber,
            UploadResultSubscriber,
        ]
        upload_call_kwargs = self.transfer_manager.upload.call_args[1]
        self.assert_subscriber_types(
            upload_call_kwargs['subscribers'], ref_subscribers)
398
399
400
class TestDownloadRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``DownloadRequestSubmitter`` with a mocked manager."""

    def setUp(self):
        super(TestDownloadRequestSubmitter, self).setUp()
        self.transfer_request_submitter = DownloadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def assert_no_downloads_happened(self):
        """Assert that the transfer manager's ``download`` was never called."""
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 0)

    def assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        """Assert the subscribers are instances of the given types, in order.

        :param actual_subscribers: Subscriber objects passed to the
            transfer manager.
        :param ref_subscribers: Expected subscriber classes, in the
            expected order.
        """
        # Check the lengths first so that zip() cannot hide a missing or
        # an extra subscriber.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual, expected_type in zip(actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual, expected_type)

    def create_file_info(self, key, associated_response_data=None):
        """Build a download ``FileInfo`` from ``self.bucket``/``key``.

        :param key: Key appended to the bucket for ``src`` and also used as
            the ``compare_key``.
        :param associated_response_data: Optional response data; only
            forwarded to ``FileInfo`` when it is not ``None``.
        """
        kwargs = {
            'src': self.bucket + '/' + key,
            'src_type': 's3',
            'dest': self.filename,
            'dest_type': 'local',
            'operation_name': 'download',
            'compare_key': key,
        }
        if associated_response_data is not None:
            kwargs['associated_response_data'] = associated_response_data
        return FileInfo(**kwargs)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = self.create_file_info(self.key)
        future = self.transfer_request_submitter.submit(fileinfo)

        self.assertIs(self.transfer_manager.download.return_value, future)
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertEqual(download_call_kwargs['fileobj'], self.filename)
        self.assertEqual(download_call_kwargs['bucket'], self.bucket)
        self.assertEqual(download_call_kwargs['key'], self.key)
        self.assertEqual(download_call_kwargs['extra_args'], {})

        # Make sure the subscribers applied are of the correct type and
        # order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            DirectoryCreatorSubscriber,
            ProvideLastModifiedTimeSubscriber,
            DownloadResultSubscriber
        ]
        self.assert_subscriber_types(
            download_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = self.create_file_info(self.key)
        # Set some extra argument like sse_c to make sure cli
        # params get mapped to request parameters.
        self.cli_params['sse_c'] = 'AES256'
        self.cli_params['sse_c_key'] = 'mykey'
        self.transfer_request_submitter.submit(fileinfo)

        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assertEqual(
            download_call_kwargs['extra_args'],
            {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': 'mykey'}
        )

    def test_warn_glacier_for_incompatible(self):
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is a non-restored
        # glacier object.
        warning_result = self.result_queue.get()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn(
            'Unable to perform download operations on GLACIER objects',
            warning_result.message)

        # The transfer should have been skipped.
        self.assertIsNone(future)
        self.assert_no_downloads_happened()

    def test_not_warn_glacier_for_compatible(self):
        fileinfo = self.create_file_info(
            self.key, associated_response_data={
                'StorageClass': 'GLACIER',
                'Restore': 'ongoing-request="false"'
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because it is a restored
        # glacier object.
        self.assertTrue(self.result_queue.empty())

        # And the transfer should not have been skipped.
        self.assertIs(self.transfer_manager.download.return_value, future)
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)

    def test_warn_glacier_force_glacier(self):
        self.cli_params['force_glacier_transfer'] = True
        fileinfo = self.create_file_info(
            self.key,
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because glacier
        # transfers were forced.
        self.assertTrue(self.result_queue.empty())
        self.assertIs(self.transfer_manager.download.return_value, future)
        self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)

    def test_warn_glacier_ignore_glacier_warnings(self):
        self.cli_params['ignore_glacier_warnings'] = True
        fileinfo = FileInfo(
            src=self.bucket+'/'+self.key, dest=self.filename,
            operation_name='download',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have not been submitted because it was specified
        # to ignore glacier warnings.
        self.assertTrue(self.result_queue.empty())
        # But the transfer still should have been skipped.
        self.assertIsNone(future)
        self.assert_no_downloads_happened()

    def test_warn_and_ignore_on_parent_dir_reference(self):
        fileinfo = self.create_file_info('../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        warning_result = self.result_queue.get()
        self.assertIsInstance(warning_result, WarningResult)
        self.assert_no_downloads_happened()

    def test_warn_and_ignore_with_leading_chars(self):
        fileinfo = self.create_file_info('a/../../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        warning_result = self.result_queue.get()
        self.assertIsInstance(warning_result, WarningResult)
        self.assert_no_downloads_happened()

    def test_allow_double_dots_that_dont_escape_cwd(self):
        self.cli_params['dryrun'] = True
        # This is fine because it's 'foo.txt'.
        fileinfo = self.create_file_info('a/../foo.txt')
        self.transfer_request_submitter.submit(fileinfo)
        self.assertIsInstance(self.result_queue.get(), DryRunResult)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DownloadRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = self.create_file_info(self.key)
        self.transfer_request_submitter.submit(fileinfo)

        result = self.result_queue.get()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'download')
        self.assertTrue(result.dest.endswith(self.filename))
        self.assertEqual(result.src, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        fileinfo = self.create_file_info(self.key)
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideETagSubscriber,
            DirectoryCreatorSubscriber,
            ProvideLastModifiedTimeSubscriber,
            DeleteSourceObjectSubscriber,
            DownloadResultSubscriber,
        ]
        download_call_kwargs = self.transfer_manager.download.call_args[1]
        self.assert_subscriber_types(
            download_call_kwargs['subscribers'], ref_subscribers)
595
596
597
class TestCopyRequestSubmitter(BaseTransferRequestSubmitterTest):
    """Tests for ``CopyRequestSubmitter`` with a mocked transfer manager."""

    def setUp(self):
        super(TestCopyRequestSubmitter, self).setUp()
        self.source_bucket = 'mysourcebucket'
        self.source_key = 'mysourcekey'
        self.transfer_request_submitter = CopyRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def _source_path(self):
        """Return ``<source_bucket>/<source_key>``."""
        return self.source_bucket + '/' + self.source_key

    def _dest_path(self):
        """Return ``<bucket>/<key>``."""
        return self.bucket + '/' + self.key

    def assert_subscriber_types(self, actual_subscribers, ref_subscribers):
        """Assert the subscribers are instances of the given types, in order.

        :param actual_subscribers: Subscriber objects passed to the
            transfer manager.
        :param ref_subscribers: Expected subscriber classes, in the
            expected order.
        """
        # Check the lengths first so that zip() cannot hide a missing or
        # an extra subscriber.
        self.assertEqual(len(ref_subscribers), len(actual_subscribers))
        for actual, expected_type in zip(actual_subscribers, ref_subscribers):
            self.assertIsInstance(actual, expected_type)

    def test_can_submit(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path(), operation_name='copy')
        self.assertTrue(
            self.transfer_request_submitter.can_submit(fileinfo))
        fileinfo.operation_name = 'foo'
        self.assertFalse(
            self.transfer_request_submitter.can_submit(fileinfo))

    def test_submit(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path())
        self.cli_params['guess_mime_type'] = True  # Default settings
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)
        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['copy_source'],
            {'Bucket': self.source_bucket, 'Key': self.source_key})
        self.assertEqual(copy_call_kwargs['bucket'], self.bucket)
        self.assertEqual(copy_call_kwargs['key'], self.key)
        self.assertEqual(copy_call_kwargs['extra_args'], {})

        # Make sure the subscribers applied are of the correct type and
        # order
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideCopyContentTypeSubscriber,
            CopyResultSubscriber
        ]
        self.assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_with_extra_args(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path())
        # Set some extra argument like storage_class to make sure cli
        # params get mapped to request parameters.
        self.cli_params['storage_class'] = 'STANDARD_IA'
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})

    def test_submit_when_content_type_specified(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path())
        self.cli_params['content_type'] = 'text/plain'
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assertEqual(
            copy_call_kwargs['extra_args'], {'ContentType': 'text/plain'})
        # No content-type subscriber is expected when the content type
        # was given explicitly.
        ref_subscribers = [
            ProvideSizeSubscriber,
            CopyResultSubscriber
        ]
        self.assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_submit_when_no_guess_content_mime_type(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path())
        self.cli_params['guess_mime_type'] = False
        self.transfer_request_submitter.submit(fileinfo)

        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        ref_subscribers = [
            ProvideSizeSubscriber,
            CopyResultSubscriber
        ]
        self.assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)

    def test_warn_glacier_for_incompatible(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path(),
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # A warning should have been submitted because it is a non-restored
        # glacier object.
        warning_result = self.result_queue.get()
        self.assertIsInstance(warning_result, WarningResult)
        self.assertIn(
            'Unable to perform copy operations on GLACIER objects',
            warning_result.message)

        # The transfer request should have never been sent therefore return
        # no future.
        self.assertIsNone(future)
        # The transfer should have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)

    def test_not_warn_glacier_for_compatible(self):
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path(),
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
                'Restore': 'ongoing-request="false"'
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)

        # A warning should have not been submitted because it is a restored
        # glacier object.
        self.assertTrue(self.result_queue.empty())

        # And the transfer should not have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)

    def test_warn_glacier_force_glacier(self):
        self.cli_params['force_glacier_transfer'] = True
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path(),
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)
        self.assertIs(self.transfer_manager.copy.return_value, future)

        # A warning should have not been submitted because glacier
        # transfers were forced.
        self.assertTrue(self.result_queue.empty())
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)

    def test_warn_glacier_ignore_glacier_warnings(self):
        self.cli_params['ignore_glacier_warnings'] = True
        fileinfo = FileInfo(
            src=self._source_path(),
            dest=self._dest_path(),
            operation_name='copy',
            associated_response_data={
                'StorageClass': 'GLACIER',
            }
        )
        future = self.transfer_request_submitter.submit(fileinfo)

        # The transfer request should have never been sent therefore return
        # no future.
        self.assertIsNone(future)
        # A warning should have not been submitted because it was specified
        # to ignore glacier warnings.
        self.assertTrue(self.result_queue.empty())
        # But the transfer still should have been skipped.
        self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = CopyRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        fileinfo = FileInfo(
            src=self._source_path(), src_type='s3',
            dest=self._dest_path(), dest_type='s3',
            operation_name='copy')
        self.transfer_request_submitter.submit(fileinfo)

        result = self.result_queue.get()
        self.assertIsInstance(result, DryRunResult)
        self.assertEqual(result.transfer_type, 'copy')
        source = 's3://' + self.source_bucket + '/' + self.source_key
        self.assertEqual(result.src, source)
        self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)

    def test_submit_move_adds_delete_source_subscriber(self):
        # NOTE: source and destination are deliberately kept as in the
        # original test, i.e. swapped relative to the other tests here.
        fileinfo = FileInfo(
            dest=self._source_path(),
            src=self._dest_path())
        self.cli_params['guess_mime_type'] = True  # Default settings
        self.cli_params['is_move'] = True
        self.transfer_request_submitter.submit(fileinfo)
        ref_subscribers = [
            ProvideSizeSubscriber,
            ProvideCopyContentTypeSubscriber,
            DeleteSourceObjectSubscriber,
            CopyResultSubscriber,
        ]
        copy_call_kwargs = self.transfer_manager.copy.call_args[1]
        self.assert_subscriber_types(
            copy_call_kwargs['subscribers'], ref_subscribers)
809
810
811
class TestUploadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Tests for UploadStreamRequestSubmitter; every test uses '-' as the
    # source filename and starts with cli_params['is_stream'] enabled.

    def setUp(self):
        super(TestUploadStreamRequestSubmitter, self).setUp()
        self.filename = '-'
        self.cli_params['is_stream'] = True
        self.transfer_request_submitter = UploadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        stream_upload = FileInfo(
            src=self.filename, dest=self.bucket + '/' + self.key,
            operation_name='upload')
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(stream_upload))

        # The submitter was built in setUp from this same cli_params
        # dict, so flipping the flag here is what it is checked against.
        self.cli_params['is_stream'] = False
        self.assertFalse(submitter.can_submit(stream_upload))

    def test_submit(self):
        stream_upload = FileInfo(
            src=self.filename, dest=self.bucket + '/' + self.key)
        returned_future = self.transfer_request_submitter.submit(
            stream_upload)
        self.assertIs(
            self.transfer_manager.upload.return_value, returned_future)

        # call_args[1] holds the keyword arguments of the upload() call.
        upload_kwargs = self.transfer_manager.upload.call_args[1]
        self.assertIsInstance(upload_kwargs['fileobj'], NonSeekableStream)
        self.assertEqual(upload_kwargs['bucket'], self.bucket)
        self.assertEqual(upload_kwargs['key'], self.key)
        self.assertEqual(upload_kwargs['extra_args'], {})

        expected_types = [UploadStreamResultSubscriber]
        submitted = upload_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted))
        for subscriber, expected_type in zip(submitted, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_submit_with_expected_size_provided(self):
        provided_size = 100
        self.cli_params['expected_size'] = provided_size
        stream_upload = FileInfo(
            src=self.filename, dest=self.bucket + '/' + self.key)
        self.transfer_request_submitter.submit(stream_upload)

        expected_types = [
            ProvideSizeSubscriber,
            UploadStreamResultSubscriber,
        ]
        submitted = self.transfer_manager.upload.call_args[1]['subscribers']
        self.assertEqual(len(expected_types), len(submitted))
        for subscriber, expected_type in zip(submitted, expected_types):
            self.assertIsInstance(subscriber, expected_type)
        # The ProvideSizeSubscriber should be providing the correct size
        self.assertEqual(submitted[0].size, provided_size)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = UploadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        dest_path = self.bucket + '/' + self.key
        self.transfer_request_submitter.submit(
            FileInfo(
                src=self.filename, src_type='local',
                operation_name='upload',
                dest=dest_path, dest_type='s3'))

        # A dry run should report the upload on the result queue, with
        # the streamed source still shown as '-'.
        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'upload')
        self.assertEqual(dry_run_result.dest, 's3://' + dest_path)
        self.assertEqual(dry_run_result.src, '-')
883
884
885
class TestDownloadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Tests for DownloadStreamRequestSubmitter; every test uses '-' as
    # the destination filename and starts with 'is_stream' enabled.

    def setUp(self):
        super(TestDownloadStreamRequestSubmitter, self).setUp()
        self.filename = '-'
        self.cli_params['is_stream'] = True
        self.transfer_request_submitter = DownloadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        stream_download = FileInfo(
            src=self.bucket + '/' + self.key, dest=self.filename,
            operation_name='download')
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(stream_download))

        # The submitter was built in setUp from this same cli_params
        # dict, so flipping the flag here is what it is checked against.
        self.cli_params['is_stream'] = False
        self.assertFalse(submitter.can_submit(stream_download))

    def test_submit(self):
        stream_download = FileInfo(
            src=self.bucket + '/' + self.key, dest=self.filename,
            compare_key=self.key)
        returned_future = self.transfer_request_submitter.submit(
            stream_download)
        self.assertIs(
            self.transfer_manager.download.return_value, returned_future)

        # call_args[1] holds the keyword arguments of the download() call.
        download_kwargs = self.transfer_manager.download.call_args[1]
        self.assertIsInstance(
            download_kwargs['fileobj'], StdoutBytesWriter)
        self.assertEqual(download_kwargs['bucket'], self.bucket)
        self.assertEqual(download_kwargs['key'], self.key)
        self.assertEqual(download_kwargs['extra_args'], {})

        expected_types = [DownloadStreamResultSubscriber]
        submitted = download_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted))
        for subscriber, expected_type in zip(submitted, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DownloadStreamRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        src_path = self.bucket + '/' + self.key
        self.transfer_request_submitter.submit(
            FileInfo(
                dest=self.filename, dest_type='local',
                operation_name='download',
                src=src_path, src_type='s3',
                compare_key=self.key))

        # A dry run should report the download on the result queue, with
        # the streamed destination still shown as '-'.
        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'download')
        self.assertEqual(dry_run_result.src, 's3://' + src_path)
        self.assertEqual(dry_run_result.dest, '-')
940
941
942
class TestDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Tests for DeleteRequestSubmitter, which is exercised here with s3
    # sources and transfer_manager.delete().

    def setUp(self):
        super(TestDeleteRequestSubmitter, self).setUp()
        self.transfer_request_submitter = DeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)

    def test_can_submit(self):
        s3_delete = FileInfo(
            src=self.bucket + '/' + self.key, dest=None,
            operation_name='delete', src_type='s3')
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(s3_delete))

        # Any other operation name should be rejected.
        s3_delete.operation_name = 'foo'
        self.assertFalse(submitter.can_submit(s3_delete))

    def test_cannot_submit_local_deletes(self):
        local_delete = FileInfo(
            src=self.bucket + '/' + self.key, dest=None,
            operation_name='delete', src_type='local')
        self.assertFalse(
            self.transfer_request_submitter.can_submit(local_delete))

    def test_submit(self):
        s3_delete = FileInfo(
            src=self.bucket + '/' + self.key, dest=None,
            operation_name='delete')
        returned_future = self.transfer_request_submitter.submit(s3_delete)
        self.assertIs(
            self.transfer_manager.delete.return_value, returned_future)

        # call_args[1] holds the keyword arguments of the delete() call.
        delete_kwargs = self.transfer_manager.delete.call_args[1]
        self.assertEqual(delete_kwargs['bucket'], self.bucket)
        self.assertEqual(delete_kwargs['key'], self.key)
        self.assertEqual(delete_kwargs['extra_args'], {})

        expected_types = [DeleteResultSubscriber]
        submitted = delete_kwargs['subscribers']
        self.assertEqual(len(expected_types), len(submitted))
        for subscriber, expected_type in zip(submitted, expected_types):
            self.assertIsInstance(subscriber, expected_type)

    def test_dry_run(self):
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter = DeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        s3_path = self.bucket + '/' + self.key
        self.transfer_request_submitter.submit(
            FileInfo(
                src=s3_path, src_type='s3',
                dest=s3_path, dest_type='s3',
                operation_name='delete'))

        # Although the FileInfo carries a dest, the dry-run result for a
        # delete is expected to report no destination.
        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'delete')
        self.assertEqual(dry_run_result.src, 's3://' + s3_path)
        self.assertIsNone(dry_run_result.dest)
999
1000
1001
class TestLocalDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):
    # Tests for LocalDeleteRequestSubmitter. Files that need to exist on
    # disk are made through a FileCreator that tearDown cleans up.

    def setUp(self):
        super(TestLocalDeleteRequestSubmitter, self).setUp()
        self.transfer_request_submitter = LocalDeleteRequestSubmitter(
            self.transfer_manager, self.result_queue, self.cli_params)
        self.file_creator = FileCreator()

    def tearDown(self):
        super(TestLocalDeleteRequestSubmitter, self).tearDown()
        self.file_creator.remove_all()

    def test_can_submit(self):
        local_delete = FileInfo(
            src=self.filename, dest=None, operation_name='delete',
            src_type='local')
        submitter = self.transfer_request_submitter
        self.assertTrue(submitter.can_submit(local_delete))

        # Any other operation name should be rejected.
        local_delete.operation_name = 'foo'
        self.assertFalse(submitter.can_submit(local_delete))

    def test_cannot_submit_remote_deletes(self):
        remote_delete = FileInfo(
            src=self.filename, dest=None, operation_name='delete',
            src_type='s3')
        self.assertFalse(
            self.transfer_request_submitter.can_submit(remote_delete))

    def test_submit(self):
        full_filename = self.file_creator.create_file(
            self.filename, 'content')
        local_delete = FileInfo(
            src=full_filename, dest=None, operation_name='delete',
            src_type='local')
        self.assertTrue(
            self.transfer_request_submitter.submit(local_delete))

        # The first result announces the queued delete ...
        queued_result = self.result_queue.get()
        self.assertIsInstance(queued_result, QueuedResult)
        self.assertEqual(queued_result.transfer_type, 'delete')
        self.assertTrue(queued_result.src.endswith(self.filename))
        self.assertIsNone(queued_result.dest)
        self.assertEqual(queued_result.total_transfer_size, 0)

        # ... and the second one reports that it succeeded.
        success_result = self.result_queue.get()
        self.assertIsInstance(success_result, SuccessResult)
        self.assertEqual(success_result.transfer_type, 'delete')
        self.assertTrue(success_result.src.endswith(self.filename))
        self.assertIsNone(success_result.dest)

        self.assertFalse(os.path.exists(full_filename))

    def test_submit_with_exception(self):
        missing_file = FileInfo(
            src=self.filename, dest=None, operation_name='delete',
            src_type='local')
        # The file was never created so it should trigger an exception
        # when it is attempted to be deleted in the submitter.
        self.assertTrue(
            self.transfer_request_submitter.submit(missing_file))

        queued_result = self.result_queue.get()
        self.assertIsInstance(queued_result, QueuedResult)
        self.assertEqual(queued_result.transfer_type, 'delete')
        self.assertTrue(queued_result.src.endswith(self.filename))
        self.assertIsNone(queued_result.dest)
        self.assertEqual(queued_result.total_transfer_size, 0)

        failure_result = self.result_queue.get()
        self.assertIsInstance(failure_result, FailureResult)
        self.assertEqual(failure_result.transfer_type, 'delete')
        self.assertTrue(failure_result.src.endswith(self.filename))
        self.assertIsNone(failure_result.dest)

    def test_dry_run(self):
        # Unlike the other dry-run tests, the submitter from setUp is
        # reused; it was built from this same cli_params dict.
        self.cli_params['dryrun'] = True
        self.transfer_request_submitter.submit(
            FileInfo(
                src=self.filename, src_type='local',
                dest=self.filename, dest_type='local',
                operation_name='delete'))

        dry_run_result = self.result_queue.get()
        self.assertIsInstance(dry_run_result, DryRunResult)
        self.assertEqual(dry_run_result.transfer_type, 'delete')
        self.assertTrue(dry_run_result.src.endswith(self.filename))
        self.assertIsNone(dry_run_result.dest)
1087
1088