import logging
from botocore.compat import json
from botocore.paginate import PageIterator
from botocore.utils import set_value_from_jmespath
from awscli import compat, text
from awscli.table import ColorizedStyler, MultiTable, Styler
from awscli.utils import json_encoder
# Module-level logger; the formatters use it to record request IDs at
# debug level before stripping response metadata from the output.
LOG = logging.getLogger(__name__)
def is_response_paginated(response):
    """Return True when *response* is a ``PageIterator`` instance."""
    paginated = isinstance(response, PageIterator)
    return paginated
class Formatter:
    """Base formatter: stores the parsed arguments and offers the
    response-cleanup and stream helpers shared by concrete formatters.
    """

    def __init__(self, args):
        """Remember *args* for later use by the formatting methods."""
        self._args = args

    def _remove_request_id(self, response_data):
        """Delete ``ResponseMetadata`` from *response_data* in place.

        If the metadata carries a ``RequestId`` it is logged at debug
        level before the whole ``ResponseMetadata`` entry is removed.
        """
        if 'ResponseMetadata' not in response_data:
            return
        metadata = response_data['ResponseMetadata']
        if 'RequestId' in metadata:
            LOG.debug('RequestId: %s', metadata['RequestId'])
        del response_data['ResponseMetadata']

    def _get_default_stream(self):
        """Return the text writer for stdout supplied by ``compat``."""
        writer = compat.get_stdout_text_writer()
        return writer

    def _flush_stream(self, stream):
        """Flush *stream*, ignoring any ``OSError`` the flush raises."""
        try:
            stream.flush()
        except OSError:
            # Best effort only: a failed flush is deliberately ignored.
            return
class FullyBufferedFormatter(Formatter):
    """Formatter that gathers the complete response before rendering.

    Subclasses supply ``_format_response(command_name, response, stream)``.
    """

    def __call__(self, command_name, response, stream=None):
        """Render *response* for *command_name* onto *stream*.

        A paginated response is first collapsed with
        ``build_full_result()``.  ``ResponseMetadata`` is stripped, the
        optional ``--query`` expression is applied, and the result is
        handed to ``_format_response``.  The stream is always flushed,
        and an ``OSError`` raised while formatting is ignored.
        """
        out = self._get_default_stream() if stream is None else stream
        payload = (
            response.build_full_result()
            if is_response_paginated(response)
            else response
        )
        self._remove_request_id(payload)
        query = self._args.query
        if query is not None:
            payload = query.search(payload)
        try:
            self._format_response(command_name, payload, out)
        except OSError:
            # Deliberately ignored, e.g. when the output stream can no
            # longer be written to.
            pass
        finally:
            self._flush_stream(out)
class JSONFormatter(FullyBufferedFormatter):
    """Render the fully buffered response as indented JSON."""

    def _format_response(self, command_name, response, stream):
        """Write *response* to *stream* as JSON followed by a newline.

        A response equal to an empty dict produces no output at all.
        *command_name* is accepted for interface compatibility and is
        not used.
        """
        if response == {}:
            return
        dump_options = {
            'indent': 4,
            'default': json_encoder,
            'ensure_ascii': False,
        }
        json.dump(response, stream, **dump_options)
        stream.write('\n')
class TableFormatter(FullyBufferedFormatter):
    """Pretty print a table from a given response.

    The table formatter is able to take any generic response
    and generate a pretty printed table.  It does this without
    using the output definition from the model.
    """

    def __init__(self, args, table=None):
        """Create the formatter and the table it renders into.

        :param args: Parsed arguments.  ``args.color`` must be one of
            ``'auto'``, ``'off'`` or ``'on'``.
        :param table: Optional pre-built table object to render into.
            When omitted, a ``MultiTable`` is created whose styler is
            selected from ``args.color``.
        :raises ValueError: If ``args.color`` is not a recognized value
            (checked even when *table* is supplied).
        """
        super().__init__(args)
        color = args.color
        if color == 'auto':
            # No styler is passed, so MultiTable uses its own default.
            table_kwargs = {}
        elif color == 'off':
            table_kwargs = {'styler': Styler()}
        elif color == 'on':
            table_kwargs = {'styler': ColorizedStyler()}
        else:
            raise ValueError("Unknown color option: %s" % color)
        if table is not None:
            # Honor a caller-supplied table instead of silently
            # discarding the argument.
            self.table = table
        else:
            self.table = MultiTable(
                initial_section=False, column_separator='|', **table_kwargs
            )

    def _format_response(self, command_name, response, stream):
        """Build the table for *response* and render it to *stream*.

        Nothing is rendered when the response is empty.  An ``OSError``
        raised while rendering is ignored.
        """
        if not self._build_table(command_name, response):
            return
        try:
            self.table.render(stream)
        except OSError:
            # Deliberate best effort: a failure writing to the output
            # stream is not reported to the caller.
            pass

    def _build_table(self, title, current, indent_level=0):
        """Recursively add *current* to the table.

        :param title: Section title, or ``None`` to add to the current
            section without opening a new one.
        :param current: The value to add (list, dict, or anything falsy).
        :param indent_level: Nesting depth passed to ``new_section``.
        :returns: ``False`` when *current* is falsy and nothing was
            added, ``True`` otherwise.
        """
        if not current:
            return False
        if title is not None:
            self.table.new_section(title, indent_level=indent_level)
        if isinstance(current, list):
            # The first element decides how the whole list is laid out.
            if isinstance(current[0], dict):
                self._build_sub_table_from_list(current, indent_level, title)
            else:
                for item in current:
                    if self._scalar_type(item):
                        self.table.add_row([item])
                    elif all(self._scalar_type(el) for el in item):
                        self.table.add_row(item)
                    else:
                        self._build_table(title=None, current=item)
        if isinstance(current, dict):
            self._build_sub_table_from_dict(current, indent_level)
        return True

    def _build_sub_table_from_dict(self, current, indent_level):
        """Add a dict: scalar values as one row, the rest as subsections."""
        headers, more = self._group_scalar_keys(current)
        if len(headers) == 1:
            # A single scalar is shown as a "key, value" row with no
            # separate header row.
            only_key = headers[0]
            self.table.add_row([only_key, current[only_key]])
        elif headers:
            self.table.add_row_header(headers)
            self.table.add_row([current[key] for key in headers])
        for remaining in more:
            self._build_table(
                remaining, current[remaining], indent_level=indent_level + 1
            )

    def _build_sub_table_from_list(self, current, indent_level, title):
        """Add a list of dicts: one row per element plus nested sections.

        The header row is the union of scalar keys across all elements;
        an element lacking a key gets an empty string in that column.
        """
        headers, more = self._group_scalar_keys_from_list(current)
        self.table.add_row_header(headers)
        for position, element in enumerate(current):
            if position > 0 and more:
                # When nested (non-scalar) keys exist, every element after
                # the first opens a new section with the same title and
                # repeats the header row.
                self.table.new_section(title, indent_level=indent_level)
                self.table.add_row_header(headers)
            self.table.add_row([element.get(header, '') for header in headers])
            for remaining in more:
                if remaining in element:
                    self._build_table(
                        remaining,
                        element[remaining],
                        indent_level=indent_level + 1,
                    )

    def _scalar_type(self, element):
        """Return True unless *element* is a list or a dict."""
        return not isinstance(element, (list, dict))

    def _group_scalar_keys_from_list(self, list_of_dicts):
        """Return ``(headers, more)`` merged over every dict in the list.

        ``headers`` is the sorted union of keys holding scalar values and
        ``more`` the sorted union of keys holding lists or dicts.
        """
        headers = set()
        more = set()
        for item in list_of_dicts:
            current_headers, current_more = self._group_scalar_keys(item)
            headers.update(current_headers)
            more.update(current_more)
        return sorted(headers), sorted(more)

    def _group_scalar_keys(self, current):
        """Split the keys of *current* into sorted scalar / non-scalar lists.

        :returns: ``(headers, more)`` where ``headers`` are the keys whose
            values are scalars and ``more`` are the remaining keys.
        """
        headers = []
        more = []
        for key in current:
            bucket = headers if self._scalar_type(current[key]) else more
            bucket.append(key)
        headers.sort()
        more.sort()
        return headers, more
class TextFormatter(Formatter):
    """Render responses as text, emitting paginated output page by page."""

    def __call__(self, command_name, response, stream=None):
        """Write *response* to *stream* using the text format.

        For a paginated response each page is formatted as it is
        iterated, and a ``NextToken`` record is appended when the
        iterator reports a resume token.  A plain response has its
        ``ResponseMetadata`` removed and is formatted once.  The stream
        is always flushed.  *command_name* is not used.
        """
        out = self._get_default_stream() if stream is None else stream
        try:
            if not is_response_paginated(response):
                self._remove_request_id(response)
                self._format_response(response, out)
                return
            keys = response.result_keys
            for page_number, page in enumerate(response):
                # Only the first page starts from the non-aggregate part;
                # later pages start from an empty dict.
                # NOTE(review): the attribute is read inside the loop, as
                # before; presumably it is only populated once iteration
                # has begun - confirm against botocore.
                record = {} if page_number > 0 else response.non_aggregate_part
                for key in keys:
                    value = key.search(page)
                    set_value_from_jmespath(record, key.expression, value)
                self._format_response(record, out)
            if response.resume_token:
                token_record = {
                    'NextToken': {'NextToken': response.resume_token}
                }
                self._format_response(token_record, out)
        finally:
            self._flush_stream(out)

    def _format_response(self, response, stream):
        """Apply the optional query to *response* and write it as text."""
        query = self._args.query
        if query is not None:
            response = query.search(response)
        text.format_text(response, stream)
def get_formatter(format_type, args):
    """Return a formatter instance for *format_type*.

    :param format_type: One of ``'json'``, ``'text'`` or ``'table'``.
    :param args: Parsed arguments passed to the formatter constructor.
    :raises ValueError: If *format_type* is not a supported output type.
    """
    supported = ('json', 'text', 'table')
    matched = next(
        (name for name in supported if format_type == name), None
    )
    if matched is None:
        raise ValueError("Unknown output type: %s" % format_type)
    formatter_classes = {
        'json': JSONFormatter,
        'text': TextFormatter,
        'table': TableFormatter,
    }
    return formatter_classes[matched](args)