Path: blob/develop/awscli/customizations/datapipeline/__init__.py
1567 views
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import json
from datetime import timedelta

from awscli.formatter import get_formatter
from awscli.arguments import CustomArgument
from awscli.compat import get_current_datetime
from awscli.customizations.commands import BasicCommand
from awscli.customizations.datapipeline import translator
from awscli.customizations.datapipeline.createdefaultroles \
    import CreateDefaultRoles
from awscli.customizations.datapipeline.listrunsformatter \
    import ListRunsFormatter
from awscli.utils import create_nested_client

DEFINITION_HELP_TEXT = """\
The JSON pipeline definition. If the pipeline definition
is in a file you can use the file://<filename> syntax to
specify a filename.
"""
PARAMETER_OBJECTS_HELP_TEXT = """\
The JSON parameter objects. If the parameter objects are
in a file you can use the file://<filename> syntax to
specify a filename. You can optionally provide these in
pipeline definition as well. Parameter objects provided
on command line would replace the one in definition.
"""
PARAMETER_VALUES_HELP_TEXT = """\
The JSON parameter values. If the parameter values are
in a file you can use the file://<filename> syntax to
specify a filename. You can optionally provide these in
pipeline definition as well. Parameter values provided
on command line would replace the one in definition.
"""
INLINE_PARAMETER_VALUES_HELP_TEXT = """\
The JSON parameter values. You can specify these as
key-value pairs in the key=value format. Multiple parameters
are separated by a space. For list type parameter values
you can use the same key name and specify each value as
a key value pair. e.g. arrayValue=value1 arrayValue=value2
"""
# Upper bound on objectIds per DescribeObjects call; _describe_objects
# chunks its requests to this size.
MAX_ITEMS_PER_DESCRIBE = 100
# strftime format used for the default @actualStartTime query window.
_QUERY_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


class DocSectionNotFoundError(Exception):
    """Raised when the help document has no "Output" section to replace."""
    pass


class ParameterDefinitionError(Exception):
    """Raised for a malformed pipeline parameter supplied on the CLI.

    The exception text is prefixed with "Error in parameter: "; the raw
    message is kept on ``self.msg``.
    """
    def __init__(self, msg):
        full_msg = ("Error in parameter: %s\n" % msg)
        super(ParameterDefinitionError, self).__init__(full_msg)
        self.msg = msg


def register_customizations(cli):
    """Hook all datapipeline customizations into the CLI event system."""
    cli.register(
        'building-argument-table.datapipeline.put-pipeline-definition',
        add_pipeline_definition)
    cli.register(
        'building-argument-table.datapipeline.activate-pipeline',
        activate_pipeline_definition)
    cli.register(
        'after-call.datapipeline.GetPipelineDefinition',
        translate_definition)
    cli.register(
        'building-command-table.datapipeline',
        register_commands)
    cli.register_last(
        'doc-output.datapipeline.get-pipeline-definition',
        document_translation)


def register_commands(command_table, session, **kwargs):
    """Add the custom ``list-runs`` and ``create-default-roles`` commands."""
    command_table['list-runs'] = ListRunsCommand(session)
    command_table['create-default-roles'] = CreateDefaultRoles(session)


def document_translation(help_command, **kwargs):
    """Replace the generated "Output" help section of get-pipeline-definition.

    Writes are popped off the help document until the "Output" heading
    is reached; the heading is then re-written followed by a pointer to
    the pipeline definition file syntax documentation.

    :raises DocSectionNotFoundError: if the document runs out of writes
        before the "Output" heading is found.
    """
    # Remove all the writes until we get to the output.
    # I don't think this is the ideal way to do this, we should
    # improve our plugin/doc system to make this easier.
    doc = help_command.doc
    current = ''
    while current != '======\nOutput\n======':
        try:
            current = doc.pop_write()
        except IndexError:
            # This should never happen, but in the rare case that it does
            # we should be raising something with a helpful error message.
            raise DocSectionNotFoundError(
                'Could not find the "output" section for the command: %s'
                % help_command)
    doc.write('======\nOutput\n======')
    doc.write(
        '\nThe output of this command is the pipeline definition, which'
        ' is documented in the '
        '`Pipeline Definition File Syntax '
        '<http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/'
        'dp-writing-pipeline-definition.html>`__')


def add_pipeline_definition(argument_table, **kwargs):
    """Customize the put-pipeline-definition argument table.

    Adds --pipeline-definition, --parameter-objects, --parameter-values-uri
    and inline --parameter-values, and removes --pipeline-objects.
    """
    argument_table['pipeline-definition'] = PipelineDefinitionArgument(
        'pipeline-definition', required=True,
        help_text=DEFINITION_HELP_TEXT)

    argument_table['parameter-objects'] = ParameterObjectsArgument(
        'parameter-objects', required=False,
        help_text=PARAMETER_OBJECTS_HELP_TEXT)

    argument_table['parameter-values-uri'] = ParameterValuesArgument(
        'parameter-values-uri',
        required=False,
        help_text=PARAMETER_VALUES_HELP_TEXT)

    # Need to use an argument model for inline parameters to accept a list
    argument_table['parameter-values'] = ParameterValuesInlineArgument(
        'parameter-values',
        required=False,
        nargs='+',
        help_text=INLINE_PARAMETER_VALUES_HELP_TEXT)

    # The pipeline-objects is no longer needed required because
    # a user can provide a pipeline-definition instead.
    # get-pipeline-definition also displays the output in the
    # translated format.

    del argument_table['pipeline-objects']


def activate_pipeline_definition(argument_table, **kwargs):
    """Add --parameter-values-uri and inline --parameter-values to
    activate-pipeline."""
    argument_table['parameter-values-uri'] = ParameterValuesArgument(
        'parameter-values-uri', required=False,
        help_text=PARAMETER_VALUES_HELP_TEXT)

    # Need to use an argument model for inline parameters to accept a list
    argument_table['parameter-values'] = ParameterValuesInlineArgument(
        'parameter-values',
        required=False,
        nargs='+',
        help_text=INLINE_PARAMETER_VALUES_HELP_TEXT,
    )


def translate_definition(parsed, **kwargs):
    """Convert a GetPipelineDefinition response into definition format."""
    translator.api_to_definition(parsed)


def convert_described_objects(api_describe_objects, sort_key_func=None):
    """Flatten DescribeObjects results into one dict per object.

    Each result dict holds '@id' and 'name' plus one entry per field,
    whose value is the field's 'stringValue', or its 'refValue' when no
    'stringValue' is present (None if neither is present).

    :param api_describe_objects: iterable of pipeline objects as returned
        in DescribeObjects' 'pipelineObjects'.
    :param sort_key_func: optional key function used to sort the result.
    :returns: list of flattened dicts.
    """
    # We need to take a field list that looks like this:
    # {u'key': u'@sphere', u'stringValue': u'INSTANCE'},
    # into {"@sphere": "INSTANCE}.
    # We convert the fields list into a field dict.
    converted = []
    for obj in api_describe_objects:
        new_fields = {
            '@id': obj['id'],
            'name': obj['name'],
        }
        for field in obj['fields']:
            new_fields[field['key']] = field.get('stringValue',
                                                 field.get('refValue'))
        converted.append(new_fields)
    if sort_key_func is not None:
        converted.sort(key=sort_key_func)
    return converted


class QueryArgBuilder(object):
    """
    Convert CLI arguments to Query arguments used by QueryObject.
    """
    def __init__(self, current_time=None):
        if current_time is None:
            current_time = get_current_datetime()
        self.current_time = current_time

    def build_query(self, parsed_args):
        """Build the QueryObjects ``query`` dict from parsed list-runs args.

        When neither interval is given, the query defaults to runs whose
        @actualStartTime falls within the last 4 days.
        """
        selectors = []
        if parsed_args.start_interval is None and \
                parsed_args.schedule_interval is None:
            # If no intervals are specified, default
            # to a start time of 4 days ago and an end time
            # of right now.
            end_datetime = self.current_time
            start_datetime = end_datetime - timedelta(days=4)
            selectors.append(self._between_selector(
                '@actualStartTime',
                start_datetime.strftime(_QUERY_TIME_FORMAT),
                end_datetime.strftime(_QUERY_TIME_FORMAT)))
        else:
            self._build_schedule_times(selectors, parsed_args)
        if parsed_args.status is not None:
            self._build_status(selectors, parsed_args)
        query = {'selectors': selectors}
        return query

    @staticmethod
    def _between_selector(field_name, start_time_str, end_time_str):
        # One BETWEEN selector on field_name for [start, end].
        return {
            'fieldName': field_name,
            'operator': {
                'type': 'BETWEEN',
                'values': [start_time_str, end_time_str]
            }
        }

    def _build_schedule_times(self, selectors, parsed_args):
        # Intervals are [start, end] lists produced by
        # ListRunsCommand._parse_type_args.
        if parsed_args.start_interval is not None:
            selectors.append(self._between_selector(
                '@actualStartTime',
                parsed_args.start_interval[0],
                parsed_args.start_interval[1]))
        if parsed_args.schedule_interval is not None:
            selectors.append(self._between_selector(
                '@scheduledStartTime',
                parsed_args.schedule_interval[0],
                parsed_args.schedule_interval[1]))

    def _build_status(self, selectors, parsed_args):
        # Statuses are matched upper-cased.
        selectors.append({
            'fieldName': '@status',
            'operator': {
                'type': 'EQ',
                'values': [status.upper() for status in parsed_args.status]
            }
        })


class PipelineDefinitionArgument(CustomArgument):
    """--pipeline-definition: a JSON pipeline definition document.

    Sets 'pipelineObjects', and sets 'parameterObjects' / 'parameterValues'
    from the definition only when those keys are not already present.
    """
    def add_to_params(self, parameters, value):
        if value is None:
            return
        parsed = json.loads(value)
        api_objects = translator.definition_to_api_objects(parsed)
        parameter_objects = translator.definition_to_api_parameters(parsed)
        parameter_values = translator.definition_to_parameter_values(parsed)
        parameters['pipelineObjects'] = api_objects
        # Use Parameter objects and values from def if not already provided
        if 'parameterObjects' not in parameters \
                and parameter_objects is not None:
            parameters['parameterObjects'] = parameter_objects
        if 'parameterValues' not in parameters \
                and parameter_values is not None:
            parameters['parameterValues'] = parameter_values


class ParameterObjectsArgument(CustomArgument):
    """--parameter-objects: JSON parameter objects; sets 'parameterObjects'."""
    def add_to_params(self, parameters, value):
        if value is None:
            return
        parsed = json.loads(value)
        parameter_objects = translator.definition_to_api_parameters(parsed)
        parameters['parameterObjects'] = parameter_objects


class ParameterValuesArgument(CustomArgument):
    """--parameter-values-uri: JSON parameter values; sets 'parameterValues'.

    :raises ValueError: if 'parameterValues' has already been set to a
        non-None value.
    """
    def add_to_params(self, parameters, value):

        if value is None:
            return

        if parameters.get('parameterValues', None) is not None:
            raise ValueError(
                "Only parameter-values or parameter-values-uri is allowed"
            )

        parsed = json.loads(value)
        parameter_values = translator.definition_to_parameter_values(parsed)
        parameters['parameterValues'] = parameter_values


class ParameterValuesInlineArgument(CustomArgument):
    """--parameter-values: inline ``key=value`` pairs; sets 'parameterValues'.

    A key given more than once collects its values into a list.

    :raises ValueError: if 'parameterValues' has already been set to a
        non-None value.
    :raises ParameterDefinitionError: if an argument contains no '='.
    """
    def add_to_params(self, parameters, value):

        if value is None:
            return

        if parameters.get('parameterValues', None) is not None:
            raise ValueError(
                "Only parameter-values or parameter-values-uri is allowed"
            )

        parameter_object = {}
        # Split each argument on its first '='; everything after it
        # (including further '=' characters) is the value.
        for argument in value:
            key, separator, param_value = argument.partition('=')
            if not separator:
                raise ParameterDefinitionError(
                    "Invalid inline parameter format: %s" % argument
                )
            if key in parameter_object:
                existing = parameter_object[key]
                if isinstance(existing, list):
                    existing.append(param_value)
                else:
                    parameter_object[key] = [existing, param_value]
            else:
                parameter_object[key] = param_value
        parsed = {'values': parameter_object}
        parameter_values = translator.definition_to_parameter_values(parsed)
        parameters['parameterValues'] = parameter_values


class ListRunsCommand(BasicCommand):
    """Custom ``list-runs`` command: query and describe pipeline instances."""
    NAME = 'list-runs'
    DESCRIPTION = (
        'Lists the times the specified pipeline has run. '
        'You can optionally filter the complete list of '
        'results to include only the runs you are interested in.')
    ARG_TABLE = [
        {'name': 'pipeline-id', 'help_text': 'The identifier of the pipeline.',
         'action': 'store', 'required': True, 'cli_type_name': 'string', },
        {'name': 'status',
         'help_text': (
             'Filters the list to include only runs in the '
             'specified statuses. '
             'The valid statuses are as follows: waiting, pending, cancelled, '
             'running, finished, failed, waiting_for_runner, '
             'waiting_on_dependencies, and shutting_down.'),
         'action': 'store'},
        {'name': 'start-interval',
         'help_text': (
             'Filters the list to include only runs that started '
             'within the specified interval.'),
         'action': 'store', 'required': False, 'cli_type_name': 'string', },
        {'name': 'schedule-interval',
         'help_text': (
             'Filters the list to include only runs that are scheduled to '
             'start within the specified interval.'),
         'action': 'store', 'required': False, 'cli_type_name': 'string', },
    ]
    VALID_STATUS = ['waiting', 'pending', 'cancelled', 'running',
                    'finished', 'failed', 'waiting_for_runner',
                    'waiting_on_dependencies', 'shutting_down']

    def _run_main(self, parsed_args, parsed_globals, **kwargs):
        self._set_client(parsed_globals)
        self._parse_type_args(parsed_args)
        self._list_runs(parsed_args, parsed_globals)

    def _set_client(self, parsed_globals):
        # This is called from _run_main and is used to ensure that we have
        # a service/endpoint object to work with.
        self.client = create_nested_client(
            self._session,
            'datapipeline',
            region_name=parsed_globals.region,
            endpoint_url=parsed_globals.endpoint_url,
            verify=parsed_globals.verify_ssl)

    def _parse_type_args(self, parsed_args):
        """Parse the comma-separated interval and status args in place.

        Intervals become [start, end] lists; status becomes a list of
        validated status names.

        :raises ValueError: on a malformed interval or unknown status.
        """
        if parsed_args.start_interval is not None:
            parsed_args.start_interval = self._parse_interval(
                'start-interval', parsed_args.start_interval)
        if parsed_args.schedule_interval is not None:
            parsed_args.schedule_interval = self._parse_interval(
                'schedule-interval', parsed_args.schedule_interval)
        if parsed_args.status is not None:
            parsed_args.status = [
                arg.strip() for arg in
                parsed_args.status.split(',')]
            self._validate_status_choices(parsed_args.status)

    @staticmethod
    def _parse_interval(arg_name, raw_value):
        # Split "start,end" into exactly two stripped values; anything else
        # would previously crash with IndexError or drop values silently.
        interval = [arg.strip() for arg in raw_value.split(',')]
        if len(interval) != 2:
            raise ValueError(
                "Invalid %s: %s, must be two comma-separated times "
                "in the form start,end" % (arg_name, raw_value))
        return interval

    def _validate_status_choices(self, statuses):
        for status in statuses:
            if status not in self.VALID_STATUS:
                raise ValueError("Invalid status: %s, must be one of: %s" %
                                 (status, ', '.join(self.VALID_STATUS)))

    def _list_runs(self, parsed_args, parsed_globals):
        # Query matching instance ids, describe them, flatten, sort by
        # (scheduled start time, name) and print.
        query = QueryArgBuilder().build_query(parsed_args)
        object_ids = self._query_objects(parsed_args.pipeline_id, query)
        objects = self._describe_objects(parsed_args.pipeline_id, object_ids)
        converted = convert_described_objects(
            objects,
            sort_key_func=lambda x: (x.get('@scheduledStartTime'),
                                     x.get('name')))
        formatter = self._get_formatter(parsed_globals)
        formatter(self.NAME, converted)

    def _describe_objects(self, pipeline_id, object_ids):
        # DescribeObjects will only accept 100 objectIds at a time,
        # so we need to break up the list passed in into chunks that are at
        # most that size. We then aggregate the results to return.
        objects = []
        for i in range(0, len(object_ids), MAX_ITEMS_PER_DESCRIBE):
            current_object_ids = object_ids[i:i + MAX_ITEMS_PER_DESCRIBE]
            result = self.client.describe_objects(
                pipelineId=pipeline_id, objectIds=current_object_ids)
            objects.extend(result['pipelineObjects'])

        return objects

    def _query_objects(self, pipeline_id, query):
        # Collect all INSTANCE-sphere ids across every result page.
        paginator = self.client.get_paginator('query_objects').paginate(
            pipelineId=pipeline_id,
            sphere='INSTANCE', query=query)
        parsed = paginator.build_full_result()
        return parsed['ids']

    def _get_formatter(self, parsed_globals):
        # Default to the table-style ListRunsFormatter unless --output
        # was given explicitly.
        output = parsed_globals.output
        if output is None:
            return ListRunsFormatter(parsed_globals)
        else:
            return get_formatter(output, parsed_globals)