Skip to content

Commit

Permalink
Fill in some type annotations (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamsorcerer authored Feb 20, 2025
1 parent 6523e3f commit 7916a7b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 55 deletions.
32 changes: 19 additions & 13 deletions src/evdev/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import os
from typing import NamedTuple, Tuple, Union
from typing import Dict, Iterator, List, Literal, NamedTuple, Tuple, Union, overload

from . import _input, ecodes, util

Expand Down Expand Up @@ -95,7 +95,7 @@ class DeviceInfo(NamedTuple):
product: int
version: int

def __str__(self):
def __str__(self) -> str:
msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}"
return msg.format(*self) # pylint: disable=not-an-iterable

Expand Down Expand Up @@ -151,7 +151,7 @@ def __init__(self, dev: Union[str, bytes, os.PathLike]):
#: The number of force feedback effects the device can keep in its memory.
self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd)

def __del__(self):
def __del__(self) -> None:
if hasattr(self, "fd") and self.fd is not None:
try:
self.close()
Expand All @@ -176,7 +176,13 @@ def _capabilities(self, absinfo: bool = True):

return res

def capabilities(self, verbose: bool = False, absinfo: bool = True):
@overload
def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]:
...
@overload
def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]:
...
def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]:
"""
Return the event types that this device supports as a mapping of
supported event types to lists of handled event codes.
Expand Down Expand Up @@ -263,7 +269,7 @@ def leds(self, verbose: bool = False):

return leds

def set_led(self, led_num: int, value: int):
def set_led(self, led_num: int, value: int) -> None:
"""
Set the state of the selected LED.
Expand All @@ -279,26 +285,26 @@ def __eq__(self, other):
"""
return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path

def __str__(self):
def __str__(self) -> str:
msg = 'device {}, name "{}", phys "{}", uniq "{}"'
return msg.format(self.path, self.name, self.phys, self.uniq or "")

def __repr__(self):
def __repr__(self) -> str:
msg = (self.__class__.__name__, self.path)
return "{}({!r})".format(*msg)

def __fspath__(self):
return self.path

def close(self):
def close(self) -> None:
if self.fd > -1:
try:
super().close()
os.close(self.fd)
finally:
self.fd = -1

def grab(self):
def grab(self) -> None:
"""
Grab input device using ``EVIOCGRAB`` - other applications will
be unable to receive events until the device is released. Only
Expand All @@ -311,7 +317,7 @@ def grab(self):

_input.ioctl_EVIOCGRAB(self.fd, 1)

def ungrab(self):
def ungrab(self) -> None:
"""
Release device if it has been already grabbed (uses `EVIOCGRAB`).
Expand All @@ -324,7 +330,7 @@ def ungrab(self):
_input.ioctl_EVIOCGRAB(self.fd, 0)

@contextlib.contextmanager
def grab_context(self):
def grab_context(self) -> Iterator[None]:
"""
A context manager for the duration of which only the current
process will be able to receive events from the device.
Expand All @@ -342,7 +348,7 @@ def upload_effect(self, effect: "ff.Effect"):
ff_id = _input.upload_effect(self.fd, data)
return ff_id

def erase_effect(self, ff_id):
def erase_effect(self, ff_id) -> None:
"""
Erase a force effect from a force feedback device. This also
stops the effect.
Expand Down Expand Up @@ -402,7 +408,7 @@ def absinfo(self, axis_num: int):
"""
return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num))

def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None):
def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None:
"""
Update :class:`AbsInfo` values. Only specified values will be overwritten.
Expand Down
87 changes: 47 additions & 40 deletions src/evdev/eventio_async.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
import asyncio
import select
import sys

from . import eventio
from .events import InputEvent

# needed for compatibility
from .eventio import EvdevError

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing import Any as Self


class ReadIterator:
def __init__(self, device):
self.current_batch = iter(())
self.device = device

# Standard iterator protocol.
def __iter__(self) -> Self:
return self

def __next__(self) -> InputEvent:
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)

def __aiter__(self) -> Self:
return self

def __anext__(self) -> "asyncio.Future[InputEvent]":
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:

def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)

self.device.async_read().add_done_callback(next_batch_ready)
return future


class EventIO(eventio.EventIO):
def _do_when_readable(self, callback):
Expand Down Expand Up @@ -42,7 +88,7 @@ def async_read(self):
self._do_when_readable(lambda: self._set_result(future, self.read))
return future

def async_read_loop(self):
def async_read_loop(self) -> ReadIterator:
"""
Return an iterator that yields input events. This iterator is
compatible with the ``async for`` syntax.
Expand All @@ -58,42 +104,3 @@ def close(self):
# no event loop present, so there is nothing to
# remove the reader from. Ignore
pass


class ReadIterator:
def __init__(self, device):
self.current_batch = iter(())
self.device = device

# Standard iterator protocol.
def __iter__(self):
return self

def __next__(self):
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)

def __aiter__(self):
return self

def __anext__(self):
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:

def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)

self.device.async_read().add_done_callback(next_batch_ready)
return future
4 changes: 2 additions & 2 deletions src/evdev/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Union, List

from . import ecodes
from .events import event_factory
from .events import InputEvent, event_factory


def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]:
Expand All @@ -32,7 +32,7 @@ def is_device(fn: Union[str, bytes, os.PathLike]) -> bool:
return True


def categorize(event):
def categorize(event: InputEvent) -> InputEvent:
"""
Categorize an event according to its type.
Expand Down

0 comments on commit 7916a7b

Please sign in to comment.