-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrawcore_serial.py
249 lines (210 loc) · 9.35 KB
/
drawcore_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# coding=utf-8
# drawcore_serial.py
# Serial connection utilities for DrawCore
from packaging.version import parse
import logging
import time
import serial
logger = logging.getLogger(__name__)
def find_port():
    """Return the device name of the first attached DrawCore board.

    The serial ports reported by pySerial are scanned in order, and the
    first one whose hardware-ID string (index 2 of the port entry) starts
    with one of the known USB VID:PID prefixes wins.

    Returns:
        The matching port's device name (index 0 of the port entry), or
        None when pySerial's port listing is unavailable, the listing
        fails with TypeError, or no port matches.
    """
    try:
        from serial.tools.list_ports import comports
    except ImportError:
        return None
    if not comports:
        return None

    try:
        com_ports_list = list(comports())
    except TypeError:
        return None

    # Matching is done on the USB VID:PID only; str.startswith accepts a
    # tuple, so both known IDs are checked in a single call.
    drawcore_hwid_prefixes = (
        "USB VID:PID=1A86:7523",
        "USB VID:PID=1A86:8040",
    )
    for port in com_ports_list:
        if port[2].startswith(drawcore_hwid_prefixes):
            return port[0]  # Success; DrawCore found by VID/PID match.

    # Nothing matched: log every port that was seen, to help diagnose
    # why the board was not detected.
    for port in com_ports_list:
        logger.error('com_port: {0}'.format(port))
    return None
def list_drawcore_ports():
    """Return every attached serial port that looks like a DrawCore board.

    A port is selected when its hardware-ID string (index 2 of the port
    entry) starts with one of the known USB VID:PID prefixes.

    Returns:
        A non-empty list of the matching port entries as reported by
        pySerial, or None when pySerial's port listing is unavailable,
        the listing fails with TypeError, or no port matches.
    """
    try:
        from serial.tools.list_ports import comports
    except ImportError:
        return None
    if not comports:
        return None

    # Guard the enumeration the same way find_port() does, so both
    # functions fail soft in the same situation.
    try:
        com_ports_list = list(comports())
    except TypeError:
        return None

    drawcore_hwid_prefixes = (
        "USB VID:PID=1A86:7523",
        "USB VID:PID=1A86:8040",
    )
    drawcore_ports_list = [
        port for port in com_ports_list
        if port[2].startswith(drawcore_hwid_prefixes)
    ]
    # Callers get None, not an empty list, when nothing was found.
    return drawcore_ports_list if drawcore_ports_list else None
def find_named(port_name):
    """Placeholder for looking up a board by name; always returns None."""
    # port_name is accepted but not used.
    return None
def test_port(port_name):
    """
    Open a given serial port and verify that a DrawCore board answers on it.

    The port is configured for 115200 baud with a 1 second read timeout,
    and with RTS and DTR set to 0, before it is opened. The 'v' command is
    then sent up to three times; the port is accepted as soon as one reply
    line starts with "DrawCore".

    This routine only opens the port. On success the caller owns the
    returned object and is responsible for closing it, for example with
    close_port().

    Returns:
        The open serial.Serial object on success. None when port_name is
        None, when no attempt gets a "DrawCore" reply (the port is closed
        again), or when a serial.SerialException occurs (logged, and the
        port is closed again).
    """
    if port_name is None:
        return None

    serial_port = None
    try:
        serial_port = serial.Serial()
        serial_port.port = port_name
        serial_port.baudrate = 115200
        serial_port.timeout = 1
        serial_port.rts = 0
        serial_port.dtr = 0
        serial_port.open()

        for attempt in range(3):
            if attempt == 1:
                # The first probe failed: read away whatever is still
                # queued before probing again. The pending count is
                # refreshed before each read, exactly as in the original
                # sequence, so the last read may wait for the timeout.
                pending = serial_port.in_waiting
                while pending > 0:
                    pending = serial_port.in_waiting
                    serial_port.readline()

            serial_port.write('v\r'.encode('ascii'))
            str_version = serial_port.readline()
            if str_version and str_version.startswith("DrawCore".encode('ascii')):
                serial_port.reset_input_buffer()
                return serial_port

        serial_port.close()
    except serial.SerialException as err:
        logger.error("Error testing serial port `{}` connection".format(port_name))
        logger.info("Error context:", exc_info=err)
        # Do not leak a port that was opened before the failure.
        if serial_port is not None:
            try:
                serial_port.close()
            except serial.SerialException:
                pass  # Best-effort cleanup; the original error is already logged.
    return None
def open_port():
    """Locate a DrawCore board with find_port() and open it with test_port().

    The first port located is the one used.

    Returns:
        The opened port object returned by test_port(), or None (after
        logging an error naming the candidate port) when that result is
        falsy.
    """
    candidate = find_port()
    connection = test_port(candidate)
    if not connection:
        logger.error('error open com_port: {0}'.format(candidate))
        return None
    return connection
def close_port(port_name):
    """Close a serial connection, ignoring serial errors raised while closing.

    Despite its name, port_name is the port object itself: its close()
    method is called. Passing None is a no-op.
    """
    if port_name is None:
        return
    try:
        port_name.close()
    except serial.SerialException:
        # Closing is best effort; a failure here is deliberately ignored.
        pass
def timestamp():
    """Return the current local date and time as 'YYYY-MM-DD HH:MM:SS'."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    # With no time tuple given, strftime() formats the current local time.
    return time.strftime(stamp_format)
def query(port_name, cmd):
    """Send a query command to the board and return its textual reply.

    Despite its name, port_name is the open port object: its write() and
    readline() methods are used. cmd is the command string, including its
    terminator; it is sent ASCII-encoded.

    Each reply line is read with up to 20 retries while it comes back
    empty. For the commands "v", "i", "a", "mr", "pi" and "qm" (matched
    case-insensitively on the text before the first comma) the first line
    obtained is returned directly. For every other command, lines are
    collected until a line starting with "ok" arrives or the retries run
    out, and the collected lines are returned joined with newlines.

    Bytes that are not valid ASCII are decoded as replacement characters
    rather than raising UnicodeDecodeError, so line noise cannot abort a
    query.

    Returns:
        The reply text with surrounding whitespace stripped per line
        (possibly ''), '' when a serial/OS error occurs (the error is
        logged), or None when port_name or cmd is None.
    """
    if port_name is None or cmd is None:
        return None

    response_lines = []
    try:
        port_name.write(cmd.encode('ascii'))

        # Whether this command answers with a single line and no trailing
        # 'ok'. It depends only on cmd, so decide once, outside the loop.
        single_line_reply = cmd.split(",")[0].strip().lower() in (
            "v", "i", "a", "mr", "pi", "qm")

        # Keep reading lines until we get an 'ok' or time out.
        while True:
            line = ''
            # One initial read plus up to 20 retries on an empty line.
            for _ in range(21):
                raw_line = port_name.readline()
                line = raw_line.decode('ascii', errors='replace').strip()
                if line:
                    break

            if single_line_reply:
                return line
            if not line or line.startswith("ok"):
                # Either no response after the retries, or end of reply.
                break
            response_lines.append(line)

        return '\n'.join(response_lines)
    except (serial.SerialException, IOError, RuntimeError, OSError) as err:
        logger.error("Error reading serial data")
        logger.info("Error context:", exc_info=err)
        return ''
def command(port_name, cmd):
    """Send a command to the board and require an 'ok' acknowledgement.

    Despite its name, port_name is the open port object: its write() and
    readline() methods are used. cmd is the command string, including its
    terminator; it is sent ASCII-encoded. Nothing is done when either
    argument is None.

    The reply is read with up to 20 retries while it comes back empty.
    Whitespace-only lines count as empty, matching how query() treats
    them, and bytes that are not valid ASCII are decoded as replacement
    characters instead of raising UnicodeDecodeError.

    Serial/OS errors during the exchange are logged and swallowed; for
    the "rb" (reboot) command they are ignored without logging.

    Raises:
        ValueError: if the reply does not start with "ok", or if no reply
            arrives within the retries.
    """
    if port_name is None or cmd is None:
        return None

    response = ''
    try:
        port_name.write(cmd.encode('ascii'))
        # One initial read plus up to 20 retries on an empty line.
        for _ in range(21):
            raw_line = port_name.readline()
            response = raw_line.decode('ascii', errors='replace').strip()
            if response:
                break
    except (serial.SerialException, IOError, RuntimeError, OSError) as err:
        if cmd.strip().lower() not in ["rb"]:  # Ignore error on reboot (RB) command
            logger.error('Failed after command: {0}'.format(cmd))
            logger.info("Error context:", exc_info=err)
        return None

    # Validation sits outside the try block: the ValueError raised below was
    # never one of the handled exception types, so it propagates either way.
    if response.startswith("ok"):
        return None

    if response:
        error_msg = '\n'.join(('Unexpected response from DrawCore.',
                               ' Command: {0}'.format(cmd.strip()),
                               ' Response: {0}'.format(response)))
    else:
        error_msg = 'DrawCore Serial Timeout after command: {0}'.format(cmd)
    raise ValueError(error_msg)
def min_version(port_name, version_string):
    """Check whether the board's firmware is at least version_string.

    The firmware version is taken from the reply to query_version(): the
    text following the first "Firmware Version " marker, stripped of
    surrounding whitespace.

    Returns:
        True if the firmware version is at least version_string.
        False if the firmware version is below version_string.
        None if this cannot be determined: port_name is None, the reply
        is empty or lacks the marker, or the firmware version text cannot
        be parsed as a version.
    """
    if port_name is None:
        return None

    drawcore_reply = query_version(port_name)  # Full string, human readable
    if not drawcore_reply:
        return None

    _, marker, firmware_text = drawcore_reply.partition("Firmware Version ")
    if not marker:
        return None  # We haven't received a reasonable version number response.

    try:
        firmware_version = parse(firmware_text.strip())
    except ValueError:
        # packaging reports an unparseable version with InvalidVersion, a
        # ValueError subclass; treat it as "unable to determine".
        return None

    # An invalid version_string is a caller error and is left to propagate.
    return firmware_version >= parse(version_string)
def query_version(port_name):
    """Send the 'V' command through query() and return whatever it returns."""
    version_command = 'V\r'  # Query DrawCore Version String
    return query(port_name, version_command)