Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make find_actor() delete stale sockaddr entries from registrar on OSError #366

Open
wants to merge 5 commits into
base: asyncio_debugger_support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
],
install_requires=[

# discovery subsys
'bidict',

# trio related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def daemon(
arb_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
Run a daemon actor as a "remote registrar" and/or plain ol'
separate actor (service) tree.

'''
if loglevel in ('trace', 'debug'):
Expand Down
136 changes: 98 additions & 38 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Actor "discovery" testing
"""
'''
Discovery subsystem via a "registrar" actor scenarios.

'''
import os
import signal
import platform
Expand Down Expand Up @@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')

return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}


async def spawn_and_check_registry(
Expand Down Expand Up @@ -283,37 +287,41 @@ async def close_chans_before_nursery(

async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
name='consumer1',
enable_modules=[__name__],
)
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
'consumer2',
enable_modules=[__name__],
)

async with (
portal1.open_stream_from(
stream_forever
) as agen2:
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that
# happens **before** exiting the
# actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
) as agen1,
portal2.open_stream_from(
stream_forever
) as agen2,
):
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that
# happens **before** exiting the
# actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
Expand All @@ -331,10 +339,12 @@ def test_close_channel_explicit(
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""

'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
Expand All @@ -347,16 +357,18 @@ def test_close_channel_explicit(


@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon,
start_method,
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""

'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
Expand All @@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)


@tractor.context
async def kill_transport(
    ctx: tractor.Context,
) -> None:
    '''
    Ack the opened context then immediately tear down this actor's
    IPC transport server, idling afterwards until externally
    cancelled; any registrar entry pointing here thus goes stale.

    '''
    await ctx.started()
    tractor.current_actor().cancel_server()
    await trio.sleep_forever()



# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
    daemon,
    start_method,
    arb_addr: tuple[str, int],
):
    '''
    Ensure that when a stale entry is detected in the registrar's table
    that the `find_actor()` API takes care of deleting the stale entry
    and not delivering a bad portal.

    '''
    async def main():

        # actor which will kill its own transport server, leaving a
        # stale sockaddr entry in the registrar's table.
        name: str = 'transport_fails_actor'
        regport: tractor.Portal
        tn: tractor.ActorNursery
        async with (
            tractor.open_nursery() as tn,
            # portal to the (remote) registrar daemon at `arb_addr`
            tractor.get_registrar(*arb_addr) as regport,
        ):
            ptl: tractor.Portal = await tn.start_actor(
                name,
                enable_modules=[__name__],
            )
            # NOTE(review): `open_context()` conventionally yields
            # `(ctx, first)` — confirm this unpack order is intended.
            async with ptl.open_context(
                kill_transport,
            ) as (first, ctx):
                # the target's transport is now down, so discovery
                # should fail to connect and report "not found".
                async with tractor.find_actor(name) as maybe_portal:
                    assert maybe_portal is None

                # explicitly cancel since the context task sleeps
                # forever and would otherwise block teardown.
                await ptl.cancel_actor()

    trio.run(main)
2 changes: 2 additions & 0 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from ._discovery import (
get_arbiter,
get_registrar,
find_actor,
wait_for_actor,
query_actor,
Expand Down Expand Up @@ -77,6 +78,7 @@
'find_actor',
'query_actor',
'get_arbiter',
'get_registrar',
'is_root_process',
'msg',
'open_actor_cluster',
Expand Down
68 changes: 53 additions & 15 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


@acm
async def get_arbiter(
async def get_registrar(

host: str,
port: int,
Expand All @@ -56,11 +56,14 @@ async def get_arbiter(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
else:
async with _connect_chan(host, port) as chan:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as arb_portal,
):
yield arb_portal

async with open_portal(chan) as arb_portal:

yield arb_portal
get_arbiter = get_registrar


@acm
Expand Down Expand Up @@ -101,7 +104,10 @@ async def query_actor(

# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")

yield sockaddr if sockaddr else None
Expand All @@ -112,25 +118,57 @@ async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None

) -> AsyncGenerator[Optional[Portal], None]:
) -> AsyncGenerator[Portal | None, None]:
'''
Ask the arbiter to find actor(s) by name.

Returns a connected portal to the last registered matching actor
known to the arbiter.

'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:

sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name,
)

# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")

if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
try:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
return

# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registrar actor.
uid: tuple[str, str] = await arb_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=sockaddr,
)

yield None


@acm
Expand Down
25 changes: 23 additions & 2 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import warnings

from async_generator import aclosing
from bidict import bidict
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
Expand Down Expand Up @@ -1774,10 +1775,10 @@ class Arbiter(Actor):

def __init__(self, *args, **kwargs) -> None:

self._registry: dict[
self._registry: bidict[
tuple[str, str],
tuple[str, int],
] = {}
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
Expand Down Expand Up @@ -1871,3 +1872,23 @@ async def unregister_actor(
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')


async def delete_sockaddr(
    self,
    sockaddr: tuple[str, int],

) -> tuple[str, str] | None:
    '''
    Drop the registry entry (if any) keyed by ``sockaddr`` via the
    bidict's inverse view and return the ``uid`` of the actor that
    was registered there, or ``None`` when no entry existed.

    Called remotely by ``find_actor()`` to purge a stale entry after
    a failed transport connect.

    '''
    uid: tuple | None = self._registry.inverse.pop(
        sockaddr,
        None,
    )
    if uid:
        log.warning(
            f'Deleting registry entry for {sockaddr}@{uid}!'
        )
    else:
        # no entry was found; don't render the `None` uid in the
        # message since it would read as a bogus `...@None!` pair.
        log.warning(
            f'No registry entry for {sockaddr}!'
        )
    return uid
Loading