forked from DiamondLightSource/pythonSoftIOC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsoftioc.py
327 lines (242 loc) · 10.4 KB
/
softioc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import os
import sys
import atexit
from ctypes import *
from tempfile import NamedTemporaryFile
from epicsdbbuilder.recordset import recordset
from . import imports, device
__all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc']
# tie in epicsAtExit() to interpreter lifecycle
@atexit.register
def epicsAtPyExit():
imports.epicsExitCallAtExits()
def iocInit(dispatcher=None):
"""This must be called exactly once after loading all EPICS database files.
After this point the EPICS IOC is running and serving PVs.
Args:
dispatcher: A callable with signature ``dispatcher(func, *args)``. Will
be called in response to caput on a record. If not supplied use
`cothread` as a dispatcher.
See Also:
`softioc.asyncio_dispatcher` is a dispatcher for `asyncio` applications
"""
if dispatcher is None:
# Fallback to cothread
import cothread
# Create our own cothread callback queue so that our callbacks
# processing doesn't interfere with other callback processing.
dispatcher = cothread.cothread._Callback()
# Set the dispatcher for record processing callbacks
device.dispatcher = dispatcher
imports.iocInit()
def safeEpicsExit(code=0):
'''Calls epicsExit() after ensuring Python exit handlers called.'''
if hasattr(sys, 'exitfunc'): # py 2.x
try:
# Calling epicsExit() will bypass any atexit exit handlers, so call
# them explicitly now.
sys.exitfunc()
finally:
# Make sure we don't try the exit handlers more than once!
del sys.exitfunc
elif hasattr(atexit, '_run_exitfuncs'): # py 3.x
atexit._run_exitfuncs()
# calls epicsExitCallAtExits()
# and then OS exit()
imports.epicsExit(code)
epicsExit = safeEpicsExit
# The following identifiers will be exported to interactive shell.
command_names = []
# IOC Test facilities
def ExportTest(name, argtypes, defaults=(), description='no description yet',
lib=imports.dbCore):
f = getattr(lib, name)
f.argtypes = argtypes
f.restype = None
length = len(argtypes)
def call_f(*args):
missing = length - len(args)
if missing > 0:
# Add in the missing arguments from the given defaults
args = args + defaults[-missing:]
f(*args)
call_f.__doc__ = description
call_f.__name__ = name
globals()[name] = call_f
command_names.append(name)
auto_encode = imports.auto_encode
ExportTest('dba', (auto_encode,), (), '''\
dba(field)
Prints value of each field in dbAddr structure associated with field.''')
ExportTest('dbl', (auto_encode, auto_encode,), ('', ''), '''\
dbl(pattern='', fields='')
Prints the names of records in the database matching pattern. If a (space
separated) list of fields is also given then the values of the fields are also
printed.''')
ExportTest('dbnr', (c_int,), (0,), '''\
dbnr(all=0)
Print number of records of each record type.''')
ExportTest('dbgrep', (auto_encode,), (), '''\
dbgrep(pattern)
Lists all record names that match the pattern. * matches any number of
characters in a record name.''')
ExportTest('dbgf', (auto_encode,), (), '''\
dbgf(field)
Prints field type and value.''')
ExportTest('dbpf', (auto_encode, auto_encode,), (), '''\
dbpf(field, value)
Writes the given value into the field.''')
ExportTest('dbpr', (auto_encode, c_int,), (0,), '''\
dbpr(record, interest=0)
Prints all the fields in record up to the indicated interest level:
= ========================================================
0 Application fields which change during record processing
1 Application fields which are fixed during processing
2 System developer fields of major interest
3 System developer fields of minor interest
4 All other fields
= ========================================================''')
ExportTest('dbtr', (auto_encode,), (), '''\
dbtr(record)
Tests processing of the specified record.''')
ExportTest('dbtgf', (auto_encode,), (), '''\
dbtgf(field_name)
This performs a dbNameToAddr and then calls dbGetField with all possible request
types and options. It prints the results of each call. This routine is of most
interest to system developers for testing database access.''')
ExportTest('dbtpf', (auto_encode, auto_encode,), (), '''\
dbtpf(field_name, value)
This command performs a dbNameToAddr, then calls dbPutField, followed by dbgf
for each possible request type. This routine is of interest to system developers
for testing database access.''')
ExportTest('dbtpn', (auto_encode, auto_encode,), (), '''\
dbtpn(field, value)
This command performs a dbProcessNotify request. If a non-null value argument
string is provided it issues a putProcessRequest to the named record; if no
value is provided it issues a processGetRequest. This routine is mainly of
interest to system developers for testing database access.''')
ExportTest('dbior', (auto_encode, c_int,), ('', 0,), '''\
dbior(driver='', interest=0)
Prints driver reports for the selected driver (or all drivers if driver is
omitted) at the given interest level.''')
ExportTest('dbhcr', (), (), '''\
dbhcr()
Prints hardware configuration report.''')
ExportTest('gft', (auto_encode,), (), '''\
gft(field)
Get Field Test for old database access''')
ExportTest('pft', (auto_encode,), (), '''\
pft(field, value)
Put Field Test for old database access''')
ExportTest('tpn', (auto_encode, auto_encode,), (), '''\
tpn(field, value)
Test Process Notify for old database access''')
ExportTest('dblsr', (auto_encode, c_int,), (), '''\
dblsr(recordname, level)
This command generates a report showing the lock set to which each record
belongs. If recordname is 0, "", or "*" all records are shown, otherwise only
records in the same lock set as recordname are shown.
level can have the following values:
= =======================================================
0 Show lock set information only
1 Show each record in the lock set
2 Show each record and all database links in the lock set
= =======================================================''')
ExportTest('dbLockShowLocked', (c_int,), (), '''\
dbLockShowLocked(level)
This command generates a report showing all locked locksets, the records they
contain, the lockset state and the thread that currently owns the lockset. The
level argument is passed to epicsMutexShow to adjust the information reported
about each locked epicsMutex.''')
ExportTest('scanppl', (c_double,), (0.0,), '''\
scanppl(rate=0.0)
Prints all records with the selected scan rate (or all if rate=0).''')
ExportTest('scanpel', (c_int,), (0,), '''\
scanpel(event=0)
Prints all records with selected event number (or all if event=0).''')
ExportTest('scanpiol', (), (), '''\
scanpiol()
Prints all records in the I/O event scan lists.''')
ExportTest('generalTimeReport', (c_int,), (0,), '''\
generalTimeReport(int level)
This routine displays the time providers and their priority levels that have
registered with the General Time subsystem for both current and event times. At
level 1 it also shows the current time as obtained from each provider.''',
lib=imports.Com)
ExportTest('eltc', (c_int,), (), '''\
eltc(noYes)
This determines if error messages are displayed on the IOC console. 0 means no
and any other value means yes.''',
lib=imports.Com)
# Hacked up exit object so that when soft IOC framework sends us an exit command
# we actually exit.
class Exiter:
def __repr__(self): # hack to exit when "called" with no parenthesis
sys.exit(0)
def __call__(self, code=0):
sys.exit(code)
exit = Exiter()
command_names.append('exit')
def dbLoadDatabase(database, path = None, substitutions = None):
'''Loads a database file and applies any given substitutions.'''
imports.dbLoadDatabase(database, path, substitutions)
def _add_records_from_file(dirname, file, substitutions):
# This is very naive, it loads all includes before their parents which
# possibly can put them out of order, but it works well enough for
# devIocStats
with open(os.path.join(dirname, file)) as f:
lines, include_subs = [], ""
for line in f.readlines():
line = line.rstrip()
if line.startswith('substitute'):
# substitute "QUEUE=scanOnce, QUEUE_CAPS=SCANONCE"
# keep hold of the substitutions
include_subs = line.split('"')[1]
elif line.startswith('include'):
# include "iocQueue.db"
subs = substitutions
if substitutions and include_subs:
subs = substitutions + ", " + include_subs
else:
subs = substitutions + include_subs
_add_records_from_file(dirname, line.split('"')[1], subs)
else:
# A record line
lines.append(line)
# Write a tempfile and load it
with NamedTemporaryFile(suffix='.db', delete=False) as f:
f.write(os.linesep.join(lines).encode())
dbLoadDatabase(f.name, substitutions=substitutions)
os.unlink(f.name)
def devIocStats(ioc_name):
'''This will load a template for the devIocStats library with the specified
IOC name. This should be called before `iocInit`'''
substitutions = 'IOCNAME=' + ioc_name + ', TODFORMAT=%m/%d/%Y %H:%M:%S'
iocstats_dir = os.path.join(os.path.dirname(__file__), 'iocStatsDb')
_add_records_from_file(iocstats_dir, 'ioc.template', substitutions)
def interactive_ioc(context = {}, call_exit = True):
'''Fires up the interactive IOC prompt with the given context.
Args:
context: A dictionary of values that will be made available to the
interactive Python shell together with a number of EPICS test
functions
call_exit: If `True`, the IOC will be terminated by calling epicsExit
which means that `interactive_ioc` will not return
'''
# Add all our commands to the given context.
exports = dict((key, globals()[key]) for key in command_names)
import code
if sys.version_info < (3, 6):
interact_args = {}
else:
# This suppresses irritating exit message introduced by Python3. Alas,
# this option is only available in Python 3.6!
interact_args = dict(exitmsg = '')
try:
code.interact(local = dict(exports, **context), **interact_args)
except SystemExit as e:
if call_exit:
safeEpicsExit(e.code)
raise
if call_exit:
safeEpicsExit(0)