Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/net/ynl/pyynl/cli.py
49231 views
1
#!/usr/bin/env python3
2
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3
4
import argparse
5
import json
6
import os
7
import pathlib
8
import pprint
9
import sys
10
import textwrap
11
12
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
13
from lib import YnlFamily, Netlink, NlError, SpecFamily
14
15
sys_schema_dir='/usr/share/ynl'
16
relative_schema_dir='../../../../Documentation/netlink'
17
18
def schema_dir():
19
script_dir = os.path.dirname(os.path.abspath(__file__))
20
schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
21
if not os.path.isdir(schema_dir):
22
schema_dir = sys_schema_dir
23
if not os.path.isdir(schema_dir):
24
raise Exception(f"Schema directory {schema_dir} does not exist")
25
return schema_dir
26
27
def spec_dir():
28
spec_dir = schema_dir() + '/specs'
29
if not os.path.isdir(spec_dir):
30
raise Exception(f"Spec directory {spec_dir} does not exist")
31
return spec_dir
32
33
34
class YnlEncoder(json.JSONEncoder):
35
def default(self, obj):
36
if isinstance(obj, bytes):
37
return bytes.hex(obj)
38
if isinstance(obj, set):
39
return list(obj)
40
return json.JSONEncoder.default(self, obj)
41
42
43
def print_attr_list(ynl, attr_names, attr_set, indent=2):
44
"""Print a list of attributes with their types and documentation."""
45
prefix = ' ' * indent
46
for attr_name in attr_names:
47
if attr_name in attr_set.attrs:
48
attr = attr_set.attrs[attr_name]
49
attr_info = f'{prefix}- {attr_name}: {attr.type}'
50
if 'enum' in attr.yaml:
51
enum_name = attr.yaml['enum']
52
attr_info += f" (enum: {enum_name})"
53
# Print enum values if available
54
if enum_name in ynl.consts:
55
const = ynl.consts[enum_name]
56
enum_values = list(const.entries.keys())
57
attr_info += f"\n{prefix} {const.type.capitalize()}: {', '.join(enum_values)}"
58
59
# Show nested attributes reference and recursively display them
60
nested_set_name = None
61
if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
62
nested_set_name = attr.yaml['nested-attributes']
63
attr_info += f" -> {nested_set_name}"
64
65
if attr.yaml.get('doc'):
66
doc_text = textwrap.indent(attr.yaml['doc'], prefix + ' ')
67
attr_info += f"\n{doc_text}"
68
print(attr_info)
69
70
# Recursively show nested attributes
71
if nested_set_name in ynl.attr_sets:
72
nested_set = ynl.attr_sets[nested_set_name]
73
# Filter out 'unspec' and other unused attrs
74
nested_names = [n for n in nested_set.attrs.keys()
75
if nested_set.attrs[n].type != 'unused']
76
if nested_names:
77
print_attr_list(ynl, nested_names, nested_set, indent + 4)
78
79
80
def print_mode_attrs(ynl, mode, mode_spec, attr_set, print_request=True):
81
"""Print a given mode (do/dump/event/notify)."""
82
mode_title = mode.capitalize()
83
84
if print_request and 'request' in mode_spec and 'attributes' in mode_spec['request']:
85
print(f'\n{mode_title} request attributes:')
86
print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)
87
88
if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
89
print(f'\n{mode_title} reply attributes:')
90
print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)
91
92
if 'attributes' in mode_spec:
93
print(f'\n{mode_title} attributes:')
94
print_attr_list(ynl, mode_spec['attributes'], attr_set)
95
96
97
def main():
98
description = """
99
YNL CLI utility - a general purpose netlink utility that uses YAML
100
specs to drive protocol encoding and decoding.
101
"""
102
epilog = """
103
The --multi option can be repeated to include several do operations
104
in the same netlink payload.
105
"""
106
107
parser = argparse.ArgumentParser(description=description,
108
epilog=epilog)
109
spec_group = parser.add_mutually_exclusive_group(required=True)
110
spec_group.add_argument('--family', dest='family', type=str,
111
help='name of the netlink FAMILY')
112
spec_group.add_argument('--list-families', action='store_true',
113
help='list all netlink families supported by YNL (has spec)')
114
spec_group.add_argument('--spec', dest='spec', type=str,
115
help='choose the family by SPEC file path')
116
117
parser.add_argument('--schema', dest='schema', type=str)
118
parser.add_argument('--no-schema', action='store_true')
119
parser.add_argument('--json', dest='json_text', type=str)
120
121
group = parser.add_mutually_exclusive_group()
122
group.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
123
group.add_argument('--multi', dest='multi', nargs=2, action='append',
124
metavar=('DO-OPERATION', 'JSON_TEXT'), type=str)
125
group.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
126
group.add_argument('--list-ops', action='store_true')
127
group.add_argument('--list-msgs', action='store_true')
128
group.add_argument('--list-attrs', dest='list_attrs', metavar='OPERATION', type=str,
129
help='List attributes for an operation')
130
group.add_argument('--validate', action='store_true')
131
132
parser.add_argument('--duration', dest='duration', type=int,
133
help='when subscribed, watch for DURATION seconds')
134
parser.add_argument('--sleep', dest='duration', type=int,
135
help='alias for duration')
136
parser.add_argument('--subscribe', dest='ntf', type=str)
137
parser.add_argument('--replace', dest='flags', action='append_const',
138
const=Netlink.NLM_F_REPLACE)
139
parser.add_argument('--excl', dest='flags', action='append_const',
140
const=Netlink.NLM_F_EXCL)
141
parser.add_argument('--create', dest='flags', action='append_const',
142
const=Netlink.NLM_F_CREATE)
143
parser.add_argument('--append', dest='flags', action='append_const',
144
const=Netlink.NLM_F_APPEND)
145
parser.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
146
parser.add_argument('--output-json', action='store_true')
147
parser.add_argument('--dbg-small-recv', default=0, const=4000,
148
action='store', nargs='?', type=int)
149
args = parser.parse_args()
150
151
def output(msg):
152
if args.output_json:
153
print(json.dumps(msg, cls=YnlEncoder))
154
else:
155
pprint.PrettyPrinter().pprint(msg)
156
157
if args.list_families:
158
for filename in sorted(os.listdir(spec_dir())):
159
if filename.endswith('.yaml'):
160
print(filename.removesuffix('.yaml'))
161
return
162
163
if args.no_schema:
164
args.schema = ''
165
166
attrs = {}
167
if args.json_text:
168
attrs = json.loads(args.json_text)
169
170
if args.family:
171
spec = f"{spec_dir()}/{args.family}.yaml"
172
else:
173
spec = args.spec
174
if not os.path.isfile(spec):
175
raise Exception(f"Spec file {spec} does not exist")
176
177
if args.validate:
178
try:
179
SpecFamily(spec, args.schema)
180
except Exception as error:
181
print(error)
182
exit(1)
183
return
184
185
if args.family: # set behaviour when using installed specs
186
if args.schema is None and spec.startswith(sys_schema_dir):
187
args.schema = '' # disable schema validation when installed
188
if args.process_unknown is None:
189
args.process_unknown = True
190
191
ynl = YnlFamily(spec, args.schema, args.process_unknown,
192
recv_size=args.dbg_small_recv)
193
if args.dbg_small_recv:
194
ynl.set_recv_dbg(True)
195
196
if args.ntf:
197
ynl.ntf_subscribe(args.ntf)
198
199
if args.list_ops:
200
for op_name, op in ynl.ops.items():
201
print(op_name, " [", ", ".join(op.modes), "]")
202
if args.list_msgs:
203
for op_name, op in ynl.msgs.items():
204
print(op_name, " [", ", ".join(op.modes), "]")
205
206
if args.list_attrs:
207
op = ynl.msgs.get(args.list_attrs)
208
if not op:
209
print(f'Operation {args.list_attrs} not found')
210
exit(1)
211
212
print(f'Operation: {op.name}')
213
print(op.yaml['doc'])
214
215
for mode in ['do', 'dump', 'event']:
216
if mode in op.yaml:
217
print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, True)
218
219
if 'notify' in op.yaml:
220
mode_spec = op.yaml['notify']
221
ref_spec = ynl.msgs.get(mode_spec).yaml.get('do')
222
if ref_spec:
223
print_mode_attrs(ynl, 'notify', ref_spec, op.attr_set, False)
224
225
if 'mcgrp' in op.yaml:
226
print(f"\nMulticast group: {op.yaml['mcgrp']}")
227
228
try:
229
if args.do:
230
reply = ynl.do(args.do, attrs, args.flags)
231
output(reply)
232
if args.dump:
233
reply = ynl.dump(args.dump, attrs)
234
output(reply)
235
if args.multi:
236
ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
237
reply = ynl.do_multi(ops)
238
output(reply)
239
240
if args.ntf:
241
for msg in ynl.poll_ntf(duration=args.duration):
242
output(msg)
243
except NlError as e:
244
print(e)
245
exit(1)
246
except KeyboardInterrupt:
247
pass
248
except BrokenPipeError:
249
pass
250
251
252
if __name__ == "__main__":
253
main()
254
255