Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Tools/c-analyzer/c_common/tables.py
12 views
1
from collections import namedtuple
2
import csv
3
import re
4
import textwrap
5
6
from . import NOT_SET, strutil, fsutil
7
8
9
# Marker that fix_row() substitutes for any value matching one of the
# "empty" markers.
EMPTY = '-'
10
# Marker that fix_row() substitutes for any value matching one of the
# "unknown" markers.
UNKNOWN = '???'
11
12
13
def parse_markers(markers, default=None):
    """Normalize a marker spec into a collection of markers.

    * NOT_SET yields *default*.
    * Any other falsy value yields None.
    * A non-str value is returned untouched (it is taken to already be
      a collection of markers).
    * A str made of one repeated character (e.g. '---') is a single
      marker and yields a 1-item list.
    * Any other str yields a list of its individual characters.
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is not str:
        return markers
    # A run of one repeated character is one marker; otherwise each
    # character is its own marker.
    return [markers] if len(set(markers)) == 1 else list(markers)
23
24
25
def fix_row(row, **markers):
    """Return the row as a tuple with its values normalized.

    Falsy values become None.  Values found among the "empty" markers
    are replaced with EMPTY and values found among the "unknown"
    markers are replaced with UNKNOWN ("empty" is checked first).

    The markers may be overridden with the "empty" and "unknown"
    keyword arguments, in any form parse_markers() accepts.  A str row
    is not supported (NotImplementedError).
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    check_empty = bool(empty)
    check_unknown = bool(unknown)

    fixed = []
    for val in row:
        val = val or None
        if check_empty and val in empty:
            val = EMPTY
        elif check_unknown and val in unknown:
            val = UNKNOWN
        fixed.append(val)
    return tuple(fixed)
40
41
42
def _fix_read_default(row):
43
for value in row:
44
yield value.strip()
45
46
47
def _fix_write_default(row, empty=''):
48
for value in row:
49
yield empty if value is None else str(value)
50
51
52
def _normalize_fix_read(fix):
53
if fix is None:
54
fix = ''
55
if callable(fix):
56
def fix_row(row):
57
values = fix(row)
58
return _fix_read_default(values)
59
elif isinstance(fix, str):
60
def fix_row(row):
61
values = _fix_read_default(row)
62
return (None if v == fix else v
63
for v in values)
64
else:
65
raise NotImplementedError(fix)
66
return fix_row
67
68
69
def _normalize_fix_write(fix, empty=''):
70
if fix is None:
71
fix = empty
72
if callable(fix):
73
def fix_row(row):
74
values = fix(row)
75
return _fix_write_default(values, empty)
76
elif isinstance(fix, str):
77
def fix_row(row):
78
return _fix_write_default(row, fix)
79
else:
80
raise NotImplementedError(fix)
81
return fix_row
82
83
84
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row (as a tuple) of the given delimiter-separated file.

    "infile" is either a filename (opened here, and closed once the
    rows are exhausted) or an iterable of lines.  The lines are first
    filtered through strutil._iter_significant_lines().

    "header" is the expected first line, either as a str or as a
    sequence of column names (joined with the separator).  If the
    actual first line (stripped) differs then ValueError is raised.
    A file with no significant lines is treated as having an empty
    header.

    "sep" is the column delimiter.  A falsy value means tab.

    "fix" controls the per-row cleanup; see _normalize_fix_read().
    """
    # Resolve the delimiter once so the expected header and the reader
    # always agree on it.
    sep = sep or '\t'

    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))
117
118
119
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write the header and each of the rows to a delimiter-separated file.

    "outfile" is either a filename (opened here for writing) or an
    already-open file object.

    "header" is the first line to write, either as a str (split on the
    separator) or as a sequence of column names.

    "sep" is the column delimiter.  A falsy value means tab.

    "fix" controls how each row is rendered; see _normalize_fix_write().

    If "backup" is truthy then it is passed, along with the target, to
    fsutil.create_backup() before anything is written.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup (if any) was already made above, before the
                # file was opened for writing.  Do not repeat it now that
                # the file has been truncated.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    sep = sep or '\t'
    if isinstance(header, str):
        header = header.split(sep)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=sep)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )
151
152
153
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each entry, with the row split into columns.

    The entries are expanded by strutil.parse_entries(), which provides
    (line, filename) pairs.  For a line that came from a file, the
    first line of each new file must be the header (when one is given);
    that line is skipped.  For a line that did not come from a file,
    "rawsep" is used as the separator instead of "sep" when the line
    does not contain "sep".

    If "strict" is true then every row must have the same number of
    columns, as determined by the header or else by the first row.
    Short rows are padded with "default" if one was provided.

    "sep" may only be omitted if it can be inferred from a str header;
    otherwise ValueError is raised.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    expected = len(header.split(sep)) if header and strict else None
    current = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        rowsep = sep
        if not filename:
            if rawsep and sep not in line:
                rowsep = rawsep
        elif header and current != filename:
            current = filename
            if line.strip() != header:
                # We expected the header.
                raise NotImplementedError((header, line))
            # The first line of the file is the header, so skip it.
            continue

        row = _parse_row(line, rowsep, expected, default)
        if strict and not expected:
            expected = len(row)
        yield row, filename
184
185
186
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split the line on "sep" into a tuple of stripped values.

    This is the public wrapper around _parse_row(); it additionally
    requires a separator (ValueError otherwise).
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
190
191
192
def _parse_row(line, sep, ncols, default):
193
row = tuple(v.strip() for v in line.split(sep))
194
if (ncols or 0) > 0:
195
diff = ncols - len(row)
196
if diff:
197
if default is NOT_SET or diff < 0:
198
raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
199
row += (default,) * diff
200
return row
201
202
203
def _normalize_table_file_props(header, sep):
204
if not header:
205
return None, sep
206
207
if not isinstance(header, str):
208
if not sep:
209
raise NotImplementedError(header)
210
header = sep.join(header)
211
elif not sep:
212
for sep in ('\t', ',', ' '):
213
if sep in header:
214
break
215
else:
216
sep = None
217
return header, sep
218
219
220
##################################
221
# stdout tables
222
223
# Fallback column width, used by _resolve_width() when neither the column
# spec nor the caller-provided default supplies one.
WIDTH = 20
224
225
226
def resolve_columns(specs):
    """Return the list of ColumnSpec corresponding to the given specs.

    "specs" is either an iterable of raw column specs or a single str
    holding specs separated by commas and/or whitespace.
    """
    if isinstance(specs, str):
        specs = specs.replace(',', ' ').strip().split()
    return [ColumnSpec.from_raw(raw) for raw in specs]
234
235
236
def build_table(specs, *, sep=' ', defaultwidth=None):
    """Return the (header, divider, row format) strings for the columns.

    The specs are first resolved with resolve_columns() and the rest
    is handled by _build_table().
    """
    return _build_table(
        resolve_columns(specs),
        sep=sep,
        defaultwidth=defaultwidth,
    )
239
240
241
class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')):
    """The description of one column of a text table.

    "field" is the name of the value to show, "label" is the text for
    the column header, and "fmt" is the format spec for the value
    (e.g. "<20"), if any.

    A column spec may be written as a string in one of these forms
    (the "[<label>]" prefix is always optional):

        [<label>]<field>
        [<label>]<field>:<align>[<width>]
        [<label>]<field>:<width>
        [<label>]<field>[:<width>]:<fmt>

    where <align> is one of "<", "^", or ">".
    """

    REGEX = re.compile(textwrap.dedent(r'''
        ^
        (?:
            \[
            (
                (?: [^\s\]] [^\]]* )?
                [^\s\]]
            )  # <label>
            ]
        )?
        ( [-\w]+ )  # <field>
        (?:
            (?:
                :
                ( [<^>] )  # <align>
                ( \d+ )?  # <width1>
            )
            |
            (?:
                (?:
                    :
                    ( \d+ )  # <width2>
                )?
                (?:
                    :
                    ( .*? )  # <fmt>
                )?
            )
        )?
        $
    '''), re.VERBOSE)

    @classmethod
    def from_raw(cls, raw):
        """Return the ColumnSpec for the given raw spec.

        "raw" may be a ColumnSpec (returned as-is), a spec string, or
        a sequence accepted by _normalize().  ValueError is raised for
        a missing (falsy) spec and for one that cannot be parsed.
        """
        if not raw:
            raise ValueError('missing column spec')
        elif isinstance(raw, cls):
            return raw

        if isinstance(raw, str):
            parsed = cls._parse(raw)
        else:
            parsed = cls._normalize(raw)
        # Check before unpacking; otherwise a failed parse would surface
        # as an unrelated TypeError.
        if parsed is None:
            raise ValueError(f'unsupported column spec {raw!r}')
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def parse(cls, specstr):
        """Return the ColumnSpec for the spec string (None if not valid)."""
        parsed = cls._parse(specstr)
        if not parsed:
            return None
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def _parse(cls, specstr):
        """Return (field, label, fmt, width) for the spec string.

        None is returned if the string does not look like a column
        spec.  The label defaults to the field.  NotImplementedError
        is raised for a format that _parse_fmt() does not understand
        and for a format whose width disagrees with the explicit one.
        """
        m = cls.REGEX.match(specstr)
        if not m:
            return None
        (label, field,
         align, width1,
         width2, fmt,
         ) = m.groups()
        if not label:
            label = field
        if fmt:
            assert not align and not width1, (specstr,)
            _parsed = _parse_fmt(fmt)
            if not _parsed:
                raise NotImplementedError
            # Always take the width from the format so that it is
            # defined even when no separate width was given.
            width, _ = _parsed
            if width2 and width != int(width2):
                raise NotImplementedError(specstr)
        elif width2:
            fmt = width2
            width = int(width2)
        else:
            assert not fmt, (fmt, specstr)
            if align:
                # Without an explicit width the column is as wide
                # as its label.
                width = int(width1) if width1 else len(label)
                fmt = f'{align}{width}'
            else:
                width = None
        return field, label, fmt, width

    @classmethod
    def _normalize(cls, spec):
        """Return (field, label, fmt, width) for the given sequence.

        The supported shapes are:

        * (label, field, width, fmt)
        * (label, field, fmt)
        * (field, fmt) or (label, field)

        The values are combined into a spec string, which is then
        handled by _parse() (so None is returned if that string is
        not a valid spec).  Other lengths are not supported
        (NotImplementedError).
        """
        if len(spec) == 1:
            # A 1-item spec is not supported.
            raise NotImplementedError(spec)

        if len(spec) == 4:
            label, field, width, fmt = spec
            if width:
                if not fmt:
                    fmt = str(width)
                else:
                    # _parse_fmt() returns None for a format it does
                    # not understand, which can never match the width.
                    parsed = _parse_fmt(fmt)
                    if not parsed or parsed[0] != width:
                        raise ValueError(f'width mismatch in {spec}')
        elif len(spec) == 3:
            label, field, fmt = spec
            if not field:
                label, field = None, label
            elif not isinstance(field, str) or not field.isidentifier():
                # XXX This doesn't seem right...
                fmt = f'{field}:{fmt}' if fmt else field
                label, field = None, label
        elif len(spec) == 2:
            label = None
            field, fmt = spec
            if not field:
                field, fmt = fmt, None
            elif isinstance(fmt, str) and fmt and (
                    not field.isidentifier() or fmt.isidentifier()):
                # The pair is actually (label, field), so there is
                # no format.
                label, field, fmt = field, fmt, None
        else:
            raise NotImplementedError

        fmt = f':{fmt}' if fmt else ''
        if label:
            return cls._parse(f'[{label}]{field}{fmt}')
        else:
            return cls._parse(f'{field}{fmt}')

    @property
    def width(self):
        """The width given by the format (None if there isn't one)."""
        if not self.fmt:
            return None
        parsed = _parse_fmt(self.fmt)
        if not parsed:
            return None
        width, _ = parsed
        return width

    def resolve_width(self, default=None):
        """Return the width to use for the column (see _resolve_width())."""
        return _resolve_width(self.width, self.fmt, self.label, default)
380
381
382
def _parse_fmt(fmt):
383
if fmt.startswith(tuple('<^>')):
384
align = fmt[0]
385
width = fmt[1:]
386
if width.isdigit():
387
return int(width), align
388
elif fmt.isdigit():
389
return int(fmt), '<'
390
return None
391
392
393
def _resolve_width(width, fmt, label, default):
394
if width:
395
if not isinstance(width, int):
396
raise NotImplementedError
397
return width
398
elif fmt:
399
parsed = _parse_fmt(fmt)
400
if parsed:
401
width, _ = parsed
402
if width:
403
return width
404
405
if not default:
406
return WIDTH
407
elif hasattr(default, 'get'):
408
defaults = default
409
default = defaults.get(None) or WIDTH
410
return defaults.get(label) or default
411
else:
412
return default or WIDTH
413
414
415
def _build_table(columns, *, sep=' ', defaultwidth=None):
416
header = []
417
div = []
418
rowfmt = []
419
for spec in columns:
420
width = spec.resolve_width(defaultwidth)
421
colfmt = spec.fmt
422
colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'
423
424
header.append(f' {{:^{width}}} '.format(spec.label))
425
div.append('-' * (width + 2))
426
rowfmt.append(f' {{{spec.field}{colfmt}}} ')
427
return (
428
sep.join(header),
429
sep.join(div),
430
sep.join(rowfmt),
431
)
432
433