Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/datapipeline/__init__.py
1567 views
1
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import json
15
from datetime import timedelta
16
17
from awscli.formatter import get_formatter
18
from awscli.arguments import CustomArgument
19
from awscli.compat import get_current_datetime
20
from awscli.customizations.commands import BasicCommand
21
from awscli.customizations.datapipeline import translator
22
from awscli.customizations.datapipeline.createdefaultroles \
23
import CreateDefaultRoles
24
from awscli.customizations.datapipeline.listrunsformatter \
25
import ListRunsFormatter
26
from awscli.utils import create_nested_client
27
28
DEFINITION_HELP_TEXT = """\
29
The JSON pipeline definition. If the pipeline definition
30
is in a file you can use the file://<filename> syntax to
31
specify a filename.
32
"""
33
PARAMETER_OBJECTS_HELP_TEXT = """\
34
The JSON parameter objects. If the parameter objects are
35
in a file you can use the file://<filename> syntax to
36
specify a filename. You can optionally provide these in
37
pipeline definition as well. Parameter objects provided
38
on command line would replace the one in definition.
39
"""
40
PARAMETER_VALUES_HELP_TEXT = """\
41
The JSON parameter values. If the parameter values are
42
in a file you can use the file://<filename> syntax to
43
specify a filename. You can optionally provide these in
44
pipeline definition as well. Parameter values provided
45
on command line would replace the one in definition.
46
"""
47
INLINE_PARAMETER_VALUES_HELP_TEXT = """\
48
The JSON parameter values. You can specify these as
49
key-value pairs in the key=value format. Multiple parameters
50
are separated by a space. For list type parameter values
51
you can use the same key name and specify each value as
52
a key value pair. e.g. arrayValue=value1 arrayValue=value2
53
"""
54
MAX_ITEMS_PER_DESCRIBE = 100
55
56
57
class DocSectionNotFoundError(Exception):
    """Raised when the "Output" doc section cannot be located while
    rewriting the get-pipeline-definition help output."""
    pass
59
60
61
class ParameterDefinitionError(Exception):
    """Signals a malformed inline ``key=value`` parameter argument.

    The original message is kept on ``self.msg`` so callers can
    inspect it without parsing the formatted exception text.
    """

    def __init__(self, msg):
        # Prefix the raw message so CLI users see which layer failed.
        super(ParameterDefinitionError, self).__init__(
            "Error in parameter: %s\n" % msg)
        self.msg = msg
66
67
68
def register_customizations(cli):
    """Wire the datapipeline customizations into the CLI event system.

    Registers argument-table rewrites, the after-call response
    translator, the custom subcommands, and (last) the doc rewriter
    for get-pipeline-definition.
    """
    event_handlers = (
        ('building-argument-table.datapipeline.put-pipeline-definition',
         add_pipeline_definition),
        ('building-argument-table.datapipeline.activate-pipeline',
         activate_pipeline_definition),
        ('after-call.datapipeline.GetPipelineDefinition',
         translate_definition),
        ('building-command-table.datapipeline',
         register_commands),
    )
    for event, handler in event_handlers:
        cli.register(event, handler)
    # Must run after every other doc handler so the generated
    # "Output" section is present to be replaced.
    cli.register_last(
        'doc-output.datapipeline.get-pipeline-definition',
        document_translation)
84
85
86
def register_commands(command_table, session, **kwargs):
    """Add the custom datapipeline subcommands to the command table."""
    command_table['create-default-roles'] = CreateDefaultRoles(session)
    command_table['list-runs'] = ListRunsCommand(session)
89
90
91
def document_translation(help_command, **kwargs):
    """Rewrite the generated "Output" doc section for
    get-pipeline-definition to point at the pipeline definition
    file syntax guide.

    Pops writes off the doc until the Output header is found, then
    re-emits the header followed by the replacement text.

    :raises DocSectionNotFoundError: if the Output header was never
        written (should not happen in practice).
    """
    # Remove all the writes until we get to the output.
    # I don't think this is the ideal way to do this, we should
    # improve our plugin/doc system to make this easier.
    doc = help_command.doc
    marker = '======\nOutput\n======'
    popped = None
    while popped != marker:
        try:
            popped = doc.pop_write()
        except IndexError:
            # This should never happen, but in the rare case that it does
            # we should be raising something with a helpful error message.
            raise DocSectionNotFoundError(
                'Could not find the "output" section for the command: %s'
                % help_command)
    doc.write(marker)
    doc.write(
        '\nThe output of this command is the pipeline definition, which'
        ' is documented in the '
        '`Pipeline Definition File Syntax '
        '<http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/'
        'dp-writing-pipeline-definition.html>`__')
113
114
115
def add_pipeline_definition(argument_table, **kwargs):
    """Rework put-pipeline-definition's arguments around a JSON
    pipeline definition instead of raw pipeline objects."""
    json_arguments = (
        ('pipeline-definition', PipelineDefinitionArgument(
            'pipeline-definition', required=True,
            help_text=DEFINITION_HELP_TEXT)),
        ('parameter-objects', ParameterObjectsArgument(
            'parameter-objects', required=False,
            help_text=PARAMETER_OBJECTS_HELP_TEXT)),
        ('parameter-values-uri', ParameterValuesArgument(
            'parameter-values-uri', required=False,
            help_text=PARAMETER_VALUES_HELP_TEXT)),
        # Inline parameters accept a list of key=value pairs,
        # hence nargs='+'.
        ('parameter-values', ParameterValuesInlineArgument(
            'parameter-values', required=False, nargs='+',
            help_text=INLINE_PARAMETER_VALUES_HELP_TEXT)),
    )
    for arg_name, argument in json_arguments:
        argument_table[arg_name] = argument
    # pipeline-objects is no longer needed because a user can provide
    # a pipeline-definition instead; get-pipeline-definition also
    # displays the output in the translated format.
    del argument_table['pipeline-objects']
142
143
144
def activate_pipeline_definition(argument_table, **kwargs):
    """Add the parameter-value arguments to activate-pipeline."""
    argument_table['parameter-values-uri'] = ParameterValuesArgument(
        'parameter-values-uri',
        required=False,
        help_text=PARAMETER_VALUES_HELP_TEXT)
    # Inline parameters accept a list of key=value pairs, hence nargs='+'.
    argument_table['parameter-values'] = ParameterValuesInlineArgument(
        'parameter-values',
        nargs='+',
        required=False,
        help_text=INLINE_PARAMETER_VALUES_HELP_TEXT)
156
157
158
def translate_definition(parsed, **kwargs):
    # after-call hook for GetPipelineDefinition: rewrites the parsed
    # API response in place into the pipeline definition document form.
    translator.api_to_definition(parsed)
160
161
162
def convert_described_objects(api_describe_objects, sort_key_func=None):
    """Flatten DescribeObjects results into plain field dicts.

    Each API object carries a ``fields`` list of
    ``{'key': ..., 'stringValue'/'refValue': ...}`` entries; this
    collapses them into a single dict keyed by field name, with
    ``@id`` and ``name`` lifted from the object itself.

    :param api_describe_objects: objects from a DescribeObjects call.
    :param sort_key_func: optional key function used to sort the
        flattened results in place.
    :return: list of flattened dicts.
    """
    def _flatten(obj):
        # stringValue wins; refValue is the fallback (may be None).
        fields = {'@id': obj['id'], 'name': obj['name']}
        for entry in obj['fields']:
            fields[entry['key']] = entry.get('stringValue',
                                            entry.get('refValue'))
        return fields

    results = [_flatten(obj) for obj in api_describe_objects]
    if sort_key_func is not None:
        results.sort(key=sort_key_func)
    return results
180
181
182
class QueryArgBuilder(object):
    """Convert parsed list-runs CLI arguments into the query dict
    expected by the QueryObjects API."""

    def __init__(self, current_time=None):
        # "Now" anchors the default 4-day lookback window when the
        # caller supplies no intervals.
        self.current_time = (
            get_current_datetime() if current_time is None
            else current_time)

    def build_query(self, parsed_args):
        """Build the ``{'selectors': [...]}`` query for QueryObjects.

        With no start/schedule interval, defaults to actual start
        times between four days ago and now; otherwise uses the
        provided interval(s). A status filter is appended if present.
        """
        selectors = []
        if (parsed_args.start_interval is None and
                parsed_args.schedule_interval is None):
            end_dt = self.current_time
            start_dt = end_dt - timedelta(days=4)
            time_fmt = '%Y-%m-%dT%H:%M:%S'
            selectors.append(self._between_selector(
                '@actualStartTime',
                start_dt.strftime(time_fmt),
                end_dt.strftime(time_fmt)))
        else:
            self._build_schedule_times(selectors, parsed_args)
        if parsed_args.status is not None:
            self._build_status(selectors, parsed_args)
        return {'selectors': selectors}

    def _build_schedule_times(self, selectors, parsed_args):
        # Append a BETWEEN selector for each interval that was given;
        # intervals are [start, end] string pairs.
        for attr_name, field_name in (
                ('start_interval', '@actualStartTime'),
                ('schedule_interval', '@scheduledStartTime')):
            interval = getattr(parsed_args, attr_name)
            if interval is not None:
                selectors.append(self._between_selector(
                    field_name, interval[0], interval[1]))

    def _build_status(self, selectors, parsed_args):
        # The API expects uppercase status values.
        selectors.append({
            'fieldName': '@status',
            'operator': {
                'type': 'EQ',
                'values': [status.upper()
                           for status in parsed_args.status]
            }
        })

    @staticmethod
    def _between_selector(field_name, start, end):
        # Shared shape for BETWEEN range selectors.
        return {
            'fieldName': field_name,
            'operator': {
                'type': 'BETWEEN',
                'values': [start, end]
            }
        }
246
247
248
class PipelineDefinitionArgument(CustomArgument):
    """--pipeline-definition: expand a JSON pipeline definition into
    the API's pipelineObjects (and, when not given explicitly,
    parameterObjects/parameterValues)."""

    def add_to_params(self, parameters, value):
        if value is None:
            return
        definition = json.loads(value)
        api_objects = translator.definition_to_api_objects(definition)
        def_param_objects = \
            translator.definition_to_api_parameters(definition)
        def_param_values = \
            translator.definition_to_parameter_values(definition)
        parameters['pipelineObjects'] = api_objects
        # Parameter objects/values from the definition only fill
        # gaps; explicit CLI arguments take precedence.
        if ('parameterObjects' not in parameters and
                def_param_objects is not None):
            parameters['parameterObjects'] = def_param_objects
        if ('parameterValues' not in parameters and
                def_param_values is not None):
            parameters['parameterValues'] = def_param_values
264
265
266
class ParameterObjectsArgument(CustomArgument):
    """--parameter-objects: parse JSON parameter objects into the
    API's parameterObjects form."""

    def add_to_params(self, parameters, value):
        if value is None:
            return
        parsed_objects = json.loads(value)
        parameters['parameterObjects'] = (
            translator.definition_to_api_parameters(parsed_objects))
273
274
275
class ParameterValuesArgument(CustomArgument):
    """--parameter-values-uri: parse JSON parameter values into the
    API's parameterValues form.

    Mutually exclusive with the inline --parameter-values argument.
    """

    def add_to_params(self, parameters, value):
        if value is None:
            return
        # Reject the URI form when inline values were already set.
        if parameters.get('parameterValues', None) is not None:
            raise Exception(
                "Only parameter-values or parameter-values-uri is allowed"
            )
        parsed_values = json.loads(value)
        parameters['parameterValues'] = (
            translator.definition_to_parameter_values(parsed_values))
289
290
291
class ParameterValuesInlineArgument(CustomArgument):
    """--parameter-values: parse inline ``key=value`` pairs into the
    API's parameterValues form.

    Repeated keys accumulate into a list value. Mutually exclusive
    with --parameter-values-uri.
    """

    def add_to_params(self, parameters, value):
        if value is None:
            return
        # Reject inline values when the URI form was already set.
        if parameters.get('parameterValues', None) is not None:
            raise Exception(
                "Only parameter-values or parameter-values-uri is allowed"
            )
        collected = {}
        for argument in value:
            # Split on the first '=' only; values may contain '='.
            key, sep, raw_value = argument.partition('=')
            if not sep:
                raise ParameterDefinitionError(
                    "Invalid inline parameter format: %s" % argument
                )
            if key not in collected:
                collected[key] = raw_value
            elif isinstance(collected[key], list):
                collected[key].append(raw_value)
            else:
                # Second occurrence: promote scalar to a list.
                collected[key] = [collected[key], raw_value]
        parameters['parameterValues'] = (
            translator.definition_to_parameter_values(
                {'values': collected}))
323
324
325
class ListRunsCommand(BasicCommand):
    """Custom ``datapipeline list-runs`` command.

    Queries a pipeline for its run instances, describes them in
    batches of MAX_ITEMS_PER_DESCRIBE, flattens the results, and
    prints them with either the default table formatter or the
    formatter selected by ``--output``.
    """

    NAME = 'list-runs'
    DESCRIPTION = (
        'Lists the times the specified pipeline has run. '
        'You can optionally filter the complete list of '
        'results to include only the runs you are interested in.')
    ARG_TABLE = [
        {'name': 'pipeline-id', 'help_text': 'The identifier of the pipeline.',
         'action': 'store', 'required': True, 'cli_type_name': 'string', },
        {'name': 'status',
         'help_text': (
             'Filters the list to include only runs in the '
             'specified statuses. '
             'The valid statuses are as follows: waiting, pending, cancelled, '
             'running, finished, failed, waiting_for_runner, '
             'and waiting_on_dependencies.'),
         'action': 'store'},
        {'name': 'start-interval',
         'help_text': (
             'Filters the list to include only runs that started '
             'within the specified interval.'),
         'action': 'store', 'required': False, 'cli_type_name': 'string', },
        {'name': 'schedule-interval',
         'help_text': (
             'Filters the list to include only runs that are scheduled to '
             'start within the specified interval.'),
         'action': 'store', 'required': False, 'cli_type_name': 'string', },
    ]
    # Accepted values for --status.
    # NOTE(review): 'shutting_down' is accepted here but not mentioned
    # in the --status help text above — confirm whether the help text
    # should list it.
    VALID_STATUS = ['waiting', 'pending', 'cancelled', 'running',
                    'finished', 'failed', 'waiting_for_runner',
                    'waiting_on_dependencies', 'shutting_down']

    def _run_main(self, parsed_args, parsed_globals, **kwargs):
        # Entry point: build the client, normalize the CSV-style
        # arguments, then query and print the runs.
        self._set_client(parsed_globals)
        self._parse_type_args(parsed_args)
        self._list_runs(parsed_args, parsed_globals)

    def _set_client(self, parsed_globals):
        # This is called from _run_main and is used to ensure that we have
        # a service/endpoint object to work with.
        # create_nested_client is imported at module level; the
        # redundant function-level re-import was removed.
        self.client = create_nested_client(
            self._session,
            'datapipeline',
            region_name=parsed_globals.region,
            endpoint_url=parsed_globals.endpoint_url,
            verify=parsed_globals.verify_ssl)

    def _parse_type_args(self, parsed_args):
        # TODO: give good error messages!
        # Split the comma-separated start/schedule intervals and the
        # status list into stripped string lists, in place.
        if parsed_args.start_interval is not None:
            parsed_args.start_interval = [
                arg.strip() for arg in
                parsed_args.start_interval.split(',')]
        if parsed_args.schedule_interval is not None:
            parsed_args.schedule_interval = [
                arg.strip() for arg in
                parsed_args.schedule_interval.split(',')]
        if parsed_args.status is not None:
            parsed_args.status = [
                arg.strip() for arg in
                parsed_args.status.split(',')]
            self._validate_status_choices(parsed_args.status)

    def _validate_status_choices(self, statuses):
        # Fail fast with the full list of valid choices in the message.
        for status in statuses:
            if status not in self.VALID_STATUS:
                raise ValueError("Invalid status: %s, must be one of: %s" %
                                 (status, ', '.join(self.VALID_STATUS)))

    def _list_runs(self, parsed_args, parsed_globals):
        # Query ids, describe them, flatten, sort by scheduled start
        # time (then name), and hand off to the chosen formatter.
        query = QueryArgBuilder().build_query(parsed_args)
        object_ids = self._query_objects(parsed_args.pipeline_id, query)
        objects = self._describe_objects(parsed_args.pipeline_id, object_ids)
        converted = convert_described_objects(
            objects,
            sort_key_func=lambda x: (x.get('@scheduledStartTime'),
                                     x.get('name')))
        formatter = self._get_formatter(parsed_globals)
        formatter(self.NAME, converted)

    def _describe_objects(self, pipeline_id, object_ids):
        # DescribeObjects will only accept 100 objectIds at a time,
        # so we need to break up the list passed in into chunks that are at
        # most that size. We then aggregate the results to return.
        objects = []
        for i in range(0, len(object_ids), MAX_ITEMS_PER_DESCRIBE):
            current_object_ids = object_ids[i:i + MAX_ITEMS_PER_DESCRIBE]
            result = self.client.describe_objects(
                pipelineId=pipeline_id, objectIds=current_object_ids)
            objects.extend(result['pipelineObjects'])

        return objects

    def _query_objects(self, pipeline_id, query):
        # Paginate QueryObjects fully and return just the object ids.
        paginator = self.client.get_paginator('query_objects').paginate(
            pipelineId=pipeline_id,
            sphere='INSTANCE', query=query)
        parsed = paginator.build_full_result()
        return parsed['ids']

    def _get_formatter(self, parsed_globals):
        # Default (no --output) uses the custom tabular runs formatter.
        output = parsed_globals.output
        if output is None:
            return ListRunsFormatter(parsed_globals)
        else:
            return get_formatter(output, parsed_globals)
434
435