Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/cloudtrail/test_subscribe.py
1569 views
1
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import json
14
15
from botocore.client import ClientError
16
from botocore.session import Session
17
18
from tests.unit.test_clidriver import FakeSession
19
from awscli.customizations.cloudtrail.subscribe import CloudTrailError, CloudTrailSubscribe
20
from awscli.compat import BytesIO
21
from awscli.testutils import BaseAWSCommandParamsTest
22
from awscli.testutils import mock, unittest, temporary_file
23
24
25
class TestCreateSubscription(BaseAWSCommandParamsTest):
26
def test_create_subscription_has_zero_rc(self):
27
command = (
28
'cloudtrail create-subscription --s3-use-bucket foo --name bar')
29
stdout = self.run_cmd(command, expected_rc=0)[0]
30
# We don't want to overspecify here, but we'll do a quick check to make
31
# sure it says log delivery is happening.
32
self.assertIn('Logs will be delivered to foo', stdout)
33
34
@mock.patch.object(Session, 'create_client')
35
def test_policy_from_paramfile(self, create_client_mock):
36
client = mock.Mock()
37
# S3 mock calls
38
client.get_caller_identity.return_value = {'Account': ''}
39
client.head_bucket.side_effect = ClientError(
40
{'Error': {'Code': 404, 'Message': ''}}, 'HeadBucket')
41
# CloudTrail mock call
42
client.describe_trails.return_value = {}
43
create_client_mock.return_value = client
44
45
policy = '{"Statement": []}'
46
47
with temporary_file('w') as f:
48
f.write(policy)
49
f.flush()
50
command = (
51
'cloudtrail create-subscription --s3-new-bucket foo '
52
'--name bar --s3-custom-policy file://{0}'.format(f.name))
53
self.run_cmd(command, expected_rc=0)
54
55
# Ensure that the *contents* of the file are sent as the policy
56
# parameter to S3.
57
client.put_bucket_policy.assert_called_with(
58
Bucket='foo', Policy=policy)
59
60
61
class TestCloudTrailCommand(unittest.TestCase):
62
def setUp(self):
63
self.session = FakeSession({'config_file': 'myconfigfile'})
64
self.subscribe = CloudTrailSubscribe(self.session)
65
self.subscribe.region_name = 'us-east-1'
66
67
self.subscribe.sts = mock.Mock()
68
self.subscribe.sts.get_caller_identity = mock.Mock(
69
return_value={'Account': '123456'})
70
71
self.subscribe.s3 = mock.Mock()
72
self.subscribe.s3.meta.region_name = 'us-east-1'
73
policy_template = BytesIO(u'{"Statement": []}'.encode('latin-1'))
74
self.subscribe.s3.get_object = mock.Mock(
75
return_value={'Body': policy_template})
76
self.subscribe.s3.head_bucket.return_value = {}
77
78
self.subscribe.sns = mock.Mock()
79
self.subscribe.sns.meta.region_name = 'us-east-1'
80
self.subscribe.sns.list_topics = mock.Mock(
81
return_value={'Topics': [{'TopicArn': ':test2'}]})
82
self.subscribe.sns.create_topic = mock.Mock(
83
return_value={'TopicArn': 'foo'})
84
self.subscribe.sns.get_topic_attributes = mock.Mock(
85
return_value={'Attributes': {'Policy': '{"Statement": []}'}})
86
87
def test_clients_all_from_same_session(self):
88
session = mock.Mock()
89
subscribe_command = CloudTrailSubscribe(session)
90
parsed_globals = mock.Mock(region=None, verify_ssl=None,
91
endpoint_url=None)
92
subscribe_command.setup_services(None, parsed_globals)
93
create_client_calls = session.create_client.call_args_list
94
self.assertEqual(
95
create_client_calls, [
96
mock.call('sts', verify=None, region_name=None),
97
mock.call('s3', verify=None, region_name=None),
98
mock.call('sns', verify=None, region_name=None),
99
mock.call('cloudtrail', verify=None, region_name=None),
100
]
101
)
102
103
def test_endpoint_url_is_only_used_for_cloudtrail(self):
104
endpoint_url = 'https://mycloudtrail.awsamazon.com/'
105
session = mock.Mock()
106
subscribe_command = CloudTrailSubscribe(session)
107
parsed_globals = mock.Mock(region=None, verify_ssl=None,
108
endpoint_url=endpoint_url)
109
subscribe_command.setup_services(None, parsed_globals)
110
create_client_calls = session.create_client.call_args_list
111
self.assertEqual(
112
create_client_calls, [
113
mock.call('sts', verify=None, region_name=None),
114
mock.call('s3', verify=None, region_name=None),
115
mock.call('sns', verify=None, region_name=None),
116
# Here we should inject the endpoint_url only for cloudtrail.
117
mock.call('cloudtrail', verify=None, region_name=None,
118
endpoint_url=endpoint_url),
119
]
120
)
121
122
def test_s3_create(self):
123
sts = self.subscribe.sts
124
s3 = self.subscribe.s3
125
s3.head_bucket.side_effect = ClientError(
126
{'Error': {'Code': '404', 'Message': ''}}, 'HeadBucket')
127
128
self.subscribe.setup_new_bucket('test', 'logs')
129
130
sts.get_caller_identity.assert_called_with()
131
132
s3.get_object.assert_called_with(
133
Bucket='awscloudtrail-policy-us-east-1',
134
Key='policy/S3/AWSCloudTrail-S3BucketPolicy-2014-12-17.json',
135
)
136
s3.create_bucket.assert_called_with(Bucket='test')
137
s3.put_bucket_policy.assert_called_with(
138
Bucket='test', Policy=u'{"Statement": []}'
139
)
140
141
self.assertFalse(s3.delete_bucket.called)
142
143
args, kwargs = s3.create_bucket.call_args
144
self.assertNotIn('create_bucket_configuration', kwargs)
145
146
def test_s3_uses_regionalized_policy(self):
147
s3 = self.subscribe.s3
148
s3.head_bucket.side_effect = ClientError(
149
{'Error': {'Code': '404', 'Message': ''}}, 'HeadBucket')
150
151
self.subscribe.setup_new_bucket('test', 'logs')
152
153
s3.get_object.assert_called_with(
154
Bucket='awscloudtrail-policy-us-east-1', Key=mock.ANY)
155
156
def test_s3_create_non_us_east_1(self):
157
# Because this is outside of us-east-1, it should create
158
# a bucket configuration with a location constraint.
159
s3 = self.subscribe.s3
160
self.subscribe.region_name = 'us-west-2'
161
s3.head_bucket.side_effect = ClientError(
162
{'Error': {'Code': '404', 'Message': ''}}, 'HeadBucket')
163
164
self.subscribe.setup_new_bucket('test', 'logs')
165
166
args, kwargs = s3.create_bucket.call_args
167
self.assertIn('CreateBucketConfiguration', kwargs)
168
169
bucket_config = kwargs['CreateBucketConfiguration']
170
self.assertEqual(bucket_config['LocationConstraint'],
171
'us-west-2')
172
173
def test_s3_create_already_exists(self):
174
with self.assertRaises(Exception):
175
self.subscribe.setup_new_bucket('test2', 'logs')
176
177
def test_s3_custom_policy(self):
178
s3 = self.subscribe.s3
179
s3.head_bucket.side_effect = ClientError(
180
{'Error': {'Code': '404', 'Message': ''}}, 'HeadBucket')
181
182
self.subscribe.setup_new_bucket('test', 'logs', custom_policy='{}')
183
184
s3.get_object.assert_not_called()
185
s3.put_bucket_policy.assert_called_with(Bucket='test', Policy='{}')
186
187
def test_s3_create_set_policy_fail(self):
188
s3 = self.subscribe.s3
189
orig = s3.put_bucket_policy
190
s3.put_bucket_policy = mock.Mock(side_effect=Exception('Error!'))
191
192
with self.assertRaises(Exception):
193
self.subscribe.setup_new_bucket('test', 'logs')
194
195
def test_s3_get_policy_fail(self):
196
self.subscribe.s3.get_object = mock.Mock(side_effect=Exception('Foo!'))
197
198
with self.assertRaises(CloudTrailError) as cm:
199
self.subscribe.setup_new_bucket('test', 'logs')
200
201
# Exception should contain its custom message, the region
202
# where there is an issue, and the original exception message.
203
self.assertIn('us-east-1', str(cm.exception))
204
self.assertIn('Foo!', str(cm.exception))
205
206
def test_get_policy_read_timeout(self):
207
response = {
208
'Body': mock.Mock()
209
}
210
response['Body'].read.side_effect = Exception('Error!')
211
self.subscribe.s3.get_object.return_value = response
212
213
with self.assertRaises(CloudTrailError):
214
self.subscribe.setup_new_bucket('test', 'logs')
215
216
def test_sns_get_policy_fail(self):
217
self.subscribe.s3.get_object = mock.Mock(side_effect=Exception('Error!'))
218
219
with self.assertRaises(Exception):
220
self.subscribe.setup_new_bucket('test', 'logs')
221
222
def test_sns_create(self):
223
s3 = self.subscribe.s3
224
sns = self.subscribe.sns
225
226
self.subscribe.setup_new_topic('test')
227
228
s3.get_object.assert_called_with(
229
Bucket='awscloudtrail-policy-us-east-1',
230
Key='policy/SNS/AWSCloudTrail-SnsTopicPolicy-2014-12-17.json',
231
)
232
sns.list_topics.assert_called_with()
233
sns.create_topic.assert_called_with(Name='test')
234
sns.set_topic_attributes.assert_called_with(
235
AttributeName='Policy',
236
AttributeValue='{"Statement": []}',
237
TopicArn='foo',
238
)
239
240
self.assertFalse(sns.delete_topic.called)
241
242
def test_sns_uses_regionalized_policy(self):
243
s3 = self.subscribe.s3
244
245
self.subscribe.setup_new_topic('test')
246
247
s3.get_object.assert_called_with(
248
Bucket='awscloudtrail-policy-us-east-1', Key=mock.ANY)
249
250
def test_sns_custom_policy(self):
251
s3 = self.subscribe.s3
252
sns = self.subscribe.sns
253
sns.get_topic_attributes.return_value = {
254
'Attributes': {
255
'Policy': '{"Statement": []}'
256
}
257
}
258
259
policy = '{"Statement": []}'
260
261
self.subscribe.setup_new_topic('test', custom_policy=policy)
262
263
s3.get_object.assert_not_called()
264
sns.set_topic_attributes.assert_called_with(
265
TopicArn=mock.ANY, AttributeName='Policy', AttributeValue=policy)
266
267
def test_sns_create_already_exists(self):
268
with self.assertRaises(Exception):
269
self.subscribe.setup_new_topic('test2')
270
271
def test_cloudtrail_new_call_format(self):
272
self.subscribe.cloudtrail = mock.Mock()
273
self.subscribe.cloudtrail.create_trail = mock.Mock(return_value={})
274
self.subscribe.cloudtrail.describe_trail = mock.Mock(return_value={})
275
276
self.subscribe.upsert_cloudtrail_config('test', 'bucket', 'prefix',
277
'topic', True)
278
279
self.subscribe.cloudtrail.create_trail.assert_called_with(
280
Name='test',
281
S3BucketName='bucket',
282
S3KeyPrefix='prefix',
283
SnsTopicName='topic',
284
IncludeGlobalServiceEvents=True
285
)
286
287
def test_sns_policy_merge(self):
288
left = '''
289
{
290
"Version":"2008-10-17",
291
"Id":"us-east-1/698519295917/test__default_policy_ID",
292
"Statement":[
293
{
294
"Effect":"Allow",
295
"Sid":"us-east-1/698519295917/test__default_statement_ID",
296
"Principal":{
297
"AWS":"*"
298
},
299
"Action":[
300
"SNS:GetTopicAttributes",
301
"SNS:SetTopicAttributes",
302
"SNS:AddPermission",
303
"SNS:RemovePermission",
304
"SNS:DeleteTopic",
305
"SNS:Subscribe",
306
"SNS:ListSubscriptionsByTopic",
307
"SNS:Publish",
308
"SNS:Receive"
309
],
310
"Resource":"arn:aws:sns:us-east-1:698519295917:test",
311
"Condition":{
312
"StringLike":{
313
"AWS:SourceArn":"arn:aws:*:*:698519295917:*"
314
}
315
}
316
}
317
]
318
}'''
319
right = '''
320
{
321
"Version":"2008-10-17",
322
"Id":"us-east-1/698519295917/test_foo",
323
"Statement":[
324
{
325
"Effect":"Allow",
326
"Sid":"us-east-1/698519295917/test_foo_ID",
327
"Principal":{
328
"AWS":"*"
329
},
330
"Action":[
331
"SNS:GetTopicAttributes",
332
"SNS:SetTopicAttributes",
333
"SNS:AddPermission",
334
"SNS:RemovePermission",
335
"SNS:DeleteTopic",
336
"SNS:Subscribe",
337
"SNS:ListSubscriptionsByTopic",
338
"SNS:Publish",
339
"SNS:Receive"
340
],
341
"Resource":"arn:aws:sns:us-east-1:698519295917:test",
342
"Condition":{
343
"StringLike":{
344
"AWS:SourceArn":"arn:aws:*:*:698519295917:*"
345
}
346
}
347
}
348
]
349
}'''
350
expected = '''
351
{
352
"Version":"2008-10-17",
353
"Id":"us-east-1/698519295917/test__default_policy_ID",
354
"Statement":[
355
{
356
"Effect":"Allow",
357
"Sid":"us-east-1/698519295917/test__default_statement_ID",
358
"Principal":{
359
"AWS":"*"
360
},
361
"Action":[
362
"SNS:GetTopicAttributes",
363
"SNS:SetTopicAttributes",
364
"SNS:AddPermission",
365
"SNS:RemovePermission",
366
"SNS:DeleteTopic",
367
"SNS:Subscribe",
368
"SNS:ListSubscriptionsByTopic",
369
"SNS:Publish",
370
"SNS:Receive"
371
],
372
"Resource":"arn:aws:sns:us-east-1:698519295917:test",
373
"Condition":{
374
"StringLike":{
375
"AWS:SourceArn":"arn:aws:*:*:698519295917:*"
376
}
377
}
378
},
379
{
380
"Effect":"Allow",
381
"Sid":"us-east-1/698519295917/test_foo_ID",
382
"Principal":{
383
"AWS":"*"
384
},
385
"Action":[
386
"SNS:GetTopicAttributes",
387
"SNS:SetTopicAttributes",
388
"SNS:AddPermission",
389
"SNS:RemovePermission",
390
"SNS:DeleteTopic",
391
"SNS:Subscribe",
392
"SNS:ListSubscriptionsByTopic",
393
"SNS:Publish",
394
"SNS:Receive"
395
],
396
"Resource":"arn:aws:sns:us-east-1:698519295917:test",
397
"Condition":{
398
"StringLike":{
399
"AWS:SourceArn":"arn:aws:*:*:698519295917:*"
400
}
401
}
402
}
403
]
404
}'''
405
406
merged = self.subscribe.merge_sns_policy(left, right)
407
408
self.assertEqual(json.loads(expected), json.loads(merged))
409
410