Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/integration/customizations/s3/__init__.py
1567 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
from botocore.exceptions import ClientError
14
from awscli.testutils import create_bucket
15
16
17
def make_s3_files(session, key1='text1.txt', key2='text2.txt', size=None):
18
"""
19
Creates a randomly generated bucket in s3 with the files text1.txt and
20
another_directory/text2.txt inside. The directory is manually created
21
as it tests the ability to handle directories when generating s3 files.
22
"""
23
region = 'us-west-2'
24
bucket = create_bucket(session)
25
26
if size:
27
string1 = "*" * size
28
string2 = string1
29
else:
30
string1 = "This is a test."
31
string2 = "This is another test."
32
33
client = session.create_client('s3', region_name=region)
34
client.put_object(Bucket=bucket, Key=key1, Body=string1)
35
if key2 is not None:
36
client.put_object(Bucket=bucket, Key='another_directory/')
37
client.put_object(Bucket=bucket, Key='another_directory/%s' % key2,
38
Body=string2)
39
return bucket
40
41
42
def s3_cleanup(bucket, session):
43
"""
44
Function to cleanup generated s3 bucket and files.
45
"""
46
region = 'us-west-2'
47
client = session.create_client('s3', region_name=region)
48
try:
49
client.head_bucket(Bucket=bucket)
50
except ClientError:
51
return
52
response = client.list_objects(Bucket=bucket)
53
contents = response.get('Contents', {})
54
keys = [content['Key'] for content in contents]
55
for key in keys:
56
client.delete_object(Bucket=bucket, Key=key)
57
client.delete_bucket(Bucket=bucket)
58
59