GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/test_s3uploader.py
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import random
import string
import tempfile
import hashlib
import shutil

import botocore
import botocore.session
import botocore.exceptions
from botocore.stub import Stubber
from s3transfer import S3Transfer

from awscli.compat import OrderedDict
from awscli.testutils import mock, unittest
from awscli.customizations.s3uploader import S3Uploader
from awscli.customizations.s3uploader import NoSuchBucketError

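# These tests exercise S3Uploader with a real botocore S3 client wrapped in a
# Stubber (so no network calls are made) and a mock.Mock(spec=S3Transfer)
# standing in for the transfer manager, so upload behavior can be asserted
# without touching S3.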
class TestS3Uploader(unittest.TestCase):

    def setUp(self):
        self._construct_uploader("us-east-1")

    def _construct_uploader(self, region):
        self.s3client = botocore.session.get_session().create_client(
            's3', region_name=region)
        self.s3client_stub = Stubber(self.s3client)
        self.transfer_manager_mock = mock.Mock(spec=S3Transfer)
        self.transfer_manager_mock.upload = mock.Mock()
        self.bucket_name = "bucketname"
        self.prefix = None

        self.s3uploader = S3Uploader(
            self.s3client, self.bucket_name, self.prefix, None, False,
            self.transfer_manager_mock)

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_successful(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"
        prefix = "SomePrefix"
        remote_path_with_prefix = "{0}/{1}".format(prefix, remote_path)
        s3uploader = S3Uploader(
            self.s3client, self.bucket_name, prefix, None, False,
            self.transfer_manager_mock)
        expected_upload_url = "s3://{0}/{1}/{2}".format(
            self.bucket_name, prefix, remote_path)

        # Setup mock to fake that file does not exist
        s3uploader.file_exists = mock.Mock()
        s3uploader.file_exists.return_value = False
        # set the metadata used by the uploader when uploading
        artifact_metadata = {"key": "val"}
        s3uploader.artifact_metadata = artifact_metadata

        upload_url = s3uploader.upload(file_name, remote_path)
        self.assertEqual(expected_upload_url, upload_url)

        expected_extra_args = {
            # expected encryption args
            "ServerSideEncryption": "AES256",
            # expected metadata
            "Metadata": artifact_metadata
        }
        self.transfer_manager_mock.upload.assert_called_once_with(
            file_name, self.bucket_name, remote_path_with_prefix,
            expected_extra_args, mock.ANY)
        s3uploader.file_exists.assert_called_once_with(remote_path_with_prefix)

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_successful_odict(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"
        prefix = "SomePrefix"
        remote_path_with_prefix = "{0}/{1}".format(prefix, remote_path)
        s3uploader = S3Uploader(
            self.s3client, self.bucket_name, prefix, None, False,
            self.transfer_manager_mock)
        expected_upload_url = "s3://{0}/{1}/{2}".format(
            self.bucket_name, prefix, remote_path)

        # Setup mock to fake that file does not exist
        s3uploader.file_exists = mock.Mock()
        s3uploader.file_exists.return_value = False
        # set the metadata used by the uploader when uploading
        artifact_metadata = OrderedDict({"key": "val"})
        s3uploader.artifact_metadata = artifact_metadata

        upload_url = s3uploader.upload(file_name, remote_path)
        self.assertEqual(expected_upload_url, upload_url)

        expected_extra_args = {
            # expected encryption args
            "ServerSideEncryption": "AES256",
            # expected metadata
            "Metadata": artifact_metadata
        }
        self.transfer_manager_mock.upload.assert_called_once_with(
            file_name, self.bucket_name, remote_path_with_prefix,
            expected_extra_args, mock.ANY)
        s3uploader.file_exists.assert_called_once_with(remote_path_with_prefix)

    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_idempotency(self, progress_percentage_mock):
        file_name = "filename"
        remote_path = "remotepath"

        # Setup mock to fake that file was already uploaded
        self.s3uploader.file_exists = mock.Mock()
        self.s3uploader.file_exists.return_value = True

        self.s3uploader.upload(file_name, remote_path)

        self.transfer_manager_mock.upload.assert_not_called()
        self.s3uploader.file_exists.assert_called_once_with(remote_path)

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_force_upload(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"
        expected_upload_url = "s3://{0}/{1}".format(self.bucket_name,
                                                    remote_path)

        # Set ForceUpload = True
        self.s3uploader = S3Uploader(
            self.s3client, self.bucket_name, self.prefix,
            None, True, self.transfer_manager_mock)

        # Pretend file already exists
        self.s3uploader.file_exists = mock.Mock()
        self.s3uploader.file_exists.return_value = True

        # Because we forced an update, this should reupload even if file exists
        upload_url = self.s3uploader.upload(file_name, remote_path)
        self.assertEqual(expected_upload_url, upload_url)

        expected_encryption_args = {
            "ServerSideEncryption": "AES256"
        }
        self.transfer_manager_mock.upload.assert_called_once_with(
            file_name, self.bucket_name, remote_path,
            expected_encryption_args, mock.ANY)

        # Since ForceUpload=True, we should NEVER do the file-exists check
        self.s3uploader.file_exists.assert_not_called()

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_successful_custom_kms_key(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"
        kms_key_id = "kms_id"
        expected_upload_url = "s3://{0}/{1}".format(self.bucket_name,
                                                    remote_path)
        # Set KMS Key Id
        self.s3uploader = S3Uploader(
            self.s3client, self.bucket_name, self.prefix,
            kms_key_id, False, self.transfer_manager_mock)

        # Setup mock to fake that file does not exist
        self.s3uploader.file_exists = mock.Mock()
        self.s3uploader.file_exists.return_value = False

        upload_url = self.s3uploader.upload(file_name, remote_path)
        self.assertEqual(expected_upload_url, upload_url)

        expected_encryption_args = {
            "ServerSideEncryption": "aws:kms",
            "SSEKMSKeyId": kms_key_id
        }
        self.transfer_manager_mock.upload.assert_called_once_with(
            file_name, self.bucket_name, remote_path,
            expected_encryption_args, mock.ANY)
        self.s3uploader.file_exists.assert_called_once_with(remote_path)

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_successful_nobucket(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"

        # Setup mock to fake that file does not exist
        self.s3uploader.file_exists = mock.Mock()
        self.s3uploader.file_exists.return_value = False

        # Setup uploader to raise a NoSuchBucket exception
        exception = botocore.exceptions.ClientError(
            {"Error": {"Code": "NoSuchBucket"}}, "OpName")
        self.transfer_manager_mock.upload.side_effect = exception

        with self.assertRaises(NoSuchBucketError):
            self.s3uploader.upload(file_name, remote_path)

    @mock.patch('os.path.getsize', return_value=1)
    @mock.patch("awscli.customizations.s3uploader.ProgressPercentage")
    def test_upload_successful_exceptions(self, progress_percentage_mock, get_size_patch):
        file_name = "filename"
        remote_path = "remotepath"

        # Setup mock to fake that file does not exist
        self.s3uploader.file_exists = mock.Mock()
        self.s3uploader.file_exists.return_value = False

        # Raise an unrecognized botocore error
        exception = botocore.exceptions.ClientError(
            {"Error": {"Code": "SomeError"}}, "OpName")
        self.transfer_manager_mock.upload.side_effect = exception

        with self.assertRaises(botocore.exceptions.ClientError):
            self.s3uploader.upload(file_name, remote_path)

        # Some other exception
        self.transfer_manager_mock.upload.side_effect = FloatingPointError()
        with self.assertRaises(FloatingPointError):
            self.s3uploader.upload(file_name, remote_path)

    def test_upload_with_dedup(self):

        checksum = "some md5 checksum"
        filename = "filename"
        extension = "extn"

        self.s3uploader.file_checksum = mock.Mock()
        self.s3uploader.file_checksum.return_value = checksum

        self.s3uploader.upload = mock.Mock()

        self.s3uploader.upload_with_dedup(filename, extension)

        remotepath = "{0}.{1}".format(checksum, extension)
        self.s3uploader.upload.assert_called_once_with(filename, remotepath)

    def test_file_exists(self):
        key = "some/path"
        expected_params = {
            "Bucket": self.bucket_name,
            "Key": key
        }
        response = {
            "AcceptRanges": "bytes",
            "ContentType": "text/html",
            "LastModified": "Thu, 16 Apr 2015 18:19:14 GMT",
            "ContentLength": 77,
            "VersionId": "null",
            "ETag": "\"30a6ec7e1a9ad79c203d05a589c8b400\"",
            "Metadata": {}
        }

        # Let's pretend file exists
        self.s3client_stub.add_response("head_object",
                                        response,
                                        expected_params)

        with self.s3client_stub:
            self.assertTrue(self.s3uploader.file_exists(key))

        # Let's pretend file does not exist
        self.s3client_stub.add_client_error(
            'head_object', "ClientError", "some error")
        with self.s3client_stub:
            self.assertFalse(self.s3uploader.file_exists(key))

        # Let's pretend some other unknown exception happened
        s3mock = mock.Mock()
        uploader = S3Uploader(s3mock, self.bucket_name)
        s3mock.head_object = mock.Mock()
        s3mock.head_object.side_effect = RuntimeError()

        with self.assertRaises(RuntimeError):
            uploader.file_exists(key)

    def test_file_checksum(self):
        num_chars = 4096*5
        data = ''.join(random.choice(string.ascii_uppercase)
                       for _ in range(num_chars)).encode('utf-8')
        md5 = hashlib.md5()
        md5.update(data)
        expected_checksum = md5.hexdigest()

        tempdir = tempfile.mkdtemp()
        try:
            filename = os.path.join(tempdir, 'tempfile')
            with open(filename, 'wb') as f:
                f.write(data)

            actual_checksum = self.s3uploader.file_checksum(filename)
            self.assertEqual(expected_checksum, actual_checksum)
        finally:
            shutil.rmtree(tempdir)

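    # When get_md5 raises MD5UnavailableError (as on FIPS-enabled hosts),
    # file_checksum is expected to fall back to a SHA-256 digest; the mock
    # below forces that code path.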
    @mock.patch("awscli.customizations.s3uploader.get_md5")
    def test_file_checksum_fips_fallback(self, get_md5_mock):
        num_chars = 4096*5
        data = ''.join(random.choice(string.ascii_uppercase)
                       for _ in range(num_chars)).encode('utf-8')
        checksum = hashlib.sha256(usedforsecurity=False)
        checksum.update(data)
        expected_checksum = checksum.hexdigest()

        tempdir = tempfile.mkdtemp()
        get_md5_mock.side_effect = botocore.exceptions.MD5UnavailableError()
        try:
            filename = os.path.join(tempdir, 'tempfile')
            with open(filename, 'wb') as f:
                f.write(data)

            actual_checksum = self.s3uploader.file_checksum(filename)
            self.assertEqual(expected_checksum, actual_checksum)
        finally:
            shutil.rmtree(tempdir)

    def test_make_url(self):
        path = "Hello/how/are/you"
        expected = "s3://{0}/{1}".format(self.bucket_name, path)
        self.assertEqual(expected, self.s3uploader.make_url(path))

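    # The path-style URL tests below check the region-specific endpoints:
    # us-east-1 uses s3.amazonaws.com, other regions use
    # s3.<region>.amazonaws.com, and China regions use
    # s3.<region>.amazonaws.com.cn.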
    def test_to_path_style_s3_url_us_east_1(self):
        key = "path/to/file"
        version = "someversion"
        region = "us-east-1"
        self._construct_uploader(region)

        s3uploader = S3Uploader(self.s3client, self.bucket_name)
        result = s3uploader.to_path_style_s3_url(key, version)
        self.assertEqual(
            result,
            "https://s3.amazonaws.com/{0}/{1}?versionId={2}".format(
                self.bucket_name, key, version))

        # Without versionId, that query parameter should be omitted
        s3uploader = S3Uploader(self.s3client, self.bucket_name, region)
        result = s3uploader.to_path_style_s3_url(key)
        self.assertEqual(
            result,
            "https://s3.amazonaws.com/{0}/{1}".format(
                self.bucket_name, key))

    def test_to_path_style_s3_url_other_regions(self):
        key = "path/to/file"
        version = "someversion"
        region = "us-west-2"
        self._construct_uploader(region)

        s3uploader = S3Uploader(self.s3client, self.bucket_name, region)
        result = s3uploader.to_path_style_s3_url(key, version)
        self.assertEqual(
            result,
            "https://s3.{0}.amazonaws.com/{1}/{2}?versionId={3}".format(
                region, self.bucket_name, key, version))

        # Without versionId, that query parameter should be omitted
        s3uploader = S3Uploader(self.s3client, self.bucket_name, region)
        result = s3uploader.to_path_style_s3_url(key)
        self.assertEqual(
            result,
            "https://s3.{0}.amazonaws.com/{1}/{2}".format(
                region, self.bucket_name, key))

    def test_to_path_style_s3_url_china_regions(self):
        key = "path/to/file"
        version = "someversion"
        region = "cn-northwest-1"
        self._construct_uploader(region)

        s3uploader = S3Uploader(self.s3client, self.bucket_name, region)
        result = s3uploader.to_path_style_s3_url(key, version)
        self.assertEqual(
            result,
            "https://s3.{0}.amazonaws.com.cn/{1}/{2}?versionId={3}".format(
                region, self.bucket_name, key, version))

        # Without versionId, that query parameter should be omitted
        s3uploader = S3Uploader(self.s3client, self.bucket_name, region)
        result = s3uploader.to_path_style_s3_url(key)
        self.assertEqual(
            result,
            "https://s3.{0}.amazonaws.com.cn/{1}/{2}".format(
                region, self.bucket_name, key))

    def test_artifact_metadata_invalid_type(self):
        prefix = "SomePrefix"
        s3uploader = S3Uploader(
            self.s3client, self.bucket_name, prefix, None, False,
            self.transfer_manager_mock)
        invalid_metadata = ["key", "val"]
        with self.assertRaises(TypeError):
            s3uploader.artifact_metadata = invalid_metadata