Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/hid/tests/base_device.py
26308 views
1
#!/bin/env python3
2
# SPDX-License-Identifier: GPL-2.0
3
# -*- coding: utf-8 -*-
4
#
5
# Copyright (c) 2017 Benjamin Tissoires <[email protected]>
6
# Copyright (c) 2017 Red Hat, Inc.
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation; either version 2 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21
import dataclasses
22
import fcntl
23
import functools
24
import libevdev
25
import os
26
import threading
27
28
try:
29
import pyudev
30
except ImportError:
31
raise ImportError("UHID is not supported due to missing pyudev dependency")
32
33
import logging
34
35
import hidtools.hid as hid
36
from hidtools.uhid import UHIDDevice
37
from hidtools.util import BusType
38
39
from pathlib import Path
40
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
41
42
logger = logging.getLogger("hidtools.device.base_device")
43
44
45
class SysfsFile(object):
    """Accessor for a single attribute file (typically a sysfs node).

    Nothing is cached: the file at ``path`` is re-opened for every read
    and every write, so a read always returns the current file content.
    """

    def __init__(self, path):
        # location of the attribute file, handed to open() on each access
        self.path = path

    def __get_value(self):
        """Return the whole file content, stripped of surrounding whitespace."""
        with open(self.path) as attr:
            raw = attr.read()
        return raw.strip()

    def __set_value(self, value):
        """Write ``value`` followed by a newline and return write()'s result."""
        with open(self.path, "w") as attr:
            return attr.write(f"{value}\n")

    @property
    def int_value(self) -> int:
        """File content converted with ``int()``."""
        text = self.__get_value()
        return int(text)

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """File content as a stripped string."""
        text = self.__get_value()
        return text

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)
72
73
74
class LED(object):
    """Brightness accessor for one LED directory.

    ``sys_path`` must support the ``/`` operator (e.g. a ``pathlib.Path``)
    and is expected to contain ``max_brightness`` and ``brightness`` files.
    """

    def __init__(self, sys_path):
        # max_brightness is read once, at construction time
        max_node = SysfsFile(sys_path / "max_brightness")
        self.max_brightness = max_node.int_value
        # brightness is re-read / re-written through the property below
        brightness_node = SysfsFile(sys_path / "brightness")
        self.__brightness = brightness_node

    @property
    def brightness(self) -> int:
        """Current content of the ``brightness`` file, as an int."""
        node = self.__brightness
        return node.int_value

    @brightness.setter
    def brightness(self, value: int) -> None:
        node = self.__brightness
        node.int_value = value
86
87
88
class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes.

    Wraps the ``capacity``, ``status`` and ``type`` files found under
    ``sys_path``; each property reads its file again on every access.
    """

    def __init__(self, sys_path):
        def node(attribute):
            # one SysfsFile per attribute file of the power supply directory
            return SysfsFile(sys_path / attribute)

        self._capacity = node("capacity")
        self._status = node("status")
        self._type = node("type")

    @property
    def capacity(self) -> int:
        """Content of the ``capacity`` file, as an int."""
        reader = self._capacity
        return reader.int_value

    @property
    def status(self) -> str:
        """Content of the ``status`` file, as a string."""
        reader = self._status
        return reader.str_value

    @property
    def type(self) -> str:
        """Content of the ``type`` file, as a string."""
        reader = self._type
        return reader.str_value
107
108
109
# Readiness state of one HID device, as reported by the HIDIsReady helpers.
@dataclasses.dataclass
class HidReadiness:
    # whether the device is currently considered ready
    is_ready: bool = dataclasses.field(default=False)
    # UdevHIDIsReady increments this each time the device goes from
    # not ready to ready
    count: int = dataclasses.field(default=0)
113
114
115
class HIDIsReady(object):
    """
    Companion object of a uhid device that tells whether the kernel
    is done setting that device up.

    This base implementation has no kernel mechanism to rely on and
    always reports "not ready"; see :meth:`is_ready`.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        # the device whose readiness is reported
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> HidReadiness:
        """
        Overwrite in subclasses: should return a :class:`HidReadiness`
        describing whether the attached uhid device is ready, and how
        many times it became ready.

        The default is a fresh "not ready, never been ready" state.
        """
        return HidReadiness(is_ready=False, count=0)
132
133
134
class UdevHIDIsReady(HIDIsReady):
    """
    Readiness tracker fed by udev events filtered on "hid".

    The udev context, the monitor and the readiness table are class
    attributes: they are created once and shared by all instances.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    # readiness state, keyed by the hexadecimal suffix of the sysfs path
    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor on first use."""
        if cls._pyudev_context is not None:
            # already set up by an earlier instance
            return

        context = pyudev.Context()
        cls._pyudev_context = context
        monitor = pyudev.Monitor.from_netlink(context)
        cls._pyudev_monitor = monitor
        monitor.filter_by("hid")
        monitor.start()

        # presumably makes the UHIDDevice poll loop run the callback
        # whenever the monitor fd is readable -- confirm against hidtools
        UHIDDevice._append_fd_to_poll(monitor.fileno(), cls._cls_udev_event_callback)

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain pending udev events and update the readiness table."""
        monitor = cls._pyudev_monitor
        if monitor is None:
            return

        event: pyudev.Device
        while True:
            event = monitor.poll(0.02)
            if event is None:
                # nothing arrived within the poll timeout
                break

            if event.action not in ["bind", "remove", "unbind"]:
                # any other action ends this round of processing
                return

            logger.debug(f"udev event: {event.action} -> {event}")

            # the text after the last "." of the sysfs path is parsed
            # as a hexadecimal number and used as the table key
            suffix = event.sys_path.strip().split(".")[-1]
            hid_id = int(suffix, 16)

            readiness = cls._uhid_devices.setdefault(hid_id, HidReadiness())

            bound = event.action == "bind"
            if bound and not readiness.is_ready:
                # count only the not ready -> ready transitions
                readiness.count += 1

            readiness.is_ready = bound

    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
        """Return the recorded state of our device, or a default one."""
        try:
            state = self._uhid_devices[self.uhid.hid_id]
        except KeyError:
            state = HidReadiness()
        return state
181
182
183
class EvdevMatch(object):
    """
    Set of rules an evdev device has to satisfy to be considered a match.

    :param requires: items for which ``evdev.has()`` must be true
    :param excludes: items for which ``evdev.has()`` must be false
    :param req_properties: items for which ``evdev.has_property()``
        must be true
    :param excl_properties: items for which ``evdev.has_property()``
        must be false

    Every omitted rule set defaults to a new empty list, so an
    ``EvdevMatch()`` built without arguments matches any device.
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # None instead of a `[]` default: a list literal in the signature
        # is created once and would be shared (and mutated) across all
        # instances built without that argument.
        self.requires: List[Any] = [] if requires is None else requires
        self.excludes: List[Any] = [] if excludes is None else excludes
        self.req_properties: List[Any] = (
            [] if req_properties is None else req_properties
        )
        self.excl_properties: List[Any] = (
            [] if excl_properties is None else excl_properties
        )

    def is_a_match(self: "EvdevMatch", evdev: "libevdev.Device") -> bool:
        """Return True when ``evdev`` satisfies all four rule sets."""
        return (
            all(evdev.has(m) for m in self.requires)
            and not any(evdev.has(m) for m in self.excludes)
            and all(evdev.has_property(p) for p in self.req_properties)
            and not any(evdev.has_property(p) for p in self.excl_properties)
        )
211
212
213
class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.

    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.

    :param sysfs: sysfs path of the event node. Its last component names
        the node under ``/dev/input`` and its parent directory holds the
        ``uevent`` file that is parsed into :attr:`uevents`.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        self.uevents = {}
        # all of the interesting properties are stored in the input uevent,
        # so in the parent: convert the uevent file of the parent input node
        # into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # split on the first "=" only: a value (a device name for
                # instance) may itself contain "="
                key, sep, value = line.strip().partition("=")
                if not sep:
                    # blank line or line without "=": nothing to record
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """Value of the ``NAME`` entry of the parsed uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node: ``/dev/input/<last sysfs component>``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: "Dict[str, EvdevMatch]"
    ) -> bool:
        """
        Tell whether this node satisfies the rules registered for
        ``application`` in ``matches``.

        :returns: False when the node is not opened, otherwise the result
            of the matching rule.
        :raises AssertionError: when ``application`` has no entry in
            ``matches``.
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        logger.error(
            f"application '{application}' is unknown, please update/fix hid-tools"
        )
        # hid-tools likely needs an update. Raised explicitly rather than
        # through `assert False`, which is stripped when running with -O and
        # would let this method silently return None.
        raise AssertionError(f"application '{application}' is unknown")

    def open(self: "EvdevDevice") -> "libevdev.Device":
        """Open the device node, wrap it in a libevdev device and return it."""
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        # Make the node non-blocking. O_NONBLOCK is a file *status* flag, so
        # the current value has to be read with F_GETFL (F_GETFD returns the
        # unrelated descriptor flags, i.e. FD_CLOEXEC).
        fd = self.libevdev.fd.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev file object and the event node, if opened."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None
280
281
282
class BaseDevice(UHIDDevice):
    """
    Common base of the uhid test devices.

    On top of UHIDDevice it keeps track of the opened/started state,
    exposes the power supply, LED and evdev nodes found in sysfs, and
    tells when the kernel is done binding the device.

    :param name: device name, a default one is generated when None
    :param application: application used by default in :meth:`get_evdev`
    :param rdesc_str: human readable report descriptor, parsed when
        ``rdesc`` is not given
    :param rdesc: report descriptor, takes precedence over ``rdesc_str``
    :param input_info: stored in ``self.info``, defaults to
        ``(BusType.USB, 1, 2)``
    :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
    """

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        # created before super().__init__(), presumably so the udev monitor
        # is already listening when the device shows up -- TODO confirm
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        # None until the evdev nodes have been enumerated by input_nodes
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """First power supply found in sysfs for this device, or None."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One LED per ``max_brightness`` file found in sysfs."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the readiness helper reports ready and we are started."""
        return self._kernel_is_ready.is_ready().is_ready and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Number of times the readiness helper saw the device become ready."""
        return self._kernel_is_ready.is_ready().count

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """
        Evdev nodes of this device, opened on first successful access.

        Returns an empty list (without caching it) as long as the device
        is not ready; once the nodes have been enumerated the same list
        is returned on every access.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        # Starting with kernel v6.16, an event is emitted when
        # userspace opens a kernel device, and for some devices
        # this translates into a SET_REPORT.
        # Because EvdevDevice(path) opens every single evdev node
        # we need to have a separate thread to process the incoming
        # SET_REPORT or we end up having to wait for the kernel
        # timeout of 5 seconds.
        done = threading.Event()

        def dispatch():
            while not done.is_set():
                self.dispatch(1)

        t = threading.Thread(target=dispatch)
        t.start()
        try:
            # walk_sysfs() can return None (see power_supply_class and
            # led_classes): treat that as "no node found"
            paths = self.walk_sysfs("input", "input/input*/event*") or []
            self._input_nodes = [EvdevDevice(path) for path in paths]
        finally:
            # always stop the helper thread: if opening a node raises, a
            # thread left running would keep dispatching forever and, not
            # being a daemon thread, prevent the interpreter from exiting
            done.set()
            t.join()
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        :meth:`get_evdev` only returns a node for which this method
        returned True; returning False makes it ignore the node for
        this application.
        """
        return True

    def open(self):
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every evdev node opened through :attr:`input_nodes`."""
        # getattr(): this also runs from __del__, possibly on an instance
        # whose __init__ raised before _input_nodes was assigned
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for e in nodes:
                e.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        self.opened = False

    def start(self, flags):
        self.started = True

    def stop(self):
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the pending events of the matching evdev node, or []."""
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """
        Return the libevdev device matching ``application`` (defaults to
        ``self.application``), or None when there is no such node.

        With a single input node only :meth:`match_evdev_rule` is
        consulted; with several, a node must also satisfy the rules of
        :attr:`application_matches`.
        """
        if application is None:
            application = self.application

        if not self.input_nodes:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        # no node matched
        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready
449
450