Skip to content

Commit 559337d

Browse files
committed
Overhaul to networkmanager, using supported library
1 parent 0fc66b1 commit 559337d

File tree

4 files changed

+113
-101
lines changed

4 files changed

+113
-101
lines changed

lego_lcd/wifi_connect/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The module has the following functions exposed:
1010
* `has_internet` - checks if there is a current active internet connection
1111
* `local_ip` - gets the local IP of the machine
1212
* `external_ip` - gets the external IP of the machine
13-
* `get_list_of_access_points` - gets a list of all access points
13+
* `get_all_access_points` - gets a list of all access points
1414
* `delete_all_wifi_connections` - remove all existing wifi connections
15+
* `connect_to_ap` - connect to an access point
1516
*

lego_lcd/wifi_connect/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
from .utils import have_internet, local_ip, external_ip
2-
from .netman import delete_all_wifi_connections, get_list_of_access_points
2+
from .netman import delete_all_wifi_connections, get_all_access_points, SecurityType, connect_to_ap

lego_lcd/wifi_connect/netman.py

+109-98
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,38 @@
11
# Start a local hotspot using NetworkManager.
2-
3-
# You must use https://developer.gnome.org/NetworkManager/1.2/spec.html
4-
# to see the DBUS API that the python-NetworkManager module is communicating
5-
# over (the module documentation is scant).
2+
# Uses the NetworkManager D-Bus API to communicate with NetworkManager.
63

74
from contextlib import contextmanager
8-
from dataclasses import dataclass
95
from enum import Enum
106
from uuid import uuid4
117
from time import sleep
12-
13-
import NetworkManager
8+
from ipaddress import ip_address
9+
10+
import sdbus
11+
from sdbus_block.networkmanager import (
12+
NetworkManager, NetworkManagerSettings,
13+
NetworkConnectionSettings, NetworkDeviceGeneric, NetworkDeviceWireless, AccessPoint
14+
)
15+
from sdbus_block.networkmanager.settings import ConnectionProfile
16+
from sdbus_block.networkmanager.enums import DeviceType, DeviceState
17+
try:
18+
from sdbus_block.networkmanager.enums import (
19+
WifiAccessPointCapabilitiesFlags, WifiAccessPointSecurityFlags
20+
)
21+
except ImportError:
22+
# For older versions of sdbus_block.networkmanager
23+
from sdbus_block.networkmanager.enums import (
24+
AccessPointCapabilities as WifiAccessPointCapabilitiesFlags,
25+
WpaSecurityFlags as WifiAccessPointSecurityFlags
26+
)
1427

1528
from .defaults import HOTSPOT_CONNECTION_NAME, GENERIC_CONNECTION_NAME
1629
from .defaults import DEFAULT_GATEWAY, DEFAULT_PREFIX, DEFAULT_INTERFACE
1730

1831

32+
# Set the default bus to the system bus - recommended for NetworkManager
33+
sdbus.set_default_bus(sdbus.sd_bus_open_system())
34+
35+
1936
class SecurityType(Enum):
2037
"""Enum for the different types of security an AP can have."""
2138
NONE = 0
@@ -26,89 +43,104 @@ class SecurityType(Enum):
2643
HIDDEN = 16
2744

2845

46+
def __all_connections() -> list[NetworkConnectionSettings]:
47+
"""Return a list of all known connections."""
48+
return [NetworkConnectionSettings(path) for path in NetworkManagerSettings().connections]
49+
50+
51+
def __filter_connections(key: str, value) -> list[NetworkConnectionSettings]:
52+
"""Return a list of connections that have the given key and value."""
53+
return [conn for conn in __all_connections()
54+
if conn.get_settings()["connection"][key][1] == value]
55+
56+
57+
def __find_connection(name: str) -> NetworkConnectionSettings|None:
58+
connections = __filter_connections("id", name)
59+
return connections[0] if connections else None
60+
61+
62+
def __all_wifi_devices() -> list[NetworkDeviceWireless]:
63+
"""Return a list of all known wifi devices."""
64+
all_devices = [(path, NetworkDeviceGeneric(path)) for path in NetworkManager().devices]
65+
return [NetworkDeviceWireless(path) for (path, dev) in all_devices
66+
if dev.device_type == DeviceType.WIFI]
67+
68+
69+
def __first_wifi_device_path() -> str|None:
70+
"""Returns the first known wifi device path."""
71+
return next((path for path in NetworkManager().devices
72+
if NetworkDeviceGeneric(path).device_type == DeviceType.WIFI), None)
73+
74+
2975
def delete_all_wifi_connections() -> None:
3076
"""
3177
Remove ALL wifi connections - to start clean or before running the hotspot.
3278
"""
33-
# Get all known connections
34-
connections = NetworkManager.Settings.ListConnections()
35-
3679
# Delete the '802-11-wireless' connections
37-
for connection in connections:
38-
if connection.GetSettings()["connection"]["type"] == "802-11-wireless":
39-
connection.Delete()
80+
for connection in __filter_connections("type", "802-11-wireless"):
81+
connection.delete()
4082
sleep(2)
4183

4284

43-
def __find_connection(name: str) -> object|None:
44-
connections = NetworkManager.Settings.ListConnections()
45-
return next((conn for conn in connections
46-
if conn.GetSettings()['connection']['id'] == name), None)
47-
48-
4985
def stop_connection(name: str = GENERIC_CONNECTION_NAME) -> bool:
5086
"""Generic connection stopper / deleter."""
5187
conn = __find_connection(name)
5288
if conn is None:
5389
return False
54-
conn.Delete()
90+
conn.delete()
5591
sleep(2)
5692
return True
5793

5894

59-
def get_list_of_access_points() -> dict[str, SecurityType]:
60-
"""
61-
Return a dictionary of available SSIDs and their security type, or {} for none available or error.
62-
"""
95+
def get_all_access_points() -> dict[str, SecurityType]:
96+
"""Return a dictionary of available SSIDs and their security type."""
97+
# Ignores duplicate SSIDs, only keeps the last one found
6398
aps = {}
64-
for dev in NetworkManager.NetworkManager.GetDevices():
65-
if dev.DeviceType != NetworkManager.NM_DEVICE_TYPE_WIFI:
66-
continue
67-
for ap in dev.GetAccessPoints():
68-
aps[ap.Ssid] = __get_security_type(ap)
99+
for dev in __all_wifi_devices():
100+
for ap in (AccessPoint(ap) for ap in dev.access_points):
101+
# TODO: save max(ap.strength)?
102+
aps[ap.ssid] = __get_security_type(ap)
69103
return aps
70104

71105

72-
def __get_security_type(ap) -> SecurityType:
106+
def __get_security_type(ap: AccessPoint) -> SecurityType:
73107
"""
74108
Return the security type of the given SSID, or None if not found.
75109
"""
76-
# Get Flags, WpaFlags and RsnFlags, all are bit OR'd combinations
77-
# of the NM_802_11_AP_SEC_* bit flags.
78-
# https://developer.gnome.org/NetworkManager/1.2/nm-dbus-types.html#NM80211ApSecurityFlags
79-
80-
security = SecurityType.NONE
81-
82-
# Based on a subset of the flag settings we can determine which
83-
# type of security this AP uses.
84-
# We can also determine what input we need from the user to connect to
85-
# any given AP (required for our dynamic UI form).
86-
if (ap.Flags & NetworkManager.NM_802_11_AP_FLAGS_PRIVACY and
87-
ap.WpaFlags == NetworkManager.NM_802_11_AP_SEC_NONE and
88-
ap.RsnFlags == NetworkManager.NM_802_11_AP_SEC_NONE):
89-
security = SecurityType.WEP
90-
91-
if ap.WpaFlags != NetworkManager.NM_802_11_AP_SEC_NONE:
92-
security = SecurityType.WPA
93-
94-
if ap.RsnFlags != NetworkManager.NM_802_11_AP_SEC_NONE:
95-
security = SecurityType.WPA2
96-
97-
if (ap.WpaFlags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_802_1X or
98-
ap.RsnFlags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_802_1X):
99-
security = SecurityType.ENTERPRISE
100-
101-
#print(f'{ap.Ssid:15} Flags=0x{ap.Flags:X} WpaFlags=0x{ap.WpaFlags:X} RsnFlags=0x{ap.RsnFlags:X}')
102-
103-
return security
110+
# The wpa and rsn (i.e. WPA2) flags can be used to determine the general security type
111+
if (ap.wpa_flags | ap.rsn_flags) & WifiAccessPointSecurityFlags.KEY_MGMT_802_1X:
112+
return SecurityType.ENTERPRISE
113+
if ap.rsn_flags != WifiAccessPointSecurityFlags.NONE:
114+
return SecurityType.WPA2
115+
if ap.wpa_flags != WifiAccessPointSecurityFlags.NONE:
116+
return SecurityType.WPA
117+
if ap.flags & WifiAccessPointCapabilitiesFlags.PRIVACY:
118+
return SecurityType.WEP
119+
return SecurityType.NONE
120+
121+
122+
def connect_wifi(conn_info: dict) -> None:
123+
"""Create and activate a wifi connection using NetworkManager."""
124+
# Add and activate the connection
125+
dev_path = __first_wifi_device_path()
126+
profile = ConnectionProfile.from_settings_dict(conn_info)
127+
NetworkManager().add_and_activate_connection(profile.to_dbus(), dev_path, "/")
128+
# Wait for the connection to activate
129+
loop_count = 0
130+
dev = NetworkDeviceWireless(dev_path)
131+
while dev.state != DeviceState.ACTIVATED:
132+
sleep(1)
133+
loop_count += 1
134+
if loop_count > 30: # only wait 30 seconds max
135+
raise TimeoutError(f"Connection {conn_info['connection']['id']} failed to activate.")
104136

105137

106138
def connect_to_ap(ssid: str, password: str|None = None, username: str|None = None,
107-
conn_name: str = GENERIC_CONNECTION_NAME) -> None:
139+
hidden: bool = False, conn_name: str = GENERIC_CONNECTION_NAME) -> None:
108140
"""
109141
Connect to the given SSID with the given optional username and password.
110142
"""
111-
conn_dict = __generic_connection_dict(conn_name, ssid)
143+
conn_dict = __generic_connection_profile(conn_name, ssid, hidden)
112144
if password is None:
113145
# No auth, 'open' connection
114146
pass
@@ -122,71 +154,50 @@ def connect_to_ap(ssid: str, password: str|None = None, username: str|None = Non
122154
# Enterprise, WPA-EAP, username and password required
123155
conn_dict['802-11-wireless']['security'] = '802-11-wireless-security'
124156
conn_dict['802-11-wireless-security'] = {'auth-alg': 'open', 'key-mgmt': 'wpa-eap'}
125-
conn_dict['802-1x'] = {'eap': ['peap'], 'identity': username, 'password': password, 'phase2-auth': 'mschapv2'}
126-
127-
connect_wifi(conn_dict)
128-
129-
130-
131-
def connect_wifi(connection_info: dict) -> None:
132-
"""Create and activate a wifi connection using NetworkManager."""
157+
conn_dict['802-1x'] = {'eap': ['peap'], 'phase2-auth': 'mschapv2',
158+
'identity': username, 'password': password,}
133159

134-
name = connection_info['connection']['id']
160+
connect_wifi(conn_dict)
135161

136-
# Add and activate the connection
137-
NetworkManager.Settings.AddConnection(connection_info)
138-
conn = __find_connection(name)
139-
dev = next(dev for dev in NetworkManager.NetworkManager.GetDevices()
140-
if dev.DeviceType == NetworkManager.NM_DEVICE_TYPE_WIFI)
141-
NetworkManager.NetworkManager.ActivateConnection(conn, dev, "/")
142162

143-
# Wait for ADDRCONF(NETDEV_CHANGE): wlan0: link becomes ready
144-
print(f'Waiting for connection to become active...')
145-
loop_count = 0
146-
while dev.State != NetworkManager.NM_DEVICE_STATE_ACTIVATED:
147-
sleep(1)
148-
loop_count += 1
149-
if loop_count > 30: # only wait 30 seconds max
150-
raise TimeoutError(f"Connection {name} failed to activate.")
163+
def __get_ip_address_int(ip: str) -> int:
164+
# the bytes are reversed for NetworkManager...
165+
return int(ip_address('.'.join(ip.split('.')[::-1])))
151166

152167

153-
def __generic_connection_dict(name: str, ssid: str) -> dict:
168+
def __generic_connection_profile(name: str, ssid: str, hidden: bool = False) -> dict:
169+
"""Return a generic connection profile for the given name and ssid. Has no security."""
170+
wifi = {'mode': 'infrastructure', 'ssid': ssid}
171+
if hidden: wifi['hidden'] = True
154172
return {
155-
'802-11-wireless': {'mode': 'infrastructure', 'ssid': ssid},
173+
'802-11-wireless': hidden,
156174
'connection': {'id': name, 'type': '802-11-wireless', 'uuid': str(uuid4())},
157-
'ipv4': {'method': 'auto'},
158-
'ipv6': {'method': 'auto'},
175+
'ipv4': {'method': 'auto'}, 'ipv6': {'method': 'auto'},
159176
}
160177

161178

162179
def start_hotspot(ssid: str,
163180
address: str = DEFAULT_GATEWAY, prefix: int = DEFAULT_PREFIX,
164181
interface: str = DEFAULT_INTERFACE, name: str = HOTSPOT_CONNECTION_NAME) -> None:
165182
"""Start a local hotspot on the wifi interface."""
166-
conn = __generic_connection_dict(name, ssid)
183+
conn = __generic_connection_profile(name, ssid)
167184
conn['802-11-wireless'] |= {'band': 'bg', 'mode': 'ap'}
168185
conn['connection'] |= {'autoconnect': False, 'interface-name': interface}
169-
conn['ipv4'] = {'address-data': [{'address': 'address', 'prefix': prefix}],
170-
'addresses': [[address, prefix, '0.0.0.0']],
186+
conn['ipv4'] = {'address-data': [{'address': address, 'prefix': prefix}], 'gateway': '0.0.0.0',
187+
#'addresses': [[__get_ip_address_int(address), prefix, 0]], # 0 == '0.0.0.0'
171188
'method': 'manual'}
172189
connect_wifi(conn)
173190

174191

175192
def stop_hotspot(name: str = HOTSPOT_CONNECTION_NAME) -> bool:
176-
"""
177-
Stop and delete the hotspot.
178-
Returns True for success or False (for hotspot not found or error).
179-
"""
193+
"""Stop and delete the hotspot. Returns True for success or False (for not found)."""
180194
return stop_connection(name)
181195

182196

183197
@contextmanager
184198
def hotspot(ssid: str, address: str = DEFAULT_GATEWAY, prefix: int = DEFAULT_PREFIX,
185199
interface: str = DEFAULT_INTERFACE, name: str = HOTSPOT_CONNECTION_NAME):
186-
"""
187-
Context manager that starts a hotspot with the given ssid, address, prefix, and interface, and then
188-
stops it when the context is exited.
189-
"""
200+
"""Context manager that runs a hotspot with the given ssid."""
190201
start_hotspot(ssid, address, prefix, interface, name)
191202
try:
192203
yield

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ name = "lego-lcd"
77
version = "0.1"
88
readme = "README.md"
99
requires-python = ">= 3.8"
10-
dependencies = ["python-networkmanager"]
10+
dependencies = ["sdbus-networkmanager"]
1111

1212
[project.scripts]
1313
lcd-clock = "lego_lcd.clock:main"

0 commit comments

Comments
 (0)