-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvisualizer_win.py
216 lines (181 loc) · 7.76 KB
/
visualizer_win.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import socket
import threading
import paramiko
from architecture.topicLogUtil import *
from network_constants import *
from visualization.VisualizerGeometry import *
# Message-type suffixes understood by the visualizer, each mapped to the field
# names listed for that type. Only the keys are consulted by update_triads();
# insertion order is the order in which suffixes are tested against a topic key.
TYPES_3D_PLOT = {
    "TRANSLATION3D": ["X", "Y", "Z"],
    "ANGLE_RAD": ["X", "Y", "Z"],
    "ORIENTATION_RAD": ["X", "Y", "Z"],
    "POSE2D": ["X", "Y", "THETA"],
    "SO3": ["ROLL", "PITCH", "YAW"],
    "SE3": ["X", "Y", "Z", "ROLL", "PITCH", "YAW"],
}
# --- UDP receiver -----------------------------------------------------------
# Datagram socket on which the visualizer receives telemetry packets.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Address reuse stays enabled: the original call passed a non-zero value to
# SO_REUSEADDR, which switches the option on.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# The original comment ("10 times the previous buffer size") shows the intent
# was to enlarge the receive buffer, which SO_REUSEADDR never did. Request the
# larger buffer through SO_RCVBUF, but never shrink a default that is already
# bigger than the requested size.
_RECV_BUFFER_BYTES = 4096 * 10
if server_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) < _RECV_BUFFER_BYTES:
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, _RECV_BUFFER_BYTES)
# Non-blocking: recvfrom() raises instead of waiting when no datagram is queued.
server_socket.setblocking(False)
# Bind the socket to a specific address and port
server_address = (visualizer_ip, 12345)  # Change to your needs
server_socket.bind(server_address)

# --- Scene state ------------------------------------------------------------
# Drawable objects keyed by display name.
triads = {}
rectangular_prisms = {}

plt.style.use("ggplot")
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
# Same fixed limits on all three axes.
boundL, boundH = -1, 2
ax.set_xlim(boundL, boundH)
ax.set_ylim(boundL, boundH)
ax.set_zlim(boundL, boundH)

# Held by update_triads() and update() while they touch the scene dictionaries.
triads_lock = threading.Lock()
triads["ORIGIN"] = TriadVector(ax, origin=[0, 0, 0], length=0.1)
rectangular_prisms["test"] = RectangularPrism(ax, origin=[0.1, 0.1, 0.1], dimensions=(1, 1, 1))
# Names removed by the user; update_triads() skips them so that incoming
# packets do not re-create them.
removed_item_names = []
def update_triads(data):
    """
    Apply one decoded telemetry message to the triads in the scene.

    data maps topic keys to payload strings. A key is handled when it contains
    one of the TYPES_3D_PLOT suffixes; the triad name is the key with
    "_<suffix>" removed. Names listed in removed_item_names are skipped, and
    unknown names get a new TriadVector created at (0.5, 0.5, 0.5). The first
    element returned by split_outside_brackets() is parsed as JSON and its
    fields are forwarded to set_position() / set_rotation().
    """
    with triads_lock:
        for topic, payload in data.items():
            for kind in TYPES_3D_PLOT:
                if kind not in topic:
                    continue
                # remove the end from the string
                name = topic.replace("_" + kind, "")
                if name in removed_item_names:
                    continue
                if name not in triads:
                    triads[name] = TriadVector(ax, origin=[0.5, 0.5, 0.5], length=0.1)
                fields = json.loads(split_outside_brackets(payload)[0])
                target = triads[name]
                if kind == "TRANSLATION3D":
                    target.set_position([fields["X"], fields["Y"], fields["Z"]])
                elif kind in ("ANGLE_RAD", "ORIENTATION_RAD"):
                    target.set_rotation([fields["X"], fields["Y"], fields["Z"]])
                elif kind == "POSE2D":
                    # Planar pose: z is pinned to 0 and THETA is the rotation about z.
                    target.set_position([fields["X"], fields["Y"], 0])
                    target.set_rotation([0, 0, fields["THETA"]])
                elif kind == "SO3":
                    # NOTE(review): PITCH is passed before ROLL here, while
                    # user_commands() passes [roll, pitch, yaw] - confirm the
                    # argument order expected by TriadVector.set_rotation.
                    target.set_rotation([fields["PITCH"], fields["ROLL"], fields["YAW"]])
                elif kind == "SE3":
                    # NOTE(review): same PITCH/ROLL ordering as the SO3 branch.
                    target.set_rotation([fields["PITCH"], fields["ROLL"], fields["YAW"]])
                    target.set_position([fields["X"], fields["Y"], fields["Z"]])
def user_commands():
    """
    Interactive console loop for editing the 3D visualization.

    command syntax
    add    - adds an element to the 3d visualization (only "triad" is handled)
    remove - removes an element (only "triad" is handled)
    ls / list - prints every triad with its position and orientation
    run    - runs a script on the robot via run_script_on_rpi()
    examples:
    - add triad triad_name 30, 30, 30, 0, 0, 0
    - remove triad triad_name
    - ls
    - run script_name.py

    Numbers may be separated by spaces and may carry a trailing comma, as in
    the example above. The three optional angles of "add" are read as degrees
    and converted to radians before being handed to set_rotation(): the network
    path feeds *_RAD values to the same method and "ls" converts the stored
    orientation with np.degrees for display.
    TODO(review): confirm against TriadVector that set_rotation expects radians.

    The loop ends when stdin reaches end-of-file.
    """
    syntax_error = "something went wrong with parsing, try syntax again"

    def parse_numbers(tokens):
        # Accept "30" as well as "30," so the documented comma syntax works.
        return [float(token.rstrip(",")) for token in tokens]

    while True:
        try:
            raw_line = input("pyROSe - viz: ")
        except EOFError:
            # stdin was closed; there is nothing left to read.
            return
        # split() without arguments tolerates repeated spaces between tokens.
        user_input = raw_line.split()
        if not user_input:
            continue
        command = user_input[0].lower()
        print(user_input)

        if command == "add":
            if len(user_input) < 3:
                print(syntax_error)
                continue
            type_to_add = user_input[1].lower()
            if type_to_add == "triad":
                name = user_input[2]
                if name in triads:
                    print("{} already exists!".format(name))
                    continue
                try:
                    x, y, z = parse_numbers(user_input[3:6])
                    roll, pitch, yaw = 0, 0, 0
                    if len(user_input) >= 9:
                        # there exists angular elements (typed in degrees)
                        roll, pitch, yaw = (
                            np.radians(angle) for angle in parse_numbers(user_input[6:9])
                        )
                except ValueError:
                    # Raised for non-numeric tokens and for fewer than three coordinates.
                    print(syntax_error)
                    continue
                # The scene dictionaries are shared with the network and
                # animation code, which guard them with triads_lock.
                with triads_lock:
                    triads[name] = TriadVector(ax, origin=[x, y, z], length=0.3)
                    triads[name].set_rotation([roll, pitch, yaw])

        elif command == "remove":
            print("removing")
            if len(user_input) < 3:
                print(syntax_error)
                continue
            type_to_remove = user_input[1].lower()
            print(type_to_remove)
            name_to_remove = user_input[2]
            print(name_to_remove)
            if type_to_remove == "triad":
                with triads_lock:
                    try:
                        del triads[name_to_remove]
                    except KeyError:
                        print("Name was not found in the list")
                    else:
                        # Remember the name so update_triads() does not re-create it.
                        removed_item_names.append(name_to_remove)

        elif command == "ls" or command == "list":
            print("Triads:")
            # Hold the lock so the dictionary cannot change size mid-iteration.
            with triads_lock:
                for name, triad in triads.items():
                    x, y, z = triad.origin[0], triad.origin[1], triad.origin[2]
                    roll = triad.orientation[0]
                    pitch = triad.orientation[1]
                    yaw = triad.orientation[2]
                    print("Name: {} ; Position: (X:{} Y:{} Z:{}) "
                          "Orientation (deg): (Roll:{} Pitch:{} Yaw:{})"
                          .format(name, x, y, z, np.degrees(roll), np.degrees(pitch), np.degrees(yaw)))

        elif command == "run":
            if len(user_input) < 2:
                print(syntax_error)
                continue
            file = user_input[1]
            # NOTE(review): robot_hostname is passed as the SSH username (third
            # argument) - confirm whether a dedicated username constant exists.
            try:
                run_script_on_rpi(robot_hostname, robot_port, robot_hostname, robot_password, file)
            except Exception as err:
                # Console boundary: report the failure and keep the prompt alive.
                print("run failed: {}".format(err))
def update(_):
    """
    Animation callback passed to FuncAnimation; the frame argument is ignored.

    While holding triads_lock, re-applies the stored rotation_data of every
    triad and rectangular prism that has one, collects the artists of every
    object, redraws the canvas, and returns the collected artists.
    """
    with triads_lock:
        collected = []
        # Triads are processed first, then prisms.
        for group in (triads, rectangular_prisms):
            for shape in group.values():
                # Only try to use rotation_data if it's not None
                if shape.rotation_data is not None:
                    shape.set_rotation(shape.rotation_data)
                collected.extend(shape.get_artists())
        canvas = ax.figure.canvas
        canvas.draw()
        canvas.flush_events()
        return collected
def main():
    """
    Receive telemetry datagrams on server_socket and apply them to the scene.

    Each datagram is printed, decoded to text, passed through parse_line(), and
    the second element of its result is parsed as JSON and handed to
    update_triads().

    The socket is non-blocking, so the loop sleeps briefly when no datagram is
    queued instead of spinning at full CPU. The function returns once the
    socket has been closed, and a malformed datagram is reported and skipped
    rather than ending the receiver.
    """
    import time  # local import: the file's import block does not provide `time`

    idle_sleep_seconds = 0.001

    while True:
        try:
            data, _address = server_socket.recvfrom(4096)
        except BlockingIOError:
            # Nothing to read right now; yield the CPU before polling again.
            time.sleep(idle_sleep_seconds)
            continue
        except socket.error:
            # A closed socket reports fileno() == -1: stop instead of looping
            # forever on a dead descriptor. Any other socket error is treated
            # as transient, as the original loop did.
            if server_socket.fileno() == -1:
                return
            time.sleep(idle_sleep_seconds)
            continue

        print(data)
        try:
            # Convert the data from a byte string to a string
            text = data.decode()
            # Convert the string to a JSON object
            _, payload = parse_line(text)
            update_triads(json.loads(payload))
        except (ValueError, KeyError, TypeError, IndexError) as err:
            # UnicodeDecodeError and json.JSONDecodeError are ValueError
            # subclasses; KeyError covers a payload with missing fields.
            print("dropping malformed packet: {}".format(err))
def run_script_on_rpi(hostname, port, username, password, script_name):
    """
    Run /Documents/pyROS/<script_name> with python3 on a remote host over SSH.

    Connects with the given credentials, executes the script, and prints the
    remote stdout followed by the remote stderr line by line. The SSH
    connection is closed even when connecting or executing raises.

    Parameters:
        hostname, port: address of the SSH server.
        username, password: login credentials.
        script_name: file name of the script, relative to /Documents/pyROS/.

    Raises:
        Whatever paramiko raises for connection, authentication or execution
        failures; these are not caught here.
    """
    import shlex  # local import: the file's import block does not provide `shlex`

    # Create a new SSH client
    client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts host keys that have never been seen
    # before, so the remote host's identity is not verified.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Connect to the Raspberry Pi
        client.connect(hostname, port, username, password)
        # The command to run. The path is shell-quoted because script_name comes
        # from console input; names made only of ordinary file-name characters
        # pass through unchanged.
        command = 'python3 ' + shlex.quote(f'/Documents/pyROS/{script_name}')
        # Execute the command
        _stdin, stdout, stderr = client.exec_command(command)
        # Print any output from the command
        for line in stdout:
            print(line.strip('\n'))
        # Also surface remote errors, which were previously discarded.
        for line in stderr:
            print(line.strip('\n'))
    finally:
        # Close the connection
        client.close()
if __name__ == "__main__":
    # Script entry point: start the UDP receiver in the background and run the
    # matplotlib window on the main thread.
    # daemon=True so a background thread cannot keep the process alive after
    # the figure window has been closed.
    server_thread = threading.Thread(target=main, daemon=True)
    print("server thread init")
    user_command_thread = threading.Thread(target=user_commands, daemon=True)
    print("user command init")
    # Keep a reference to the animation object for as long as the window is shown.
    ani = FuncAnimation(fig, update, blit=False, interval=50, repeat=False, cache_frame_data=False, save_count=40)
    print("matplotlib init")
    server_thread.start()
    print("server thread start")
    # The interactive console is currently disabled:
    # user_command_thread.start()
    # print("user input start")
    try:
        plt.show()
        print("matplotlib start")
    finally:
        # Release the UDP port even if the GUI loop raised.
        server_socket.close()
        print("killed server")