-
Notifications
You must be signed in to change notification settings - Fork 88
/
Copy pathMAVLinkServer.py
81 lines (68 loc) · 3.24 KB
/
MAVLinkServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import re
import subprocess
from typing import Optional
from mavlink_proxy.AbstractRouter import AbstractRouter, TLogCondition
from mavlink_proxy.Endpoint import Endpoint, EndpointType
class MAVLinkServer(AbstractRouter):
def __init__(self) -> None:
super().__init__()
def _get_version(self) -> Optional[str]:
binary = self.binary()
assert binary is not None
for line in subprocess.check_output([binary, "--version"]).decode("utf-8").split("\n"):
regex = re.search(r"mavlink-server (?P<version>\S+)\b", line)
if regex:
return regex.group("version")
return None
def assemble_command(self, master_endpoint: Endpoint) -> str:
# Convert endpoint format to mavlink-router format
def convert_endpoint(endpoint: Endpoint) -> str:
if endpoint.connection_type == EndpointType.Serial:
return f"serial:{endpoint.place}:{endpoint.argument}"
if endpoint.connection_type == EndpointType.TCPServer:
return f"tcps:{endpoint.place}:{endpoint.argument}"
if endpoint.connection_type == EndpointType.TCPClient:
return f"tcpc:{endpoint.place}:{endpoint.argument}"
if endpoint.connection_type == EndpointType.UDPServer:
return f"udps:{endpoint.place}:{endpoint.argument}"
if endpoint.connection_type == EndpointType.UDPClient:
return f"udpc:{endpoint.place}:{endpoint.argument}"
if endpoint.connection_type == EndpointType.Zenoh:
return f"zenoh:{endpoint.place}:{endpoint.argument}"
raise ValueError(f"Endpoint of type {endpoint.connection_type} not supported on MAVLink-Server.")
def convert_tlog_condition(tlog_condition: TLogCondition) -> str:
match tlog_condition:
case TLogCondition.Always:
return "?when=always"
case TLogCondition.WhileArmed:
return "?when=while_armed"
tlog_condition_arg = convert_tlog_condition(self.tlog_condition())
logging_endpoint = f"tlogwriter://{self.logdir()}{tlog_condition_arg}"
str_endpoints = [convert_endpoint(endpoint) for endpoint in [master_endpoint, *self.endpoints()]]
endpoints = " ".join([*str_endpoints, logging_endpoint])
return f"{self.binary()} {endpoints}"
@staticmethod
def name() -> str:
return "MAVLink-Server"
@staticmethod
def binary_name() -> str:
return "mavlink-server"
@staticmethod
def _validate_endpoint(endpoint: Endpoint) -> None:
valid_connection_types = [
EndpointType.UDPClient,
EndpointType.UDPServer,
EndpointType.TCPServer,
EndpointType.TCPClient,
EndpointType.Serial,
EndpointType.Zenoh,
]
if endpoint.connection_type not in valid_connection_types:
raise ValueError(f"Connection_type '{endpoint.connection_type}' not supported by {MAVLinkServer.name()}.")
@staticmethod
def is_ok() -> bool:
try:
router = MAVLinkServer()
return router.binary() is not None and router.version() is not None
except Exception as _:
return False