Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/formatter.py
1566 views
1
# Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
7
# http://aws.amazon.com/apache2.0/
8
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import logging
14
15
from botocore.compat import json
16
from botocore.paginate import PageIterator
17
from botocore.utils import set_value_from_jmespath
18
19
from awscli import compat, text
20
from awscli.table import ColorizedStyler, MultiTable, Styler
21
from awscli.utils import json_encoder
22
23
LOG = logging.getLogger(__name__)
24
25
26
def is_response_paginated(response):
27
return isinstance(response, PageIterator)
28
29
30
class Formatter:
31
def __init__(self, args):
32
self._args = args
33
34
def _remove_request_id(self, response_data):
35
if 'ResponseMetadata' in response_data:
36
if 'RequestId' in response_data['ResponseMetadata']:
37
request_id = response_data['ResponseMetadata']['RequestId']
38
LOG.debug('RequestId: %s', request_id)
39
del response_data['ResponseMetadata']
40
41
def _get_default_stream(self):
42
return compat.get_stdout_text_writer()
43
44
def _flush_stream(self, stream):
45
try:
46
stream.flush()
47
except OSError:
48
pass
49
50
51
class FullyBufferedFormatter(Formatter):
52
def __call__(self, command_name, response, stream=None):
53
if stream is None:
54
# Retrieve stdout on invocation instead of at import time
55
# so that if anything wraps stdout we'll pick up those changes
56
# (specifically colorama on windows wraps stdout).
57
stream = self._get_default_stream()
58
# I think the interfaces between non-paginated
59
# and paginated responses can still be cleaned up.
60
if is_response_paginated(response):
61
response_data = response.build_full_result()
62
else:
63
response_data = response
64
self._remove_request_id(response_data)
65
if self._args.query is not None:
66
response_data = self._args.query.search(response_data)
67
try:
68
self._format_response(command_name, response_data, stream)
69
except OSError:
70
# If the reading end of our stdout stream has closed the file
71
# we can just exit.
72
pass
73
finally:
74
# flush is needed to avoid the "close failed in file object
75
# destructor" in python2.x (see http://bugs.python.org/issue11380).
76
self._flush_stream(stream)
77
78
79
class JSONFormatter(FullyBufferedFormatter):
80
def _format_response(self, command_name, response, stream):
81
# For operations that have no response body (e.g. s3 put-object)
82
# the response will be an empty string. We don't want to print
83
# that out to the user but other "falsey" values like an empty
84
# dictionary should be printed.
85
if response != {}:
86
json.dump(
87
response,
88
stream,
89
indent=4,
90
default=json_encoder,
91
ensure_ascii=False,
92
)
93
stream.write('\n')
94
95
96
class TableFormatter(FullyBufferedFormatter):
97
"""Pretty print a table from a given response.
98
99
The table formatter is able to take any generic response
100
and generate a pretty printed table. It does this without
101
using the output definition from the model.
102
103
"""
104
105
def __init__(self, args, table=None):
106
super(TableFormatter, self).__init__(args)
107
if args.color == 'auto':
108
self.table = MultiTable(
109
initial_section=False, column_separator='|'
110
)
111
elif args.color == 'off':
112
styler = Styler()
113
self.table = MultiTable(
114
initial_section=False, column_separator='|', styler=styler
115
)
116
elif args.color == 'on':
117
styler = ColorizedStyler()
118
self.table = MultiTable(
119
initial_section=False, column_separator='|', styler=styler
120
)
121
else:
122
raise ValueError("Unknown color option: %s" % args.color)
123
124
def _format_response(self, command_name, response, stream):
125
if self._build_table(command_name, response):
126
try:
127
self.table.render(stream)
128
except OSError:
129
# If they're piping stdout to another process which exits before
130
# we're done writing all of our output, we'll get an error about a
131
# closed pipe which we can safely ignore.
132
pass
133
134
def _build_table(self, title, current, indent_level=0):
135
if not current:
136
return False
137
if title is not None:
138
self.table.new_section(title, indent_level=indent_level)
139
if isinstance(current, list):
140
if isinstance(current[0], dict):
141
self._build_sub_table_from_list(current, indent_level, title)
142
else:
143
for item in current:
144
if self._scalar_type(item):
145
self.table.add_row([item])
146
elif all(self._scalar_type(el) for el in item):
147
self.table.add_row(item)
148
else:
149
self._build_table(title=None, current=item)
150
if isinstance(current, dict):
151
# Render a single row section with keys as header
152
# and the row as the values, unless the value
153
# is a list.
154
self._build_sub_table_from_dict(current, indent_level)
155
return True
156
157
def _build_sub_table_from_dict(self, current, indent_level):
158
# Render a single row section with keys as header
159
# and the row as the values, unless the value
160
# is a list.
161
headers, more = self._group_scalar_keys(current)
162
if len(headers) == 1:
163
# Special casing if a dict has a single scalar key/value pair.
164
self.table.add_row([headers[0], current[headers[0]]])
165
elif headers:
166
self.table.add_row_header(headers)
167
self.table.add_row([current[k] for k in headers])
168
for remaining in more:
169
self._build_table(
170
remaining, current[remaining], indent_level=indent_level + 1
171
)
172
173
def _build_sub_table_from_list(self, current, indent_level, title):
174
headers, more = self._group_scalar_keys_from_list(current)
175
self.table.add_row_header(headers)
176
first = True
177
for element in current:
178
if not first and more:
179
self.table.new_section(title, indent_level=indent_level)
180
self.table.add_row_header(headers)
181
first = False
182
# Use .get() to account for the fact that sometimes an element
183
# may not have all the keys from the header.
184
self.table.add_row([element.get(header, '') for header in headers])
185
for remaining in more:
186
# Some of the non scalar attributes may not necessarily
187
# be in every single element of the list, so we need to
188
# check this condition before recursing.
189
if remaining in element:
190
self._build_table(
191
remaining,
192
element[remaining],
193
indent_level=indent_level + 1,
194
)
195
196
def _scalar_type(self, element):
197
return not isinstance(element, (list, dict))
198
199
def _group_scalar_keys_from_list(self, list_of_dicts):
200
# We want to make sure we catch all the keys in the list of dicts.
201
# Most of the time each list element has the same keys, but sometimes
202
# a list element will have keys not defined in other elements.
203
headers = set()
204
more = set()
205
for item in list_of_dicts:
206
current_headers, current_more = self._group_scalar_keys(item)
207
headers.update(current_headers)
208
more.update(current_more)
209
headers = list(sorted(headers))
210
more = list(sorted(more))
211
return headers, more
212
213
def _group_scalar_keys(self, current):
214
# Given a dict, separate the keys into those whose values are
215
# scalar, and those whose values aren't. Return two lists,
216
# one is the scalar value keys, the second is the remaining keys.
217
more = []
218
headers = []
219
for element in current:
220
if self._scalar_type(current[element]):
221
headers.append(element)
222
else:
223
more.append(element)
224
headers.sort()
225
more.sort()
226
return headers, more
227
228
229
class TextFormatter(Formatter):
230
def __call__(self, command_name, response, stream=None):
231
if stream is None:
232
stream = self._get_default_stream()
233
try:
234
if is_response_paginated(response):
235
result_keys = response.result_keys
236
for i, page in enumerate(response):
237
if i > 0:
238
current = {}
239
else:
240
current = response.non_aggregate_part
241
242
for result_key in result_keys:
243
data = result_key.search(page)
244
set_value_from_jmespath(
245
current, result_key.expression, data
246
)
247
self._format_response(current, stream)
248
if response.resume_token:
249
# Tell the user about the next token so they can continue
250
# if they want.
251
self._format_response(
252
{'NextToken': {'NextToken': response.resume_token}},
253
stream,
254
)
255
else:
256
self._remove_request_id(response)
257
self._format_response(response, stream)
258
finally:
259
# flush is needed to avoid the "close failed in file object
260
# destructor" in python2.x (see http://bugs.python.org/issue11380).
261
self._flush_stream(stream)
262
263
def _format_response(self, response, stream):
264
if self._args.query is not None:
265
expression = self._args.query
266
response = expression.search(response)
267
text.format_text(response, stream)
268
269
270
def get_formatter(format_type, args):
271
if format_type == 'json':
272
return JSONFormatter(args)
273
elif format_type == 'text':
274
return TextFormatter(args)
275
elif format_type == 'table':
276
return TableFormatter(args)
277
raise ValueError("Unknown output type: %s" % format_type)
278
279