Path: blob/develop/tests/integration/customizations/s3/test_filegenerator.py
1567 views
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.1#2# Licensed under the Apache License, Version 2.0 (the "License"). You3# may not use this file except in compliance with the License. A copy of4# the License is located at5#6# http://aws.amazon.com/apache2.0/7#8# or in the "license" file accompanying this file. This file is9# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF10# ANY KIND, either express or implied. See the License for the specific11# language governing permissions and limitations under the License.121314# Note that all of these functions can be found in the unit tests.15# The only difference is that these tests use botocore's actual session16# variables to communicate with s3 as these are integration tests. Therefore,17# only tests that use sessions are included as integration tests.1819import unittest20import os21import itertools2223import botocore.session24from awscli import EnvironmentVariables25from awscli.customizations.s3.filegenerator import FileGenerator, FileStat26from tests.unit.customizations.s3 import compare_files27from tests.integration.customizations.s3 import make_s3_files, s3_cleanup282930class S3FileGeneratorIntTest(unittest.TestCase):31def setUp(self):32self.session = botocore.session.get_session(EnvironmentVariables)33# Use the datetime and and blob parsing of the CLI34factory = self.session.get_component('response_parser_factory')35factory.set_parser_defaults(36blob_parser=lambda x: x,37timestamp_parser=lambda x: x)38self.client = self.session.create_client('s3', region_name='us-west-2')39self.bucket = make_s3_files(self.session)40self.file1 = self.bucket + '/' + 'text1.txt'41self.file2 = self.bucket + '/' + 'another_directory/text2.txt'4243def tearDown(self):44s3_cleanup(self.bucket, self.session)4546def test_s3_file(self):47#48# Generate a single s3 file49# Note: Size and last update are not tested because s3 generates them.50#51input_s3_file = {'src': {'path': self.file1, 'type': 's3'},52'dest': {'path': 'text1.txt', 'type': 'local'},53'dir_op': False, 'use_src_name': False}54expected_file_size = 1555result_list = list(56FileGenerator(self.client, '').call(input_s3_file))57file_stat = FileStat(src=self.file1, dest='text1.txt',58compare_key='text1.txt',59size=expected_file_size,60last_update=result_list[0].last_update,61src_type='s3',62dest_type='local', operation_name='')6364expected_list = [file_stat]65self.assertEqual(len(result_list), 1)66compare_files(self, result_list[0], expected_list[0])6768def test_s3_directory(self):69#70# Generates s3 files under a common prefix. Also it ensures that71# zero size files are ignored.72# Note: Size and last update are not tested because s3 generates them.73#74input_s3_file = {'src': {'path': self.bucket+'/', 'type': 's3'},75'dest': {'path': '', 'type': 'local'},76'dir_op': True, 'use_src_name': True}77result_list = list(78FileGenerator(self.client, '').call(input_s3_file))79file_stat = FileStat(src=self.file2,80dest='another_directory' + os.sep + 'text2.txt',81compare_key='another_directory/text2.txt',82size=21,83last_update=result_list[0].last_update,84src_type='s3',85dest_type='local', operation_name='')86file_stat2 = FileStat(src=self.file1,87dest='text1.txt',88compare_key='text1.txt',89size=15,90last_update=result_list[1].last_update,91src_type='s3',92dest_type='local', operation_name='')9394expected_result = [file_stat, file_stat2]95self.assertEqual(len(result_list), 2)96compare_files(self, result_list[0], expected_result[0])97compare_files(self, result_list[1], expected_result[1])9899def test_s3_delete_directory(self):100#101# Generates s3 files under a common prefix. Also it ensures that102# the directory itself is included because it is a delete command103# Note: Size and last update are not tested because s3 generates them.104#105input_s3_file = {'src': {'path': self.bucket+'/', 'type': 's3'},106'dest': {'path': '', 'type': 'local'},107'dir_op': True, 'use_src_name': True}108result_list = list(109FileGenerator(self.client, 'delete').call(input_s3_file))110111file_stat1 = FileStat(112src=self.bucket + '/another_directory/',113dest='another_directory' + os.sep,114compare_key='another_directory/',115size=0,116last_update=result_list[0].last_update,117src_type='s3',118dest_type='local', operation_name='delete')119file_stat2 = FileStat(120src=self.file2,121dest='another_directory' + os.sep + 'text2.txt',122compare_key='another_directory/text2.txt',123size=21,124last_update=result_list[1].last_update,125src_type='s3',126dest_type='local', operation_name='delete')127file_stat3 = FileStat(128src=self.file1,129dest='text1.txt',130compare_key='text1.txt',131size=15,132last_update=result_list[2].last_update,133src_type='s3',134dest_type='local', operation_name='delete')135136expected_list = [file_stat1, file_stat2, file_stat3]137self.assertEqual(len(result_list), 3)138compare_files(self, result_list[0], expected_list[0])139compare_files(self, result_list[1], expected_list[1])140compare_files(self, result_list[2], expected_list[2])141142def test_page_size(self):143input_s3_file = {'src': {'path': self.bucket+'/', 'type': 's3'},144'dest': {'path': '', 'type': 'local'},145'dir_op': True, 'use_src_name': True}146file_gen = FileGenerator(self.client, '',147page_size=1).call(input_s3_file)148limited_file_gen = itertools.islice(file_gen, 1)149result_list = list(limited_file_gen)150file_stat = FileStat(src=self.file2,151dest='another_directory' + os.sep + 'text2.txt',152compare_key='another_directory/text2.txt',153size=21,154last_update=result_list[0].last_update,155src_type='s3',156dest_type='local', operation_name='')157# Ensure only one item is returned from ``ListObjects``158self.assertEqual(len(result_list), 1)159compare_files(self, result_list[0], file_stat)160161162if __name__ == "__main__":163unittest.main()164165166