Skip to content

Commit a89d8ae

Browse files
Merge pull request #138 from mdavidsaver/clean-asyncio-tasks
AsyncioDispatcher cleanup tasks atexit
2 parents d55483d + f7a1f53 commit a89d8ae

File tree

5 files changed

+127
-14
lines changed

5 files changed

+127
-14
lines changed

.github/workflows/code.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ jobs:
5454
fail-fast: false
5555
matrix:
5656
os: [ubuntu-latest, windows-latest, macos-latest]
57-
python: [cp36, cp37, cp38, cp39, cp310]
57+
python: [cp37, cp38, cp39, cp310]
5858

5959
include:
6060
# Put coverage and results files in the project directory for mac
@@ -147,7 +147,7 @@ jobs:
147147
fail-fast: false
148148
matrix:
149149
os: [ubuntu-latest, windows-latest, macos-latest]
150-
python: [cp36, cp37, cp38, cp39, cp310]
150+
python: [cp37, cp38, cp39, cp310]
151151

152152
runs-on: ${{ matrix.os }}
153153

CHANGELOG.rst

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ Versioning <https://semver.org/spec/v2.0.0.html>`_.
1010
Unreleased_
1111
-----------
1212

13+
Changed:
14+
15+
- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_
16+
1317
Fixed:
1418

1519
- `Fix conversion of ctypes pointers passed to C extension <../../pull/154>`_

setup.cfg

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@ long_description_content_type = text/x-rst
1010
classifiers =
1111
Development Status :: 5 - Production/Stable
1212
License :: OSI Approved :: Apache Software License
13-
Programming Language :: Python :: 3.6
1413
Programming Language :: Python :: 3.7
1514
Programming Language :: Python :: 3.8
1615
Programming Language :: Python :: 3.9
1716
Programming Language :: Python :: 3.10
1817

1918
[options]
2019
packages = softioc
21-
python_requires = >=3.6
20+
python_requires = >=3.7
2221

2322
[options.entry_points]
2423
# Include a command line script

softioc/asyncio_dispatcher.py

+44-9
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,62 @@
55
import atexit
66

77
class AsyncioDispatcher:
8-
def __init__(self, loop=None):
8+
def __init__(self, loop=None, debug=False):
99
"""A dispatcher for `asyncio` based IOCs, suitable to be passed to
1010
`softioc.iocInit`. Means that `on_update` callback functions can be
1111
async.
1212
1313
If a ``loop`` is provided it must already be running. Otherwise a new
1414
Event Loop will be created and run in a dedicated thread.
15+
``debug`` is passed through to ``asyncio.run()``.
16+
17+
For a clean exit, call ``softioc.interactive_ioc(..., call_exit=False)``
1518
"""
1619
if loop is None:
20+
# will wait until worker is executing the new loop
21+
started = threading.Event()
1722
# Make one and run it in a background thread
18-
self.loop = asyncio.new_event_loop()
19-
worker = threading.Thread(target=self.loop.run_forever)
23+
self.__worker = threading.Thread(
24+
target=asyncio.run,
25+
args=(self.__inloop(started),),
26+
kwargs={'debug': debug})
2027
# Explicitly manage worker thread as part of interpreter shutdown.
2128
# Otherwise threading module will deadlock trying to join()
2229
# before our atexit hook runs, while the loop is still running.
23-
worker.daemon = True
30+
self.__worker.daemon = True
31+
32+
self.__worker.start()
33+
started.wait()
34+
35+
self.__atexit = atexit.register(self.__shutdown)
36+
37+
assert self.loop is not None and self.loop.is_running()
2438

25-
@atexit.register
26-
def aioJoin(worker=worker, loop=self.loop):
27-
loop.call_soon_threadsafe(loop.stop)
28-
worker.join()
29-
worker.start()
3039
elif not loop.is_running():
3140
raise ValueError("Provided asyncio event loop is not running")
3241
else:
3342
self.loop = loop
3443

44+
def close(self):
45+
if self.__atexit is not None:
46+
atexit.unregister(self.__atexit)
47+
self.__atexit = None
48+
49+
self.__shutdown()
50+
51+
async def __inloop(self, started):
52+
self.loop = asyncio.get_running_loop()
53+
self.__interrupt = asyncio.Event()
54+
started.set()
55+
del started
56+
await self.__interrupt.wait()
57+
58+
def __shutdown(self):
59+
if self.__worker is not None:
60+
self.loop.call_soon_threadsafe(self.__interrupt.set)
61+
self.__worker.join()
62+
self.__worker = None
63+
3564
def __call__(
3665
self,
3766
func,
@@ -48,3 +77,9 @@ async def async_wrapper():
4877
except Exception:
4978
logging.exception("Exception when running dispatched callback")
5079
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)
80+
81+
def __enter__(self):
82+
return self
83+
84+
def __exit__(self, A, B, C):
85+
self.close()

tests/test_asyncio.py

+76-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44

55
from multiprocessing.connection import Listener
66

7-
from conftest import requires_cothread, ADDRESS, select_and_recv
7+
from conftest import (
8+
ADDRESS, select_and_recv,
9+
log, get_multiprocessing_context, TIMEOUT,
10+
create_random_prefix
11+
)
812

913
from softioc.asyncio_dispatcher import AsyncioDispatcher
14+
from softioc import builder, softioc
1015

1116
@pytest.mark.asyncio
1217
async def test_asyncio_ioc(asyncio_ioc):
@@ -131,3 +136,73 @@ def test_asyncio_dispatcher_event_loop():
131136
event_loop = asyncio.get_event_loop()
132137
with pytest.raises(ValueError):
133138
AsyncioDispatcher(loop=event_loop)
139+
140+
def asyncio_dispatcher_test_func(device_name, child_conn):
141+
142+
log("CHILD: Child started")
143+
144+
builder.SetDeviceName(device_name)
145+
146+
147+
with AsyncioDispatcher() as dispatcher:
148+
# Create some records
149+
ai = builder.aIn('AI', initial_value=5)
150+
builder.aOut('AO', initial_value=12.45, always_update=True,
151+
on_update=lambda v: ai.set(v))
152+
153+
# Boilerplate get the IOC started
154+
builder.LoadDatabase()
155+
softioc.iocInit(dispatcher)
156+
157+
# Start processes required to be run after iocInit
158+
async def update():
159+
while True:
160+
ai.set(ai.get() + 1)
161+
await asyncio.sleep(0.01)
162+
163+
dispatcher(update)
164+
165+
log("CHILD: Sending Ready")
166+
child_conn.send("R")
167+
168+
# Keep process alive while main thread runs CAGET
169+
if child_conn.poll(TIMEOUT):
170+
val = child_conn.recv()
171+
assert val == "D", "Did not receive expected Done character"
172+
173+
174+
async def test_asyncio_dispatcher_as_context_manager():
175+
"""Test that the asyncio dispatcher can be used as a context manager"""
176+
ctx = get_multiprocessing_context()
177+
parent_conn, child_conn = ctx.Pipe()
178+
179+
device_name = create_random_prefix()
180+
181+
process = ctx.Process(
182+
target=asyncio_dispatcher_test_func,
183+
args=(device_name, child_conn),
184+
)
185+
186+
process.start()
187+
188+
log("PARENT: Child started, waiting for R command")
189+
190+
from aioca import caget
191+
try:
192+
# Wait for message that IOC has started
193+
select_and_recv(parent_conn, "R")
194+
195+
# ao_val = await caget(device_name + ":AO")
196+
ao_val = await caget(device_name + ":AO")
197+
assert ao_val == 12.45
198+
199+
# Confirm the value of the AI record is increasing
200+
ai_val_1 = await caget(device_name + ":AI")
201+
await asyncio.sleep(1)
202+
ai_val_2 = await caget(device_name + ":AI")
203+
assert ai_val_2 > ai_val_1
204+
205+
finally:
206+
parent_conn.send("D") # "Done"
207+
process.join(timeout=TIMEOUT)
208+
assert process.exitcode == 0 # clean exit

0 commit comments

Comments
 (0)