Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/test_waiters.py
1567 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
from botocore.waiter import WaiterModel
14
from botocore.exceptions import DataNotFoundError
15
16
from awscli.testutils import (
17
mock, unittest,
18
BaseAWSHelpOutputTest,BaseAWSCommandParamsTest
19
)
20
from awscli.customizations.waiters import add_waiters, WaitCommand, \
21
get_waiter_model_from_service_model, WaiterStateCommand, WaiterCaller, \
22
WaiterStateDocBuilder, WaiterStateCommandBuilder
23
24
25
class TestAddWaiters(unittest.TestCase):
26
def setUp(self):
27
self.service_model = mock.Mock()
28
self.session = mock.Mock()
29
30
self.command_object = mock.Mock()
31
self.command_object.service_model = self.service_model
32
33
# Set up the mock session.
34
self.session.get_waiter_model.return_value = WaiterModel(
35
{
36
'version': 2,
37
'waiters': {
38
'FooExists': {},
39
}
40
}
41
)
42
43
def test_add_waiters(self):
44
command_table = {}
45
add_waiters(command_table, self.session, self.command_object)
46
# Make sure a wait command was added.
47
self.assertIn('wait', command_table)
48
self.assertIsInstance(command_table['wait'], WaitCommand)
49
50
def test_add_waiters_no_waiter_names(self):
51
self.session.get_waiter_model.return_value = WaiterModel(
52
{
53
'version': 2,
54
# No waiters are specified.
55
'waiters': {}
56
}
57
)
58
command_table = {}
59
add_waiters(command_table, self.session, self.command_object)
60
# Make sure that no wait command was added since the service object
61
# has no waiters.
62
self.assertEqual(command_table, {})
63
64
def test_add_waiters_no_service_object(self):
65
command_table = {}
66
self.command_object.service_model = None
67
add_waiters(command_table, self.session, self.command_object)
68
# Make sure that no wait command was added since no service object
69
# was passed in.
70
self.assertEqual(command_table, {})
71
72
def test_add_waiter_no_waiter_config(self):
73
self.session.get_waiter_model.side_effect = DataNotFoundError(
74
data_path='foo')
75
command_table = {}
76
add_waiters(command_table, self.session, self.command_object)
77
self.assertEqual(command_table, {})
78
79
80
class TestServicetoWaiterModel(unittest.TestCase):
81
def test_service_object_to_waiter_model(self):
82
service_model = mock.Mock()
83
session = mock.Mock()
84
service_model.service_name = 'service'
85
service_model.api_version = '2014-01-01'
86
get_waiter_model_from_service_model(session, service_model)
87
session.get_waiter_model.assert_called_with('service', '2014-01-01')
88
89
def test_can_handle_data_errors(self):
90
service_model = mock.Mock()
91
session = mock.Mock()
92
service_model.service_name = 'service'
93
service_model.api_version = '2014-01-01'
94
session.get_waiter_model.side_effect = DataNotFoundError(
95
data_path='foo')
96
self.assertIsNone(
97
get_waiter_model_from_service_model(session, service_model))
98
99
100
class TestWaitCommand(unittest.TestCase):
101
def setUp(self):
102
self.session = mock.Mock()
103
self.model = WaiterModel({
104
'version': 2,
105
'waiters': {
106
'Foo': {
107
'operation': 'foo', 'maxAttempts': 1, 'delay': 1,
108
'acceptors': [],
109
}
110
}
111
})
112
self.service_model = mock.Mock()
113
self.cmd = WaitCommand(self.session, self.model, self.service_model)
114
115
def test_passes_on_lineage(self):
116
child_cmd = self.cmd.subcommand_table['foo']
117
self.assertEqual(len(child_cmd.lineage), 2)
118
self.assertEqual(child_cmd.lineage[0], self.cmd)
119
self.assertIsInstance(child_cmd.lineage[1], WaiterStateCommand)
120
121
def test_run_main_error(self):
122
self.parsed_args = mock.Mock()
123
self.parsed_args.subcommand = None
124
with self.assertRaises(ValueError):
125
self.cmd._run_main(self.parsed_args, None)
126
127
128
class TestWaitHelpOutput(BaseAWSHelpOutputTest):
129
def test_wait_command_is_in_list(self):
130
self.driver.main(['ec2', 'help'])
131
self.assert_contains('* wait')
132
133
def test_wait_help_command(self):
134
self.driver.main(['ec2', 'wait', 'help'])
135
self.assert_contains('.. _cli:aws ec2 wait:')
136
self.assert_contains('Wait until a particular condition is satisfied.')
137
self.assert_contains('* instance-running')
138
self.assert_contains('* vpc-available')
139
140
def test_wait_state_help_command(self):
141
self.driver.main(['ec2', 'wait', 'instance-running', 'help'])
142
self.assert_contains('.. _cli:aws ec2 wait instance-running:')
143
self.assert_contains(
144
'Wait until JMESPath query Reservations[].Instances[].State.Name')
145
self.assert_contains('poll every')
146
self.assert_contains('This will exit with a return code of 255 after')
147
self.assert_contains('``describe-instances``')
148
self.assert_contains('[--filters <value>]')
149
self.assert_contains('``--filters`` (list)')
150
self.assert_contains('======\nOutput\n======\n\nNone')
151
152
153
class TestWait(BaseAWSCommandParamsTest):
154
""" This is merely a smoke test.
155
156
Its purpose is to test that the wait command can be run proberly for
157
various services. It is by no means exhaustive.
158
"""
159
def test_ec2_instance_running(self):
160
cmdline = 'ec2 wait instance-running'
161
cmdline += ' --instance-ids i-12345678 i-87654321'
162
cmdline += """ --filters {"Name":"group-name","Values":["foobar"]}"""
163
result = {'Filters': [{'Name': 'group-name',
164
'Values': ['foobar']}],
165
'InstanceIds': ['i-12345678', 'i-87654321']}
166
self.parsed_response = {
167
'Reservations': [{
168
'Instances': [{
169
'State': {
170
'Name': 'running'
171
}
172
}]
173
}]
174
}
175
self.assert_params_for_cmd(cmdline, result)
176
177
def test_dynamodb_table_exists(self):
178
cmdline = 'dynamodb wait table-exists'
179
cmdline += ' --table-name mytable'
180
result = {"TableName": "mytable"}
181
self.parsed_response = {'Table': {'TableStatus': 'ACTIVE'}}
182
self.assert_params_for_cmd(cmdline, result)
183
184
def test_elastictranscoder_jobs_complete(self):
185
cmdline = 'rds wait db-instance-available'
186
cmdline += ' --db-instance-identifier abc'
187
result = {'DBInstanceIdentifier': 'abc'}
188
self.parsed_response = {
189
'DBInstances': [{
190
'DBInstanceStatus': 'available'
191
}]
192
}
193
self.assert_params_for_cmd(cmdline, result)
194
195
196
class TestWaiterStateCommandBuilder(unittest.TestCase):
197
def setUp(self):
198
self.session = mock.Mock()
199
self.service_model = mock.Mock()
200
201
# Create some waiters.
202
self.model = WaiterModel({
203
'version': 2,
204
'waiters': {
205
'InstanceRunning': {
206
'description': 'My waiter description.',
207
'delay': 1,
208
'maxAttempts': 10,
209
'operation': 'MyOperation',
210
},
211
'BucketExists': {
212
'description': 'My waiter description.',
213
'operation': 'MyOperation',
214
'delay': 1,
215
'maxAttempts': 10,
216
}
217
}
218
})
219
220
self.waiter_builder = WaiterStateCommandBuilder(
221
self.session,
222
self.model,
223
self.service_model
224
)
225
226
def test_build_waiter_state_cmds(self):
227
subcommand_table = {}
228
self.waiter_builder.build_all_waiter_state_cmds(subcommand_table)
229
# Check the commands are in the command table
230
self.assertEqual(len(subcommand_table), 2)
231
self.assertIn('instance-running', subcommand_table)
232
self.assertIn('bucket-exists', subcommand_table)
233
234
# Make sure that the correct operation object was used.
235
self.service_model.operation_model.assert_called_with('MyOperation')
236
237
# Introspect the commands in the command table
238
instance_running_cmd = subcommand_table['instance-running']
239
bucket_exists_cmd = subcommand_table['bucket-exists']
240
241
# Check that the instance type is correct.
242
self.assertIsInstance(instance_running_cmd, WaiterStateCommand)
243
self.assertIsInstance(bucket_exists_cmd, WaiterStateCommand)
244
245
# Check the descriptions are set correctly.
246
self.assertEqual(
247
instance_running_cmd.DESCRIPTION,
248
'My waiter description. It will poll every 1 seconds until '
249
'a successful state has been reached. This will exit with a '
250
'return code of 255 after 10 failed checks.'
251
)
252
self.assertEqual(
253
bucket_exists_cmd.DESCRIPTION,
254
'My waiter description. It will poll every 1 seconds until '
255
'a successful state has been reached. This will exit with a '
256
'return code of 255 after 10 failed checks.'
257
)
258
259
260
class TestWaiterStateDocBuilder(unittest.TestCase):
261
def setUp(self):
262
self.waiter_config = mock.Mock()
263
self.waiter_config.description = ''
264
self.waiter_config.operation = 'MyOperation'
265
self.waiter_config.delay = 5
266
self.waiter_config.max_attempts = 20
267
268
# Set up the acceptors.
269
self.success_acceptor = mock.Mock()
270
self.success_acceptor.state = 'success'
271
self.fail_acceptor = mock.Mock()
272
self.fail_acceptor.state = 'failure'
273
self.error_acceptor = mock.Mock()
274
self.error_acceptor.state = 'error'
275
self.waiter_config.acceptors = [
276
self.fail_acceptor,
277
self.success_acceptor,
278
self.error_acceptor
279
]
280
281
self.doc_builder = WaiterStateDocBuilder(self.waiter_config)
282
283
def test_config_provided_description(self):
284
# Description is provided by the config file
285
self.waiter_config.description = 'My description.'
286
description = self.doc_builder.build_waiter_state_description()
287
self.assertEqual(
288
description,
289
'My description. It will poll every 5 seconds until a '
290
'successful state has been reached. This will exit with a '
291
'return code of 255 after 20 failed checks.')
292
293
def test_error_acceptor(self):
294
self.success_acceptor.matcher = 'error'
295
self.success_acceptor.expected = 'MyException'
296
description = self.doc_builder.build_waiter_state_description()
297
self.assertEqual(
298
description,
299
'Wait until MyException is thrown when polling with '
300
'``my-operation``. It will poll every 5 seconds until a '
301
'successful state has been reached. This will exit with a '
302
'return code of 255 after 20 failed checks.'
303
)
304
305
def test_status_acceptor(self):
306
self.success_acceptor.matcher = 'status'
307
self.success_acceptor.expected = 200
308
description = self.doc_builder.build_waiter_state_description()
309
self.assertEqual(
310
description,
311
'Wait until 200 response is received when polling with '
312
'``my-operation``. It will poll every 5 seconds until a '
313
'successful state has been reached. This will exit with a '
314
'return code of 255 after 20 failed checks.'
315
)
316
317
def test_path_acceptor(self):
318
self.success_acceptor.matcher = 'path'
319
self.success_acceptor.argument = 'MyResource.name'
320
self.success_acceptor.expected = 'running'
321
description = self.doc_builder.build_waiter_state_description()
322
self.assertEqual(
323
description,
324
'Wait until JMESPath query MyResource.name returns running when '
325
'polling with ``my-operation``. It will poll every 5 seconds '
326
'until a successful state has been reached. This will exit with '
327
'a return code of 255 after 20 failed checks.'
328
)
329
330
def test_path_all_acceptor(self):
331
self.success_acceptor.matcher = 'pathAll'
332
self.success_acceptor.argument = 'MyResource[].name'
333
self.success_acceptor.expected = 'running'
334
description = self.doc_builder.build_waiter_state_description()
335
self.assertEqual(
336
description,
337
'Wait until JMESPath query MyResource[].name returns running for '
338
'all elements when polling with ``my-operation``. It will poll '
339
'every 5 seconds until a successful state has been reached. '
340
'This will exit with a return code of 255 after 20 failed checks.'
341
)
342
343
def test_path_any_acceptor(self):
344
self.success_acceptor.matcher = 'pathAny'
345
self.success_acceptor.argument = 'MyResource[].name'
346
self.success_acceptor.expected = 'running'
347
description = self.doc_builder.build_waiter_state_description()
348
self.assertEqual(
349
description,
350
'Wait until JMESPath query MyResource[].name returns running for '
351
'any element when polling with ``my-operation``. It will poll '
352
'every 5 seconds until a successful state has been reached. '
353
'This will exit with a return code of 255 after 20 failed checks.'
354
)
355
356
357
class TestWaiterCaller(unittest.TestCase):
358
def test_invoke(self):
359
waiter = mock.Mock()
360
waiter_name = 'my_waiter'
361
session = mock.Mock()
362
session.create_client.return_value.get_waiter.return_value = waiter
363
364
parameters = {'Foo': 'bar', 'Baz': 'biz'}
365
parsed_globals = mock.Mock()
366
parsed_globals.region = 'us-east-1'
367
parsed_globals.endpoint_url = 'myurl'
368
parsed_globals.verify_ssl = True
369
370
waiter_caller = WaiterCaller(session, waiter_name)
371
waiter_caller.invoke('myservice', 'MyWaiter', parameters,
372
parsed_globals)
373
374
# Make sure the client was created properly.
375
session.create_client.assert_called_with(
376
'myservice',
377
region_name=parsed_globals.region,
378
endpoint_url=parsed_globals.endpoint_url,
379
verify=parsed_globals.verify_ssl
380
)
381
382
# Make sure we got the correct waiter.
383
session.create_client.return_value.get_waiter.assert_called_with(
384
waiter_name)
385
386
# Ensure the wait command was called properly.
387
waiter.wait.assert_called_with(
388
Foo='bar', Baz='biz')
389
390