-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSensorProducer.py
230 lines (193 loc) · 10.1 KB
/
SensorProducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
############################################################################
#
# Python Onboard Diagnostics II Advanced
#
# SensorProducer.py
#
# Copyright 2021-2023 Keven L. Ates ([email protected])
#
# This file is part of the Onboard Diagnostics II Advanced (pyOBDA) system.
#
# pyOBDA is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# pyOBDA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pyOBDA; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
############################################################################
import wx
import time
import threading
from typing import Callable
import AppSettings
from Connection import Connection
from OBD2Port import OBD2Port
from SensorManager import SensorManager
from EventHandler import EventHandler
from EventDebug import EventDebug
from EventDTC import EventDTC
from EventSensor import EventSensor
from EventConnection import EventConnection
from EventTest import EventTest
from OBD2Device.Codes import Codes
class ThreadCommands:
Null = 0 # ...Do Nothing
DTC_Clear = 1 # ...Clear DTCs
DTC_Load = 2 # ...Get DTCs
Disconnect = -1 # ...Disconnect from vehicle's ECU
class SensorProducer(threading.Thread):
"""
The Sensor Producer Class to produce sensor managers.
"""
def __init__(self, connection: Connection, notebook: wx.Notebook, events: EventHandler, funcSetTestIgnition: Callable):
super().__init__()
self.connection = Connection(connection) # ...copy
self.PORT = None
self.notebook = notebook
self.events = events
self.setTestIgnition = funcSetTestIgnition
self.iSensorListLen = len(SensorManager.SENSORS)
self.supported = [0] * self.iSensorListLen
self.active = [[]] * self.iSensorListLen
self.iThreadControl = ThreadCommands.Null
self.iCurrSensorsPage = 0 # ...starting sensors page
def run(self):
self.startProduction()
self.stopProduction()
def setThreadControl(self, iThreadControl: int):
self.iThreadControl = iThreadControl
def setSensorPage(self, iCurrSensorPage: int):
self.iCurrSensorsPage = iCurrSensorPage
def getSensorPage(self) -> int:
return self.iCurrSensorsPage
def startProduction(self):
wx.PostEvent( self.events, EventDebug( [2, "Starting Sensor Connection..."] ) )
wx.PostEvent( self.events, EventConnection( [0, 1, AppSettings.CHAR_QMARK] ) )
self.initCommunication()
if self.PORT.bConnected == False: # ...cannot connect, exit thread
wx.PostEvent( self.events, EventDebug( [1, "ERROR: Connection Failed!"] ) )
# Signal app that communication failed...
wx.PostEvent( self.events, EventConnection( [0, 1, AppSettings.CHAR_CROSSX] ) )
wx.PostEvent( self.events, EventConnection( [-1, -1, "SIGNAL: Failed"] ) ) # ...signal connection failure to app
return
wx.PostEvent( self.events, EventDebug( [3, " Connected"] ) )
wx.PostEvent( self.events, EventConnection( [0, 1, AppSettings.CHAR_CHECK] ) )
wx.PostEvent( self.events, EventConnection( [2, 1, self.PORT.strELMver] ) )
wx.PostEvent( self.events, EventConnection( [3, 1, self.PORT.strELMvolts] ) )
statePrev = -1
stateCurr = -1
while self.iThreadControl != ThreadCommands.Disconnect: # ...thread loop, not disconnecting...
statePrev = stateCurr # ...store last state
stateCurr = self.notebook.GetSelection()
if stateCurr == 0: # ...Status Page
if statePrev != stateCurr :
wx.PostEvent( self.events, EventDebug([2, "Status Page..."] ) )
elif stateCurr == 1: # ...Tests Page
if statePrev != stateCurr :
wx.PostEvent( self.events, EventDebug([2, "Test Page..."] ) )
astrResults = self.PORT.getStatusTests()
if astrResults:
for iIndex in range(0, len(astrResults)):
if (astrResults[iIndex] != None): # ...spacer
wx.PostEvent( self.events, EventTest( [ iIndex, 1, astrResults[iIndex] ] ) )
elif stateCurr == 2: # ...Sensor Page
if statePrev != stateCurr :
wx.PostEvent( self.events, EventDebug( [2, "Sensor Page..."] ) )
iStartSensors = 3
if self.iCurrSensorsPage > 0 :
iStartSensors = 1
for iIndex in range(iStartSensors, len(self.active[self.iCurrSensorsPage])):
if self.active[self.iCurrSensorsPage][iIndex]:
tupSensorInfo = self.PORT.getSensorInfo(self.iCurrSensorsPage, iIndex)
listResponse = [self.iCurrSensorsPage, iIndex, 2, "%s (%s)" % (tupSensorInfo[1], tupSensorInfo[2])]
wx.PostEvent( self.events, EventSensor(listResponse) )
elif stateCurr == 3: # ...DTC Page
if statePrev != stateCurr :
wx.PostEvent( self.events, EventDebug( [2, "DTC Page..."] ) )
if self.iThreadControl == ThreadCommands.DTC_Clear:
listResponse = self.PORT.clearDTC()
# Response is N/A for CLEAR_DTC...no need to process
# Before resetting ThreadControl, check for a disconnect...
if self.iThreadControl == ThreadCommands.Disconnect:
break
self.iThreadControl = ThreadCommands.DTC_Load
if self.iThreadControl == ThreadCommands.DTC_Load:
wx.PostEvent( self.events, EventDTC(0) ) # ...clear list
listCodesDTC = self.PORT.getDTC()
if len(listCodesDTC) == 0:
listResponse = ["", "", "No DTC Codes (all clear)"]
wx.PostEvent( self.events, EventDTC(listResponse) )
for iIndex in range(0, len(listCodesDTC)):
listResponse = [ listCodesDTC[iIndex][1], listCodesDTC[iIndex][0], Codes.Codes[ listCodesDTC[iIndex][1] ] ]
wx.PostEvent( self.events, EventDTC(listResponse) )
# Before resetting ThreadControl, check for a disconnect...
if self.iThreadControl == ThreadCommands.Disconnect:
break
self.iThreadControl = ThreadCommands.Null
elif stateCurr == 4: # ...Trace (Debug) Page
if statePrev != stateCurr :
wx.PostEvent( self.events, EventDebug( [2, "Trace Page..."] ) )
# View the trace log...
else: # ...everything else
if statePrev != stateCurr :
# We should never see this message...
wx.PostEvent( self.events, EventDebug( [2, "ERROR Page..."] ) )
if self.iThreadControl == ThreadCommands.Disconnect: # ...end thread
break
# Delay the loop iteration based on the Connection configured DELAY...
time.sleep(self.connection.DELAY if self.connection.DELAY > 1.0 else 1.0)
def initCommunication(self):
self.PORT = OBD2Port(self.connection, self.events, self.getSensorPage, self.setTestIgnition)
if self.PORT.bConnected == False: # ...cannot connect to vehicle
return
wx.PostEvent( self.events, EventDebug( [1, "Communication initialized..."] ) )
# Populate sensor pages with initial data...
for iSensorGroup in range(self.iSensorListLen) :
response = self.PORT.getSensorInfo(iSensorGroup, 0)[1]
if ( response.isNull() ) :
# NOTE: Event message already from getSensorInfo()
self.supported[iSensorGroup] = ""
else:
self.supported[iSensorGroup] = response.value # ...Supported PIDs
self.active[iSensorGroup] = []
for iIndex, bSupport in enumerate(self.supported[iSensorGroup]) :
self.active[iSensorGroup].append(bSupport)
if bSupport :
wx.PostEvent( self.events, EventSensor([iSensorGroup, iIndex, 0, AppSettings.CHAR_CHECK] ) )
else:
wx.PostEvent( self.events, EventSensor([iSensorGroup, iIndex, 0, AppSettings.CHAR_BALLOTX] ) )
wx.PostEvent( self.events, EventDebug( [3, " Sensors marked for support..."] ) )
def stopProduction(self):
wx.PostEvent( self.events, EventDebug( [2, "Stopping Sensor Connection..."] ) )
# If stop is called on a defined connection port...
if self.PORT != None:
self.PORT.close()
self.PORT = None
wx.PostEvent( self.events, EventConnection([0, 1, AppSettings.CHAR_BALLOTX] ) )
wx.PostEvent( self.events, EventConnection([2, 1, "Unknown"] ) )
wx.PostEvent( self.events, EventConnection([3, 1, "---"] ) )
def setIDOff(self, iID):
wx.PostEvent( self.events, EventDebug( [2, "Setting Sensor ID OFF"] ) )
if iID >= 0 and iID < len(self.active):
self.active[iID] = False
else:
wx.PostEvent( self.events, EventDebug( [2, "Invalid Sensor ID"] ) )
def setIDOn(self, iID):
wx.PostEvent( self.events, EventDebug( [2, "Setting Sensor ID ON"] ) )
if iID >= 0 and iID < len(self.active):
self.active[iID] = True
else:
wx.PostEvent( self.events, EventDebug( [2, "Invalid Sensor ID"] ) )
def setAllIDsOff(self):
for iID in range(0, len(self.active)):
self.setIDOff(iID)
def setAllIDsOn(self):
for iID in range(0, len(self.active)):
self.setIDOff(iID)