Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Tools/c-analyzer/c_common/scriptutil.py
12 views
1
import argparse
2
import contextlib
3
import logging
4
import os
5
import os.path
6
import shutil
7
import sys
8
9
from . import fsutil, strutil, iterutil, logging as loggingutil
10
11
12
_NOT_SET = object()
13
14
15
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
16
if spec is None:
17
_, spec = _find_script()
18
# This is more natural for prog than __file__ would be.
19
filename = sys.argv[0]
20
elif isinstance(spec, str):
21
filename = os.path.normpath(spec)
22
spec = None
23
else:
24
filename = spec.origin
25
if _is_standalone(filename):
26
# Check if "installed".
27
if allowsuffix or not filename.endswith('.py'):
28
basename = os.path.basename(filename)
29
found = shutil.which(basename)
30
if found:
31
script = os.path.abspath(filename)
32
found = os.path.abspath(found)
33
if os.path.normcase(script) == os.path.normcase(found):
34
return basename
35
# It is only "standalone".
36
if absolute:
37
filename = os.path.abspath(filename)
38
return filename
39
elif spec is not None:
40
module = spec.name
41
if module.endswith('.__main__'):
42
module = module[:-9]
43
return f'{sys.executable} -m {module}'
44
else:
45
if absolute:
46
filename = os.path.abspath(filename)
47
return f'{sys.executable} {filename}'
48
49
50
def _find_script():
51
frame = sys._getframe(2)
52
while frame.f_globals['__name__'] != '__main__':
53
frame = frame.f_back
54
55
# This should match sys.argv[0].
56
filename = frame.f_globals['__file__']
57
# This will be None if -m wasn't used..
58
spec = frame.f_globals['__spec__']
59
return filename, spec
60
61
62
def is_installed(filename, *, allowsuffix=True):
63
if not allowsuffix and filename.endswith('.py'):
64
return False
65
filename = os.path.abspath(os.path.normalize(filename))
66
found = shutil.which(os.path.basename(filename))
67
if not found:
68
return False
69
if found != filename:
70
return False
71
return _is_standalone(filename)
72
73
74
def is_standalone(filename):
75
filename = os.path.abspath(os.path.normalize(filename))
76
return _is_standalone(filename)
77
78
79
def _is_standalone(filename):
80
return fsutil.is_executable(filename)
81
82
83
##################################
84
# logging
85
86
VERBOSITY = 3
87
88
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
89
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
90
91
92
logger = logging.getLogger(__name__)
93
94
95
def configure_logger(verbosity, logger=None, **kwargs):
96
if logger is None:
97
# Configure the root logger.
98
logger = logging.getLogger()
99
loggingutil.configure_logger(logger, verbosity, **kwargs)
100
101
102
##################################
103
# selections
104
105
class UnsupportedSelectionError(Exception):
106
def __init__(self, values, possible):
107
self.values = tuple(values)
108
self.possible = tuple(possible)
109
super().__init__(f'unsupported selections {self.unique}')
110
111
@property
112
def unique(self):
113
return tuple(sorted(set(self.values)))
114
115
116
def normalize_selection(selected: str, *, possible=None):
117
if selected in (None, True, False):
118
return selected
119
elif isinstance(selected, str):
120
selected = [selected]
121
elif not selected:
122
return ()
123
124
unsupported = []
125
_selected = set()
126
for item in selected:
127
if not item:
128
continue
129
for value in item.strip().replace(',', ' ').split():
130
if not value:
131
continue
132
# XXX Handle subtraction (leading "-").
133
if possible and value not in possible and value != 'all':
134
unsupported.append(value)
135
_selected.add(value)
136
if unsupported:
137
raise UnsupportedSelectionError(unsupported, tuple(possible))
138
if 'all' in _selected:
139
return True
140
return frozenset(selected)
141
142
143
##################################
144
# CLI parsing helpers
145
146
class CLIArgSpec(tuple):
147
def __new__(cls, *args, **kwargs):
148
return super().__new__(cls, (args, kwargs))
149
150
def __repr__(self):
151
args, kwargs = self
152
args = [repr(arg) for arg in args]
153
for name, value in kwargs.items():
154
args.append(f'{name}={value!r}')
155
return f'{type(self).__name__}({", ".join(args)})'
156
157
def __call__(self, parser, *, _noop=(lambda a: None)):
158
self.apply(parser)
159
return _noop
160
161
def apply(self, parser):
162
args, kwargs = self
163
parser.add_argument(*args, **kwargs)
164
165
166
def apply_cli_argspecs(parser, specs):
167
processors = []
168
for spec in specs:
169
if callable(spec):
170
procs = spec(parser)
171
_add_procs(processors, procs)
172
else:
173
args, kwargs = spec
174
parser.add_argument(args, kwargs)
175
return processors
176
177
178
def _add_procs(flattened, procs):
179
# XXX Fail on non-empty, non-callable procs?
180
if not procs:
181
return
182
if callable(procs):
183
flattened.append(procs)
184
else:
185
#processors.extend(p for p in procs if callable(p))
186
for proc in procs:
187
_add_procs(flattened, proc)
188
189
190
def add_verbosity_cli(parser):
191
parser.add_argument('-q', '--quiet', action='count', default=0)
192
parser.add_argument('-v', '--verbose', action='count', default=0)
193
194
def process_args(args, *, argv=None):
195
ns = vars(args)
196
key = 'verbosity'
197
if key in ns:
198
parser.error(f'duplicate arg {key!r}')
199
ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
200
return key
201
return process_args
202
203
204
def add_traceback_cli(parser):
205
parser.add_argument('--traceback', '--tb', action='store_true',
206
default=TRACEBACK)
207
parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
208
action='store_const', const=False)
209
210
def process_args(args, *, argv=None):
211
ns = vars(args)
212
key = 'traceback_cm'
213
if key in ns:
214
parser.error(f'duplicate arg {key!r}')
215
showtb = ns.pop('traceback')
216
217
@contextlib.contextmanager
218
def traceback_cm():
219
restore = loggingutil.hide_emit_errors()
220
try:
221
yield
222
except BrokenPipeError:
223
# It was piped to "head" or something similar.
224
pass
225
except NotImplementedError:
226
raise # re-raise
227
except Exception as exc:
228
if not showtb:
229
sys.exit(f'ERROR: {exc}')
230
raise # re-raise
231
except KeyboardInterrupt:
232
if not showtb:
233
sys.exit('\nINTERRUPTED')
234
raise # re-raise
235
except BaseException as exc:
236
if not showtb:
237
sys.exit(f'{type(exc).__name__}: {exc}')
238
raise # re-raise
239
finally:
240
restore()
241
ns[key] = traceback_cm()
242
return key
243
return process_args
244
245
246
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
247
# if opt is True:
248
# parser.add_argument(f'--{dest}', action='append', **kwargs)
249
# elif isinstance(opt, str) and opt.startswith('-'):
250
# parser.add_argument(opt, dest=dest, action='append', **kwargs)
251
# else:
252
# arg = dest if not opt else opt
253
# kwargs.setdefault('nargs', '+')
254
# parser.add_argument(arg, dest=dest, action='append', **kwargs)
255
if not isinstance(opt, str):
256
parser.error(f'opt must be a string, got {opt!r}')
257
elif opt.startswith('-'):
258
parser.add_argument(opt, dest=dest, action='append', **kwargs)
259
else:
260
kwargs.setdefault('nargs', '+')
261
#kwargs.setdefault('metavar', opt.upper())
262
parser.add_argument(opt, dest=dest, action='append', **kwargs)
263
264
def process_args(args, *, argv=None):
265
ns = vars(args)
266
267
# XXX Use normalize_selection()?
268
if isinstance(ns[dest], str):
269
ns[dest] = [ns[dest]]
270
selections = []
271
for many in ns[dest] or ():
272
for value in many.split(sep):
273
if value not in choices:
274
parser.error(f'unknown {dest} {value!r}')
275
selections.append(value)
276
ns[dest] = selections
277
return process_args
278
279
280
def add_files_cli(parser, *, excluded=None, nargs=None):
281
process_files = add_file_filtering_cli(parser, excluded=excluded)
282
parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
283
return [
284
process_files,
285
]
286
287
288
def add_file_filtering_cli(parser, *, excluded=None):
289
parser.add_argument('--start')
290
parser.add_argument('--include', action='append')
291
parser.add_argument('--exclude', action='append')
292
293
excluded = tuple(excluded or ())
294
295
def process_args(args, *, argv=None):
296
ns = vars(args)
297
key = 'iter_filenames'
298
if key in ns:
299
parser.error(f'duplicate arg {key!r}')
300
301
_include = tuple(ns.pop('include') or ())
302
_exclude = excluded + tuple(ns.pop('exclude') or ())
303
kwargs = dict(
304
start=ns.pop('start'),
305
include=tuple(_parse_files(_include)),
306
exclude=tuple(_parse_files(_exclude)),
307
# We use the default for "show_header"
308
)
309
def process_filenames(filenames, relroot=None):
310
return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
311
ns[key] = process_filenames
312
return process_args
313
314
315
def _parse_files(filenames):
316
for filename, _ in strutil.parse_entries(filenames):
317
yield filename.strip()
318
319
320
def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
321
parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
322
parser.add_argument('--no-progress', dest='track_progress', action='store_false')
323
parser.set_defaults(track_progress=True)
324
325
def process_args(args, *, argv=None):
326
if args.track_progress:
327
ns = vars(args)
328
verbosity = ns.get('verbosity', VERBOSITY)
329
if verbosity <= threshold:
330
args.track_progress = track_progress_compact
331
else:
332
args.track_progress = track_progress_flat
333
return process_args
334
335
336
def add_failure_filtering_cli(parser, pool, *, default=False):
337
parser.add_argument('--fail', action='append',
338
metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
339
parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
340
341
def process_args(args, *, argv=None):
342
ns = vars(args)
343
344
fail = ns.pop('fail')
345
try:
346
fail = normalize_selection(fail, possible=pool)
347
except UnsupportedSelectionError as exc:
348
parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
349
else:
350
if fail is None:
351
fail = default
352
353
if fail is True:
354
def ignore_exc(_exc):
355
return False
356
elif fail is False:
357
def ignore_exc(_exc):
358
return True
359
else:
360
def ignore_exc(exc):
361
for err in fail:
362
if type(exc) == pool[err]:
363
return False
364
else:
365
return True
366
args.ignore_exc = ignore_exc
367
return process_args
368
369
370
def add_kind_filtering_cli(parser, *, default=None):
371
parser.add_argument('--kinds', action='append')
372
373
def process_args(args, *, argv=None):
374
ns = vars(args)
375
376
kinds = []
377
for kind in ns.pop('kinds') or default or ():
378
kinds.extend(kind.strip().replace(',', ' ').split())
379
380
if not kinds:
381
match_kind = (lambda k: True)
382
else:
383
included = set()
384
excluded = set()
385
for kind in kinds:
386
if kind.startswith('-'):
387
kind = kind[1:]
388
excluded.add(kind)
389
if kind in included:
390
included.remove(kind)
391
else:
392
included.add(kind)
393
if kind in excluded:
394
excluded.remove(kind)
395
if excluded:
396
if included:
397
... # XXX fail?
398
def match_kind(kind, *, _excluded=excluded):
399
return kind not in _excluded
400
else:
401
def match_kind(kind, *, _included=included):
402
return kind in _included
403
args.match_kind = match_kind
404
return process_args
405
406
407
COMMON_CLI = [
408
add_verbosity_cli,
409
add_traceback_cli,
410
#add_dryrun_cli,
411
]
412
413
414
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
415
arg_processors = {}
416
if isinstance(subset, str):
417
cmdname = subset
418
try:
419
_, argspecs, _ = commands[cmdname]
420
except KeyError:
421
raise ValueError(f'unsupported subset {subset!r}')
422
parser.set_defaults(cmd=cmdname)
423
arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
424
else:
425
if subset is None:
426
cmdnames = subset = list(commands)
427
elif not subset:
428
raise NotImplementedError
429
elif isinstance(subset, set):
430
cmdnames = [k for k in commands if k in subset]
431
subset = sorted(subset)
432
else:
433
cmdnames = [n for n in subset if n in commands]
434
if len(cmdnames) < len(subset):
435
bad = tuple(n for n in subset if n not in commands)
436
raise ValueError(f'unsupported subset {bad}')
437
438
common = argparse.ArgumentParser(add_help=False)
439
common_processors = apply_cli_argspecs(common, commonspecs)
440
subs = parser.add_subparsers(dest='cmd')
441
for cmdname in cmdnames:
442
description, argspecs, _ = commands[cmdname]
443
sub = subs.add_parser(
444
cmdname,
445
description=description,
446
parents=[common],
447
)
448
cmd_processors = _add_cmd_cli(sub, (), argspecs)
449
arg_processors[cmdname] = common_processors + cmd_processors
450
return arg_processors
451
452
453
def _add_cmd_cli(parser, commonspecs, argspecs):
454
processors = []
455
argspecs = list(commonspecs or ()) + list(argspecs or ())
456
for argspec in argspecs:
457
if callable(argspec):
458
procs = argspec(parser)
459
_add_procs(processors, procs)
460
else:
461
if not argspec:
462
raise NotImplementedError
463
args = list(argspec)
464
if not isinstance(args[-1], str):
465
kwargs = args.pop()
466
if not isinstance(args[0], str):
467
try:
468
args, = args
469
except (TypeError, ValueError):
470
parser.error(f'invalid cmd args {argspec!r}')
471
else:
472
kwargs = {}
473
parser.add_argument(*args, **kwargs)
474
# There will be nothing to process.
475
return processors
476
477
478
def _flatten_processors(processors):
479
for proc in processors:
480
if proc is None:
481
continue
482
if callable(proc):
483
yield proc
484
else:
485
yield from _flatten_processors(proc)
486
487
488
def process_args(args, argv, processors, *, keys=None):
489
processors = _flatten_processors(processors)
490
ns = vars(args)
491
extracted = {}
492
if keys is None:
493
for process_args in processors:
494
for key in process_args(args, argv=argv):
495
extracted[key] = ns.pop(key)
496
else:
497
remainder = set(keys)
498
for process_args in processors:
499
hanging = process_args(args, argv=argv)
500
if isinstance(hanging, str):
501
hanging = [hanging]
502
for key in hanging or ():
503
if key not in remainder:
504
raise NotImplementedError(key)
505
extracted[key] = ns.pop(key)
506
remainder.remove(key)
507
if remainder:
508
raise NotImplementedError(sorted(remainder))
509
return extracted
510
511
512
def process_args_by_key(args, argv, processors, keys):
513
extracted = process_args(args, argv, processors, keys=keys)
514
return [extracted[key] for key in keys]
515
516
517
##################################
518
# commands
519
520
def set_command(name, add_cli):
521
"""A decorator factory to set CLI info."""
522
def decorator(func):
523
if hasattr(func, '__cli__'):
524
raise Exception(f'already set')
525
func.__cli__ = (name, add_cli)
526
return func
527
return decorator
528
529
530
##################################
531
# main() helpers
532
533
def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
534
# We expect each filename to be a normalized, absolute path.
535
for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
536
if (reason := check()):
537
logger.debug(f'{filename}: {reason}')
538
continue
539
yield filename
540
541
542
def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
543
filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
544
for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
545
if show:
546
print()
547
print(relfile)
548
print('-------------------------------------------')
549
if (reason := check()):
550
print(reason)
551
continue
552
yield filename, relfile
553
554
555
def _iter_filenames(filenames, process, relroot):
556
if process is None:
557
yield from fsutil.process_filenames(filenames, relroot=relroot)
558
return
559
560
onempty = Exception('no filenames provided')
561
items = process(filenames, relroot=relroot)
562
items, peeked = iterutil.peek_and_iter(items)
563
if not items:
564
raise onempty
565
if isinstance(peeked, str):
566
if relroot and relroot is not fsutil.USE_CWD:
567
relroot = os.path.abspath(relroot)
568
check = (lambda: True)
569
for filename, ismany in iterutil.iter_many(items, onempty):
570
relfile = fsutil.format_filename(filename, relroot, fixroot=False)
571
yield filename, relfile, check, ismany
572
elif len(peeked) == 4:
573
yield from items
574
else:
575
raise NotImplementedError
576
577
578
def track_progress_compact(items, *, groups=5, **mark_kwargs):
579
last = os.linesep
580
marks = iter_marks(groups=groups, **mark_kwargs)
581
for item in items:
582
last = next(marks)
583
print(last, end='', flush=True)
584
yield item
585
if not last.endswith(os.linesep):
586
print()
587
588
589
def track_progress_flat(items, fmt='<{}>'):
590
for item in items:
591
print(fmt.format(item), flush=True)
592
yield item
593
594
595
def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
596
mark = mark or ''
597
group = group if group and group > 1 else 1
598
groups = groups if groups and groups > 1 else 1
599
600
sep = f'{mark}{sep}' if sep else mark
601
end = f'{mark}{os.linesep}'
602
div = os.linesep
603
perline = group * groups
604
if lines is _NOT_SET:
605
# By default we try to put about 100 in each line group.
606
perlines = 100 // perline * perline
607
elif not lines or lines < 0:
608
perlines = None
609
else:
610
perlines = perline * lines
611
612
if perline == 1:
613
yield end
614
elif group == 1:
615
yield sep
616
617
count = 1
618
while True:
619
if count % perline == 0:
620
yield end
621
if perlines and count % perlines == 0:
622
yield div
623
elif count % group == 0:
624
yield sep
625
else:
626
yield mark
627
count += 1
628
629