Path: blob/develop/tests/unit/customizations/s3/test_s3handler.py
1569 views
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.1#2# Licensed under the Apache License, Version 2.0 (the "License"). You3# may not use this file except in compliance with the License. A copy of4# the License is located at5#6# http://aws.amazon.com/apache2.0/7#8# or in the "license" file accompanying this file. This file is9# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF10# ANY KIND, either express or implied. See the License for the specific11# language governing permissions and limitations under the License.12import os1314from s3transfer.manager import TransferManager1516from awscli.testutils import mock17from awscli.testutils import unittest18from awscli.testutils import FileCreator19from awscli.compat import queue20from awscli.customizations.s3.s3handler import S3TransferHandler21from awscli.customizations.s3.s3handler import S3TransferHandlerFactory22from awscli.customizations.s3.s3handler import UploadRequestSubmitter23from awscli.customizations.s3.s3handler import DownloadRequestSubmitter24from awscli.customizations.s3.s3handler import CopyRequestSubmitter25from awscli.customizations.s3.s3handler import UploadStreamRequestSubmitter26from awscli.customizations.s3.s3handler import DownloadStreamRequestSubmitter27from awscli.customizations.s3.s3handler import DeleteRequestSubmitter28from awscli.customizations.s3.s3handler import LocalDeleteRequestSubmitter29from awscli.customizations.s3.fileinfo import FileInfo30from awscli.customizations.s3.results import QueuedResult31from awscli.customizations.s3.results import SuccessResult32from awscli.customizations.s3.results import FailureResult33from awscli.customizations.s3.results import UploadResultSubscriber34from awscli.customizations.s3.results import DownloadResultSubscriber35from awscli.customizations.s3.results import CopyResultSubscriber36from awscli.customizations.s3.results import UploadStreamResultSubscriber37from awscli.customizations.s3.results import DownloadStreamResultSubscriber38from awscli.customizations.s3.results import DeleteResultSubscriber39from awscli.customizations.s3.results import ResultRecorder40from awscli.customizations.s3.results import ResultProcessor41from awscli.customizations.s3.results import CommandResultRecorder42from awscli.customizations.s3.results import DryRunResult43from awscli.customizations.s3.utils import MAX_UPLOAD_SIZE44from awscli.customizations.s3.utils import NonSeekableStream45from awscli.customizations.s3.utils import StdoutBytesWriter46from awscli.customizations.s3.utils import WarningResult47from awscli.customizations.s3.utils import ProvideSizeSubscriber48from awscli.customizations.s3.utils import ProvideETagSubscriber49from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber50from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber51from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber52from awscli.customizations.s3.utils import DirectoryCreatorSubscriber53from awscli.customizations.s3.utils import DeleteSourceFileSubscriber54from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber55from awscli.customizations.s3.transferconfig import RuntimeConfig565758def runtime_config(**kwargs):59return RuntimeConfig().build_config(**kwargs)606162class TestS3TransferHandlerFactory(unittest.TestCase):63def setUp(self):64self.cli_params = {}65self.runtime_config = runtime_config()66self.client = mock.Mock()67self.result_queue = queue.Queue()6869def test_call(self):70factory = S3TransferHandlerFactory(71self.cli_params, self.runtime_config)72self.assertIsInstance(73factory(self.client, self.result_queue), S3TransferHandler)747576class TestS3TransferHandler(unittest.TestCase):77def setUp(self):78self.result_queue = queue.Queue()79self.result_recorder = ResultRecorder()80self.processed_results = []81self.result_processor = ResultProcessor(82self.result_queue,83[self.result_recorder, self.processed_results.append]84)85self.command_result_recorder = CommandResultRecorder(86self.result_queue, self.result_recorder, self.result_processor)8788self.transfer_manager = mock.Mock(spec=TransferManager)89self.transfer_manager.__enter__ = mock.Mock()90self.transfer_manager.__exit__ = mock.Mock()91self.parameters = {}92self.s3_transfer_handler = S3TransferHandler(93self.transfer_manager, self.parameters,94self.command_result_recorder95)9697def test_call_return_command_result(self):98num_failures = 599num_warnings = 3100self.result_recorder.files_failed = num_failures101self.result_recorder.files_warned = num_warnings102command_result = self.s3_transfer_handler.call([])103self.assertEqual(command_result, (num_failures, num_warnings))104105def test_enqueue_uploads(self):106fileinfos = []107num_transfers = 5108for _ in range(num_transfers):109fileinfos.append(110FileInfo(src='filename', dest='bucket/key',111operation_name='upload'))112113self.s3_transfer_handler.call(fileinfos)114self.assertEqual(115self.transfer_manager.upload.call_count, num_transfers)116117def test_enqueue_downloads(self):118fileinfos = []119num_transfers = 5120for _ in range(num_transfers):121fileinfos.append(122FileInfo(src='bucket/key', dest='filename',123compare_key='key',124operation_name='download'))125126self.s3_transfer_handler.call(fileinfos)127self.assertEqual(128self.transfer_manager.download.call_count, num_transfers)129130def test_enqueue_copies(self):131fileinfos = []132num_transfers = 5133for _ in range(num_transfers):134fileinfos.append(135FileInfo(src='sourcebucket/sourcekey', dest='bucket/key',136compare_key='key',137operation_name='copy'))138139self.s3_transfer_handler.call(fileinfos)140self.assertEqual(141self.transfer_manager.copy.call_count, num_transfers)142143def test_exception_when_enqueuing(self):144fileinfos = [145FileInfo(src='filename', dest='bucket/key',146operation_name='upload')147]148self.transfer_manager.__exit__.side_effect = Exception(149'some exception')150command_result = self.s3_transfer_handler.call(fileinfos)151# Exception should have been raised casing the command result to152# have failed results of one.153self.assertEqual(command_result, (1, 0))154155def test_enqueue_upload_stream(self):156self.parameters['is_stream'] = True157self.s3_transfer_handler.call(158[FileInfo(src='-', dest='bucket/key', operation_name='upload')])159self.assertEqual(160self.transfer_manager.upload.call_count, 1)161upload_call_kwargs = self.transfer_manager.upload.call_args[1]162self.assertIsInstance(163upload_call_kwargs['fileobj'], NonSeekableStream)164165def test_enqueue_dowload_stream(self):166self.parameters['is_stream'] = True167self.s3_transfer_handler.call(168[FileInfo(src='bucket/key', dest='-',169compare_key='key',170operation_name='download')])171self.assertEqual(172self.transfer_manager.download.call_count, 1)173download_call_kwargs = self.transfer_manager.download.call_args[1]174self.assertIsInstance(175download_call_kwargs['fileobj'], StdoutBytesWriter)176177def test_enqueue_deletes(self):178fileinfos = []179num_transfers = 5180for _ in range(num_transfers):181fileinfos.append(182FileInfo(src='bucket/key', dest=None, operation_name='delete',183src_type='s3'))184185self.s3_transfer_handler.call(fileinfos)186self.assertEqual(187self.transfer_manager.delete.call_count, num_transfers)188189def test_enqueue_local_deletes(self):190fileinfos = []191num_transfers = 5192for _ in range(num_transfers):193fileinfos.append(194FileInfo(src='myfile', dest=None, operation_name='delete',195src_type='local'))196197self.s3_transfer_handler.call(fileinfos)198# The number of processed results will be equal to:199# number_of_local_deletes * 2 + 1200# The 2 represents the QueuedResult and SuccessResult/FailureResult201# for each transfer202# The 1 represents the TotalFinalSubmissionResult203self.assertEqual(len(self.processed_results), 11)204205# Make sure that the results are as expected by checking just one206# of them207first_submitted_result = self.processed_results[0]208self.assertEqual(first_submitted_result.transfer_type, 'delete')209self.assertTrue(first_submitted_result.src.endswith('myfile'))210211# Also make sure that transfer manager's delete() was never called212self.assertEqual(self.transfer_manager.delete.call_count, 0)213214def test_notifies_total_submissions(self):215fileinfos = []216num_transfers = 5217for _ in range(num_transfers):218fileinfos.append(219FileInfo(src='bucket/key', dest='filename',220compare_key='key',221operation_name='download'))222223self.s3_transfer_handler.call(fileinfos)224self.assertEqual(225self.result_recorder.final_expected_files_transferred,226num_transfers227)228229def test_notifies_total_submissions_accounts_for_skips(self):230fileinfos = []231num_transfers = 5232for _ in range(num_transfers):233fileinfos.append(234FileInfo(src='bucket/key', dest='filename',235compare_key='key',236operation_name='download'))237238# Add a fileinfo that should get skipped. To skip, we do a glacier239# download.240fileinfos.append(FileInfo(241src='bucket/key', dest='filename', operation_name='download',242compare_key='key',243associated_response_data={'StorageClass': 'GLACIER'}))244self.s3_transfer_handler.call(fileinfos)245# Since the last glacier download was skipped the final expected246# total should be equal to the number of transfers provided in the247# for loop.248self.assertEqual(249self.result_recorder.final_expected_files_transferred,250num_transfers251)252253254class BaseTransferRequestSubmitterTest(unittest.TestCase):255def setUp(self):256self.transfer_manager = mock.Mock(spec=TransferManager)257self.result_queue = queue.Queue()258self.cli_params = {}259self.filename = 'myfile'260self.bucket = 'mybucket'261self.key = 'mykey'262263264class TestUploadRequestSubmitter(BaseTransferRequestSubmitterTest):265def setUp(self):266super(TestUploadRequestSubmitter, self).setUp()267self.transfer_request_submitter = UploadRequestSubmitter(268self.transfer_manager, self.result_queue, self.cli_params)269270def test_can_submit(self):271fileinfo = FileInfo(272src=self.filename, dest=self.bucket+'/'+self.key,273operation_name='upload')274self.assertTrue(275self.transfer_request_submitter.can_submit(fileinfo))276fileinfo.operation_name = 'foo'277self.assertFalse(278self.transfer_request_submitter.can_submit(fileinfo))279280def test_submit(self):281fileinfo = FileInfo(282src=self.filename, dest=self.bucket+'/'+self.key)283self.cli_params['guess_mime_type'] = True # Default settings284future = self.transfer_request_submitter.submit(fileinfo)285286self.assertIs(self.transfer_manager.upload.return_value, future)287upload_call_kwargs = self.transfer_manager.upload.call_args[1]288self.assertEqual(upload_call_kwargs['fileobj'], self.filename)289self.assertEqual(upload_call_kwargs['bucket'], self.bucket)290self.assertEqual(upload_call_kwargs['key'], self.key)291self.assertEqual(upload_call_kwargs['extra_args'], {})292293# Make sure the subscriber applied are of the correct type and order294ref_subscribers = [295ProvideSizeSubscriber,296ProvideUploadContentTypeSubscriber,297UploadResultSubscriber298]299actual_subscribers = upload_call_kwargs['subscribers']300self.assertEqual(len(ref_subscribers), len(actual_subscribers))301for i, actual_subscriber in enumerate(actual_subscribers):302self.assertIsInstance(actual_subscriber, ref_subscribers[i])303304def test_submit_with_extra_args(self):305fileinfo = FileInfo(306src=self.filename, dest=self.bucket+'/'+self.key)307# Set some extra argument like storage_class to make sure cli308# params get mapped to request parameters.309self.cli_params['storage_class'] = 'STANDARD_IA'310self.transfer_request_submitter.submit(fileinfo)311312upload_call_kwargs = self.transfer_manager.upload.call_args[1]313self.assertEqual(314upload_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})315316def test_submit_when_content_type_specified(self):317fileinfo = FileInfo(318src=self.filename, dest=self.bucket+'/'+self.key)319self.cli_params['content_type'] = 'text/plain'320self.transfer_request_submitter.submit(fileinfo)321322upload_call_kwargs = self.transfer_manager.upload.call_args[1]323self.assertEqual(324upload_call_kwargs['extra_args'], {'ContentType': 'text/plain'})325ref_subscribers = [326ProvideSizeSubscriber,327UploadResultSubscriber328]329actual_subscribers = upload_call_kwargs['subscribers']330self.assertEqual(len(ref_subscribers), len(actual_subscribers))331for i, actual_subscriber in enumerate(actual_subscribers):332self.assertIsInstance(actual_subscriber, ref_subscribers[i])333334def test_submit_when_no_guess_content_mime_type(self):335fileinfo = FileInfo(336src=self.filename, dest=self.bucket+'/'+self.key)337self.cli_params['guess_mime_type'] = False338self.transfer_request_submitter.submit(fileinfo)339340upload_call_kwargs = self.transfer_manager.upload.call_args[1]341ref_subscribers = [342ProvideSizeSubscriber,343UploadResultSubscriber344]345actual_subscribers = upload_call_kwargs['subscribers']346self.assertEqual(len(ref_subscribers), len(actual_subscribers))347for i, actual_subscriber in enumerate(actual_subscribers):348self.assertIsInstance(actual_subscriber, ref_subscribers[i])349350def test_warn_on_too_large_transfer(self):351fileinfo = FileInfo(352src=self.filename, dest=self.bucket+'/'+self.key,353size=MAX_UPLOAD_SIZE+1)354future = self.transfer_request_submitter.submit(fileinfo)355356# A warning should have been submitted because it is too large.357warning_result = self.result_queue.get()358self.assertIsInstance(warning_result, WarningResult)359self.assertIn('exceeds s3 upload limit', warning_result.message)360361# Make sure that the transfer was still attempted362self.assertIs(self.transfer_manager.upload.return_value, future)363self.assertEqual(len(self.transfer_manager.upload.call_args_list), 1)364365def test_dry_run(self):366self.cli_params['dryrun'] = True367self.transfer_request_submitter = UploadRequestSubmitter(368self.transfer_manager, self.result_queue, self.cli_params)369fileinfo = FileInfo(370src=self.filename, src_type='local', operation_name='upload',371dest=self.bucket + '/' + self.key, dest_type='s3')372self.transfer_request_submitter.submit(fileinfo)373374result = self.result_queue.get()375self.assertIsInstance(result, DryRunResult)376self.assertEqual(result.transfer_type, 'upload')377self.assertTrue(result.src.endswith(self.filename))378self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)379380def test_submit_move_adds_delete_source_subscriber(self):381fileinfo = FileInfo(382src=self.filename, dest=self.bucket+'/'+self.key)383self.cli_params['guess_mime_type'] = True # Default settings384self.cli_params['is_move'] = True385self.transfer_request_submitter.submit(fileinfo)386ref_subscribers = [387ProvideSizeSubscriber,388ProvideUploadContentTypeSubscriber,389DeleteSourceFileSubscriber,390UploadResultSubscriber,391]392upload_call_kwargs = self.transfer_manager.upload.call_args[1]393actual_subscribers = upload_call_kwargs['subscribers']394self.assertEqual(len(ref_subscribers), len(actual_subscribers))395for i, actual_subscriber in enumerate(actual_subscribers):396self.assertIsInstance(actual_subscriber, ref_subscribers[i])397398399class TestDownloadRequestSubmitter(BaseTransferRequestSubmitterTest):400def setUp(self):401super(TestDownloadRequestSubmitter, self).setUp()402self.transfer_request_submitter = DownloadRequestSubmitter(403self.transfer_manager, self.result_queue, self.cli_params)404405def assert_no_downloads_happened(self):406self.assertEqual(len(self.transfer_manager.download.call_args_list), 0)407408def create_file_info(self, key, associated_response_data=None):409kwargs = {410'src': self.bucket + '/' + key,411'src_type': 's3',412'dest': self.filename,413'dest_type': 'local',414'operation_name': 'download',415'compare_key': key,416}417if associated_response_data is not None:418kwargs['associated_response_data'] = associated_response_data419return FileInfo(**kwargs)420421def test_can_submit(self):422fileinfo = FileInfo(423src=self.bucket+'/'+self.key, dest=self.filename,424operation_name='download')425self.assertTrue(426self.transfer_request_submitter.can_submit(fileinfo))427fileinfo.operation_name = 'foo'428self.assertFalse(429self.transfer_request_submitter.can_submit(fileinfo))430431def test_submit(self):432fileinfo = self.create_file_info(self.key)433future = self.transfer_request_submitter.submit(fileinfo)434435self.assertIs(self.transfer_manager.download.return_value, future)436download_call_kwargs = self.transfer_manager.download.call_args[1]437self.assertEqual(download_call_kwargs['fileobj'], self.filename)438self.assertEqual(download_call_kwargs['bucket'], self.bucket)439self.assertEqual(download_call_kwargs['key'], self.key)440self.assertEqual(download_call_kwargs['extra_args'], {})441442# Make sure the subscriber applied are of the correct type and order443ref_subscribers = [444ProvideSizeSubscriber,445ProvideETagSubscriber,446DirectoryCreatorSubscriber,447ProvideLastModifiedTimeSubscriber,448DownloadResultSubscriber449]450actual_subscribers = download_call_kwargs['subscribers']451self.assertEqual(len(ref_subscribers), len(actual_subscribers))452for i, actual_subscriber in enumerate(actual_subscribers):453self.assertIsInstance(actual_subscriber, ref_subscribers[i])454455def test_submit_with_extra_args(self):456fileinfo = self.create_file_info(self.key)457self.cli_params['sse_c'] = 'AES256'458self.cli_params['sse_c_key'] = 'mykey'459self.transfer_request_submitter.submit(fileinfo)460461# Set some extra argument like sse_c to make sure cli462# params get mapped to request parameters.463download_call_kwargs = self.transfer_manager.download.call_args[1]464self.assertEqual(465download_call_kwargs['extra_args'],466{'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': 'mykey'}467)468469def test_warn_glacier_for_incompatible(self):470fileinfo = FileInfo(471src=self.bucket+'/'+self.key, dest=self.filename,472operation_name='download',473associated_response_data={474'StorageClass': 'GLACIER',475}476)477future = self.transfer_request_submitter.submit(fileinfo)478479# A warning should have been submitted because it is a non-restored480# glacier object.481warning_result = self.result_queue.get()482self.assertIsInstance(warning_result, WarningResult)483self.assertIn(484'Unable to perform download operations on GLACIER objects',485warning_result.message)486487# The transfer should have been skipped.488self.assertIsNone(future)489self.assert_no_downloads_happened()490491def test_not_warn_glacier_for_compatible(self):492fileinfo = self.create_file_info(493self.key, associated_response_data={494'StorageClass': 'GLACIER',495'Restore': 'ongoing-request="false"'496}497)498future = self.transfer_request_submitter.submit(fileinfo)499500# A warning should have not been submitted because it is a restored501# glacier object.502self.assertTrue(self.result_queue.empty())503504# And the transfer should not have been skipped.505self.assertIs(self.transfer_manager.download.return_value, future)506self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)507508def test_warn_glacier_force_glacier(self):509self.cli_params['force_glacier_transfer'] = True510fileinfo = self.create_file_info(511self.key,512associated_response_data={513'StorageClass': 'GLACIER',514}515)516future = self.transfer_request_submitter.submit(fileinfo)517518# A warning should have not been submitted because it is glacier519# transfers were forced.520self.assertTrue(self.result_queue.empty())521self.assertIs(self.transfer_manager.download.return_value, future)522self.assertEqual(len(self.transfer_manager.download.call_args_list), 1)523524def test_warn_glacier_ignore_glacier_warnings(self):525self.cli_params['ignore_glacier_warnings'] = True526fileinfo = FileInfo(527src=self.bucket+'/'+self.key, dest=self.filename,528operation_name='download',529associated_response_data={530'StorageClass': 'GLACIER',531}532)533future = self.transfer_request_submitter.submit(fileinfo)534535# A warning should have not been submitted because it was specified536# to ignore glacier warnings.537self.assertTrue(self.result_queue.empty())538# But the transfer still should have been skipped.539self.assertIsNone(future)540self.assert_no_downloads_happened()541542def test_warn_and_ignore_on_parent_dir_reference(self):543fileinfo = self.create_file_info('../foo.txt')544future = self.transfer_request_submitter.submit(fileinfo)545warning_result = self.result_queue.get()546self.assertIsInstance(warning_result, WarningResult)547self.assert_no_downloads_happened()548549def test_warn_and_ignore_with_leading_chars(self):550fileinfo = self.create_file_info('a/../../foo.txt')551future = self.transfer_request_submitter.submit(fileinfo)552warning_result = self.result_queue.get()553self.assertIsInstance(warning_result, WarningResult)554self.assert_no_downloads_happened()555556def test_allow_double_dots_that_dont_escape_cwd(self):557self.cli_params['dryrun'] = True558# This is fine because it's 'foo.txt'.559fileinfo = self.create_file_info('a/../foo.txt')560future = self.transfer_request_submitter.submit(fileinfo)561self.assertIsInstance(self.result_queue.get(), DryRunResult)562563def test_dry_run(self):564self.cli_params['dryrun'] = True565self.transfer_request_submitter = DownloadRequestSubmitter(566self.transfer_manager, self.result_queue, self.cli_params)567fileinfo = self.create_file_info(self.key)568self.transfer_request_submitter.submit(fileinfo)569570result = self.result_queue.get()571self.assertIsInstance(result, DryRunResult)572self.assertEqual(result.transfer_type, 'download')573self.assertTrue(result.dest.endswith(self.filename))574self.assertEqual(result.src, 's3://' + self.bucket + '/' + self.key)575576def test_submit_move_adds_delete_source_subscriber(self):577fileinfo = self.create_file_info(self.key)578self.cli_params['guess_mime_type'] = True # Default settings579self.cli_params['is_move'] = True580self.transfer_request_submitter.submit(fileinfo)581ref_subscribers = [582ProvideSizeSubscriber,583ProvideETagSubscriber,584DirectoryCreatorSubscriber,585ProvideLastModifiedTimeSubscriber,586DeleteSourceObjectSubscriber,587DownloadResultSubscriber,588]589download_call_kwargs = self.transfer_manager.download.call_args[1]590actual_subscribers = download_call_kwargs['subscribers']591self.assertEqual(len(ref_subscribers), len(actual_subscribers))592for i, actual_subscriber in enumerate(actual_subscribers):593self.assertIsInstance(actual_subscriber, ref_subscribers[i])594595596class TestCopyRequestSubmitter(BaseTransferRequestSubmitterTest):597def setUp(self):598super(TestCopyRequestSubmitter, self).setUp()599self.source_bucket = 'mysourcebucket'600self.source_key = 'mysourcekey'601self.transfer_request_submitter = CopyRequestSubmitter(602self.transfer_manager, self.result_queue, self.cli_params)603604def test_can_submit(self):605fileinfo = FileInfo(606src=self.source_bucket+'/'+self.source_key,607dest=self.bucket+'/'+self.key, operation_name='copy')608self.assertTrue(609self.transfer_request_submitter.can_submit(fileinfo))610fileinfo.operation_name = 'foo'611self.assertFalse(612self.transfer_request_submitter.can_submit(fileinfo))613614def test_submit(self):615fileinfo = FileInfo(616src=self.source_bucket+'/'+self.source_key,617dest=self.bucket+'/'+self.key)618self.cli_params['guess_mime_type'] = True # Default settings619future = self.transfer_request_submitter.submit(fileinfo)620self.assertIs(self.transfer_manager.copy.return_value, future)621copy_call_kwargs = self.transfer_manager.copy.call_args[1]622self.assertEqual(623copy_call_kwargs['copy_source'],624{'Bucket': self.source_bucket, 'Key': self.source_key})625self.assertEqual(copy_call_kwargs['bucket'], self.bucket)626self.assertEqual(copy_call_kwargs['key'], self.key)627self.assertEqual(copy_call_kwargs['extra_args'], {})628629# Make sure the subscriber applied are of the correct type and order630ref_subscribers = [631ProvideSizeSubscriber,632ProvideCopyContentTypeSubscriber,633CopyResultSubscriber634]635actual_subscribers = copy_call_kwargs['subscribers']636self.assertEqual(len(ref_subscribers), len(actual_subscribers))637for i, actual_subscriber in enumerate(actual_subscribers):638self.assertIsInstance(actual_subscriber, ref_subscribers[i])639640def test_submit_with_extra_args(self):641fileinfo = FileInfo(642src=self.source_bucket+'/'+self.source_key,643dest=self.bucket+'/'+self.key)644# Set some extra argument like storage_class to make sure cli645# params get mapped to request parameters.646self.cli_params['storage_class'] = 'STANDARD_IA'647self.transfer_request_submitter.submit(fileinfo)648649copy_call_kwargs = self.transfer_manager.copy.call_args[1]650self.assertEqual(651copy_call_kwargs['extra_args'], {'StorageClass': 'STANDARD_IA'})652653def test_submit_when_content_type_specified(self):654fileinfo = FileInfo(655src=self.source_bucket+'/'+self.source_key,656dest=self.bucket+'/'+self.key)657self.cli_params['content_type'] = 'text/plain'658self.transfer_request_submitter.submit(fileinfo)659660copy_call_kwargs = self.transfer_manager.copy.call_args[1]661self.assertEqual(662copy_call_kwargs['extra_args'], {'ContentType': 'text/plain'})663ref_subscribers = [664ProvideSizeSubscriber,665CopyResultSubscriber666]667actual_subscribers = copy_call_kwargs['subscribers']668self.assertEqual(len(ref_subscribers), len(actual_subscribers))669for i, actual_subscriber in enumerate(actual_subscribers):670self.assertIsInstance(actual_subscriber, ref_subscribers[i])671672def test_submit_when_no_guess_content_mime_type(self):673fileinfo = FileInfo(674src=self.source_bucket+'/'+self.source_key,675dest=self.bucket+'/'+self.key)676self.cli_params['guess_mime_type'] = False677self.transfer_request_submitter.submit(fileinfo)678679copy_call_kwargs = self.transfer_manager.copy.call_args[1]680ref_subscribers = [681ProvideSizeSubscriber,682CopyResultSubscriber683]684actual_subscribers = copy_call_kwargs['subscribers']685self.assertEqual(len(ref_subscribers), len(actual_subscribers))686for i, actual_subscriber in enumerate(actual_subscribers):687self.assertIsInstance(actual_subscriber, ref_subscribers[i])688689def test_warn_glacier_for_incompatible(self):690fileinfo = FileInfo(691src=self.source_bucket+'/'+self.source_key,692dest=self.bucket+'/'+self.key,693operation_name='copy',694associated_response_data={695'StorageClass': 'GLACIER',696}697)698future = self.transfer_request_submitter.submit(fileinfo)699700# A warning should have been submitted because it is a non-restored701# glacier object.702warning_result = self.result_queue.get()703self.assertIsInstance(warning_result, WarningResult)704self.assertIn(705'Unable to perform copy operations on GLACIER objects',706warning_result.message)707708# The transfer request should have never been sent therefore return709# no future.710self.assertIsNone(future)711# The transfer should have been skipped.712self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)713714def test_not_warn_glacier_for_compatible(self):715fileinfo = FileInfo(716src=self.source_bucket+'/'+self.source_key,717dest=self.bucket+'/'+self.key,718operation_name='copy',719associated_response_data={720'StorageClass': 'GLACIER',721'Restore': 'ongoing-request="false"'722}723)724future = self.transfer_request_submitter.submit(fileinfo)725self.assertIs(self.transfer_manager.copy.return_value, future)726727# A warning should have not been submitted because it is a restored728# glacier object.729self.assertTrue(self.result_queue.empty())730731# And the transfer should not have been skipped.732self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)733734def test_warn_glacier_force_glacier(self):735self.cli_params['force_glacier_transfer'] = True736fileinfo = FileInfo(737src=self.source_bucket+'/'+self.source_key,738dest=self.bucket+'/'+self.key,739operation_name='copy',740associated_response_data={741'StorageClass': 'GLACIER',742}743)744future = self.transfer_request_submitter.submit(fileinfo)745self.assertIs(self.transfer_manager.copy.return_value, future)746747# A warning should have not been submitted because it is glacier748# transfers were forced.749self.assertTrue(self.result_queue.empty())750self.assertEqual(len(self.transfer_manager.copy.call_args_list), 1)751752def test_warn_glacier_ignore_glacier_warnings(self):753self.cli_params['ignore_glacier_warnings'] = True754fileinfo = FileInfo(755src=self.source_bucket+'/'+self.source_key,756dest=self.bucket+'/'+self.key,757operation_name='copy',758associated_response_data={759'StorageClass': 'GLACIER',760}761)762future = self.transfer_request_submitter.submit(fileinfo)763764# The transfer request should have never been sent therefore return765# no future.766self.assertIsNone(future)767# A warning should have not been submitted because it was specified768# to ignore glacier warnings.769self.assertTrue(self.result_queue.empty())770# But the transfer still should have been skipped.771self.assertEqual(len(self.transfer_manager.copy.call_args_list), 0)772773def test_dry_run(self):774self.cli_params['dryrun'] = True775self.transfer_request_submitter = CopyRequestSubmitter(776self.transfer_manager, self.result_queue, self.cli_params)777fileinfo = FileInfo(778src=self.source_bucket + '/' + self.source_key, src_type='s3',779dest=self.bucket + '/' + self.key, dest_type='s3',780operation_name='copy')781self.transfer_request_submitter.submit(fileinfo)782783result = self.result_queue.get()784self.assertIsInstance(result, DryRunResult)785self.assertEqual(result.transfer_type, 'copy')786source = 's3://' + self.source_bucket + '/' + self.source_key787self.assertEqual(result.src, source)788self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)789790def test_submit_move_adds_delete_source_subscriber(self):791fileinfo = FileInfo(792dest=self.source_bucket + '/' + self.source_key,793src=self.bucket + '/' + self.key)794self.cli_params['guess_mime_type'] = True # Default settings795self.cli_params['is_move'] = True796self.transfer_request_submitter.submit(fileinfo)797ref_subscribers = [798ProvideSizeSubscriber,799ProvideCopyContentTypeSubscriber,800DeleteSourceObjectSubscriber,801CopyResultSubscriber,802]803copy_call_kwargs = self.transfer_manager.copy.call_args[1]804actual_subscribers = copy_call_kwargs['subscribers']805self.assertEqual(len(ref_subscribers), len(actual_subscribers))806for i, actual_subscriber in enumerate(actual_subscribers):807self.assertIsInstance(actual_subscriber, ref_subscribers[i])808809810class TestUploadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):811def setUp(self):812super(TestUploadStreamRequestSubmitter, self).setUp()813self.filename = '-'814self.cli_params['is_stream'] = True815self.transfer_request_submitter = UploadStreamRequestSubmitter(816self.transfer_manager, self.result_queue, self.cli_params)817818def test_can_submit(self):819fileinfo = FileInfo(820src=self.filename, dest=self.bucket+'/'+self.key,821operation_name='upload')822self.assertTrue(823self.transfer_request_submitter.can_submit(fileinfo))824self.cli_params['is_stream'] = False825self.assertFalse(826self.transfer_request_submitter.can_submit(fileinfo))827828def test_submit(self):829fileinfo = FileInfo(830src=self.filename, dest=self.bucket+'/'+self.key)831future = self.transfer_request_submitter.submit(fileinfo)832self.assertIs(self.transfer_manager.upload.return_value, future)833834upload_call_kwargs = self.transfer_manager.upload.call_args[1]835self.assertIsInstance(836upload_call_kwargs['fileobj'], NonSeekableStream)837self.assertEqual(upload_call_kwargs['bucket'], self.bucket)838self.assertEqual(upload_call_kwargs['key'], self.key)839self.assertEqual(upload_call_kwargs['extra_args'], {})840841ref_subscribers = [842UploadStreamResultSubscriber843]844actual_subscribers = upload_call_kwargs['subscribers']845self.assertEqual(len(ref_subscribers), len(actual_subscribers))846for i, actual_subscriber in enumerate(actual_subscribers):847self.assertIsInstance(actual_subscriber, ref_subscribers[i])848849def test_submit_with_expected_size_provided(self):850provided_size = 100851self.cli_params['expected_size'] = provided_size852fileinfo = FileInfo(853src=self.filename, dest=self.bucket+'/'+self.key)854self.transfer_request_submitter.submit(fileinfo)855upload_call_kwargs = self.transfer_manager.upload.call_args[1]856857ref_subscribers = [858ProvideSizeSubscriber,859UploadStreamResultSubscriber860]861actual_subscribers = upload_call_kwargs['subscribers']862self.assertEqual(len(ref_subscribers), len(actual_subscribers))863for i, actual_subscriber in enumerate(actual_subscribers):864self.assertIsInstance(actual_subscriber, ref_subscribers[i])865# The ProvideSizeSubscriber should be providing the correct size866self.assertEqual(actual_subscribers[0].size, provided_size)867868def test_dry_run(self):869self.cli_params['dryrun'] = True870self.transfer_request_submitter = UploadStreamRequestSubmitter(871self.transfer_manager, self.result_queue, self.cli_params)872fileinfo = FileInfo(873src=self.filename, src_type='local', operation_name='upload',874dest=self.bucket + '/' + self.key, dest_type='s3')875self.transfer_request_submitter.submit(fileinfo)876877result = self.result_queue.get()878self.assertIsInstance(result, DryRunResult)879self.assertEqual(result.transfer_type, 'upload')880self.assertEqual(result.dest, 's3://' + self.bucket + '/' + self.key)881self.assertEqual(result.src, '-')882883884class TestDownloadStreamRequestSubmitter(BaseTransferRequestSubmitterTest):885def setUp(self):886super(TestDownloadStreamRequestSubmitter, self).setUp()887self.filename = '-'888self.cli_params['is_stream'] = True889self.transfer_request_submitter = DownloadStreamRequestSubmitter(890self.transfer_manager, self.result_queue, self.cli_params)891892def test_can_submit(self):893fileinfo = FileInfo(894src=self.bucket+'/'+self.key, dest=self.filename,895operation_name='download')896self.assertTrue(897self.transfer_request_submitter.can_submit(fileinfo))898self.cli_params['is_stream'] = False899self.assertFalse(900self.transfer_request_submitter.can_submit(fileinfo))901902def test_submit(self):903fileinfo = FileInfo(904src=self.bucket+'/'+self.key, dest=self.filename,905compare_key=self.key)906future = self.transfer_request_submitter.submit(fileinfo)907self.assertIs(self.transfer_manager.download.return_value, future)908909download_call_kwargs = self.transfer_manager.download.call_args[1]910self.assertIsInstance(911download_call_kwargs['fileobj'], StdoutBytesWriter)912self.assertEqual(download_call_kwargs['bucket'], self.bucket)913self.assertEqual(download_call_kwargs['key'], self.key)914self.assertEqual(download_call_kwargs['extra_args'], {})915916ref_subscribers = [917DownloadStreamResultSubscriber918]919actual_subscribers = download_call_kwargs['subscribers']920self.assertEqual(len(ref_subscribers), len(actual_subscribers))921for i, actual_subscriber in enumerate(actual_subscribers):922self.assertIsInstance(actual_subscriber, ref_subscribers[i])923924def test_dry_run(self):925self.cli_params['dryrun'] = True926self.transfer_request_submitter = DownloadStreamRequestSubmitter(927self.transfer_manager, self.result_queue, self.cli_params)928fileinfo = FileInfo(929dest=self.filename, dest_type='local', operation_name='download',930src=self.bucket + '/' + self.key, src_type='s3',931compare_key=self.key)932self.transfer_request_submitter.submit(fileinfo)933934result = self.result_queue.get()935self.assertIsInstance(result, DryRunResult)936self.assertEqual(result.transfer_type, 'download')937self.assertEqual(result.src, 's3://' + self.bucket + '/' + self.key)938self.assertEqual(result.dest, '-')939940941class TestDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):942def setUp(self):943super(TestDeleteRequestSubmitter, self).setUp()944self.transfer_request_submitter = DeleteRequestSubmitter(945self.transfer_manager, self.result_queue, self.cli_params)946947def test_can_submit(self):948fileinfo = FileInfo(949src=self.bucket+'/'+self.key, dest=None, operation_name='delete',950src_type='s3')951self.assertTrue(952self.transfer_request_submitter.can_submit(fileinfo))953fileinfo.operation_name = 'foo'954self.assertFalse(955self.transfer_request_submitter.can_submit(fileinfo))956957def test_cannot_submit_local_deletes(self):958fileinfo = FileInfo(959src=self.bucket+'/'+self.key, dest=None, operation_name='delete',960src_type='local')961self.assertFalse(962self.transfer_request_submitter.can_submit(fileinfo))963964def test_submit(self):965fileinfo = FileInfo(966src=self.bucket+'/'+self.key, dest=None, operation_name='delete')967future = self.transfer_request_submitter.submit(fileinfo)968self.assertIs(self.transfer_manager.delete.return_value, future)969970delete_call_kwargs = self.transfer_manager.delete.call_args[1]971self.assertEqual(delete_call_kwargs['bucket'], self.bucket)972self.assertEqual(delete_call_kwargs['key'], self.key)973self.assertEqual(delete_call_kwargs['extra_args'], {})974975ref_subscribers = [976DeleteResultSubscriber977]978actual_subscribers = delete_call_kwargs['subscribers']979self.assertEqual(len(ref_subscribers), len(actual_subscribers))980for i, actual_subscriber in enumerate(actual_subscribers):981self.assertIsInstance(actual_subscriber, ref_subscribers[i])982983def test_dry_run(self):984self.cli_params['dryrun'] = True985self.transfer_request_submitter = DeleteRequestSubmitter(986self.transfer_manager, self.result_queue, self.cli_params)987fileinfo = FileInfo(988src=self.bucket + '/' + self.key, src_type='s3',989dest=self.bucket + '/' + self.key, dest_type='s3',990operation_name='delete')991self.transfer_request_submitter.submit(fileinfo)992993result = self.result_queue.get()994self.assertIsInstance(result, DryRunResult)995self.assertEqual(result.transfer_type, 'delete')996self.assertEqual(result.src, 's3://' + self.bucket + '/' + self.key)997self.assertIsNone(result.dest)9989991000class TestLocalDeleteRequestSubmitter(BaseTransferRequestSubmitterTest):1001def setUp(self):1002super(TestLocalDeleteRequestSubmitter, self).setUp()1003self.transfer_request_submitter = LocalDeleteRequestSubmitter(1004self.transfer_manager, self.result_queue, self.cli_params)1005self.file_creator = FileCreator()10061007def tearDown(self):1008super(TestLocalDeleteRequestSubmitter, self).tearDown()1009self.file_creator.remove_all()10101011def test_can_submit(self):1012fileinfo = FileInfo(1013src=self.filename, dest=None, operation_name='delete',1014src_type='local')1015self.assertTrue(1016self.transfer_request_submitter.can_submit(fileinfo))1017fileinfo.operation_name = 'foo'1018self.assertFalse(1019self.transfer_request_submitter.can_submit(fileinfo))10201021def test_cannot_submit_remote_deletes(self):1022fileinfo = FileInfo(1023src=self.filename, dest=None, operation_name='delete',1024src_type='s3')1025self.assertFalse(1026self.transfer_request_submitter.can_submit(fileinfo))10271028def test_submit(self):1029full_filename = self.file_creator.create_file(self.filename, 'content')1030fileinfo = FileInfo(1031src=full_filename, dest=None, operation_name='delete',1032src_type='local')1033rval = self.transfer_request_submitter.submit(fileinfo)1034self.assertTrue(rval)10351036queued_result = self.result_queue.get()1037self.assertIsInstance(queued_result, QueuedResult)1038self.assertEqual(queued_result.transfer_type, 'delete')1039self.assertTrue(queued_result.src.endswith(self.filename))1040self.assertIsNone(queued_result.dest)1041self.assertEqual(queued_result.total_transfer_size, 0)10421043failure_result = self.result_queue.get()1044self.assertIsInstance(failure_result, SuccessResult)1045self.assertEqual(failure_result.transfer_type, 'delete')1046self.assertTrue(failure_result.src.endswith(self.filename))1047self.assertIsNone(failure_result.dest)10481049self.assertFalse(os.path.exists(full_filename))10501051def test_submit_with_exception(self):1052fileinfo = FileInfo(1053src=self.filename, dest=None, operation_name='delete',1054src_type='local')1055# The file was never created so it should trigger an exception1056# when it is attempted to be deleted in the submitter.1057rval = self.transfer_request_submitter.submit(fileinfo)1058self.assertTrue(rval)10591060queued_result = self.result_queue.get()1061self.assertIsInstance(queued_result, QueuedResult)1062self.assertEqual(queued_result.transfer_type, 'delete')1063self.assertTrue(queued_result.src.endswith(self.filename))1064self.assertIsNone(queued_result.dest)1065self.assertEqual(queued_result.total_transfer_size, 0)10661067failure_result = self.result_queue.get()1068self.assertIsInstance(failure_result, FailureResult)1069self.assertEqual(failure_result.transfer_type, 'delete')1070self.assertTrue(failure_result.src.endswith(self.filename))1071self.assertIsNone(failure_result.dest)10721073def test_dry_run(self):1074self.cli_params['dryrun'] = True1075fileinfo = FileInfo(1076src=self.filename, src_type='local',1077dest=self.filename, dest_type='local',1078operation_name='delete')1079self.transfer_request_submitter.submit(fileinfo)10801081result = self.result_queue.get()1082self.assertIsInstance(result, DryRunResult)1083self.assertEqual(result.transfer_type, 'delete')1084self.assertTrue(result.src.endswith(self.filename))1085self.assertIsNone(result.dest)108610871088