
Trio base endpoint implementation #126

Conversation


@pipermerriam pipermerriam commented Jun 11, 2019

replaces #56

built on #133 #135 #136

What was wrong?

We need a trio based endpoint implementation.

How was it fixed?

Wrote an implementation using the trio library.

Cute Animal Picture


@pipermerriam pipermerriam force-pushed the piper/trio-base-endpoint-implementation branch 4 times, most recently from d323f8b to 6183136 Compare June 12, 2019 23:33
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

Member Author

TODO: extract

lahja/base.py Outdated
def is_running(self) -> bool:
return not self.is_stopped and self.running.is_set()
return not self.is_stopped and self._running.is_set()
Member Author

TODO: extract

@@ -164,7 +163,7 @@ def __init__(
await self.stop()

async def _start(self) -> None:
self._task = asyncio.ensure_future(self._run())
self._task = asyncio.ensure_future(self._process_incoming_messages())
Member Author

TODO: extract

await self._running.wait()

async def wait_stopped(self) -> None:
await self._stopped.wait()
Member Author

I spent some time trying to make a Runnable abstraction but it ended up making things feel really disconnected and complex to understand. Maybe worth trying again once this code has settled.

_stream_channels: Dict[
Type[BaseEvent], "weakref.WeakSet[trio.abc.SendChannel[BaseEvent]]"
]
_pending_requests: Dict[RequestID, "trio.abc.SendChannel[BaseEvent]"]
Member Author

TODO: don't need quotes for this type hint

Establish a connection to a named endpoint server over an IPC socket.
"""
if not self.is_running:
raise Exception(
Member Author

TODO: proper exception class

@pipermerriam pipermerriam force-pushed the piper/trio-base-endpoint-implementation branch 7 times, most recently from 95e96e1 to 49914df Compare June 13, 2019 18:18
@pipermerriam pipermerriam changed the title [WIP] Piper/trio base endpoint implementation Trio base endpoint implementation Jun 13, 2019
@pipermerriam pipermerriam force-pushed the piper/trio-base-endpoint-implementation branch from 49914df to a02311f Compare June 13, 2019 18:24
@pipermerriam pipermerriam marked this pull request as ready for review June 13, 2019 18:26
@pipermerriam
Member Author

This is ready for review but only the latest commit is relevant. The others are part of other pull requests.

@pipermerriam pipermerriam requested review from lithp and cburgdorf June 13, 2019 20:53
@pipermerriam
Member Author

I won't be surprised if there are a few underlying issues but this should have rough parity to the existing asyncio implementation and I'd like to start using it as soon as possible. One thing that I want to focus on a little is unification of the test suites since they are effectively duplicates.

@lithp
Contributor

lithp commented Jun 14, 2019

About to dive in, 1500 is a lot of lines but I'm excited to see this shipped!

self._stopped.set()
await self._cleanup()

async def _run(self) -> None:
Contributor

Huh, why did you decide to add this internal buffer, maybe this improved performance in the benchmark? The kernel already adds a pretty big buffer here, I was thinking about ways to simplify this code but the easiest way might be to remove it entirely and rely on the buffer that the OS provides.

Member Author

I'll try that. I'm not entirely sure this is actually necessary so I'll verify the performance gain is real.

async def send_message(self, message: Msg) -> None:
msg_data = pickle.dumps(message)
size = len(msg_data)
async with self._lock:
Contributor

I guess this lock is because send_all can switch away when there's backpressure and then throw an exception if we try to use it reentrantly. I was surprised that trio didn't lock for us but then found this thread.

I think future Brians will have an easier time reading this if the lock were called something like _write_lock and if there were a comment saying what it's doing, maybe even with a link to the above thread.

Member Author

👍

size = len(msg_data)
async with self._lock:
try:
# TODO: look into buffering the writes
Contributor

Apparently if a cancel happens during send_all the socket is corrupted, so a cancel in a task calling broadcast() would cause the endpoint to break. Fixing the cancellation problem might involve send_message adding the message into a short queue, with another task which reads from the queue and sends, that would also add some buffering for free!

elif self.is_stopped:
raise LifecycleError("RemoteEndpoint has already been run and stopped")

nursery.start_soon(self._run)
Contributor
@lithp lithp Jun 15, 2019

It looks like trio.Nursery.start is designed to make this pattern easier to write: https://trio.readthedocs.io/en/latest/reference-core.html#trio.Nursery.start

Member Author

Yeah, I remember that from the docs. I'll open an issue to track refactoring this to use something like that pattern.


async def _run(self) -> None:
async with cast(TrioConnection, self.conn).run():
await self._process_incoming_messages()
Contributor

I know this goes the other direction from another comment I left, but if TrioConnection didn't need to be run and instead read directly from the socket w/o buffering then this cast wouldn't be necessary and some of these methods could be combined:

@asynccontextmanager
async def run(self) -> AsyncIterator[RemoteEndpointAPI]:
  if self.is_running:
    raise LifecycleError("RemoteEndpoint is already running")
  elif self.is_stopped:
    raise LifecycleError("RemoteEndpoint has already been run and stopped")

  async with trio.open_nursery() as nursery:
    nursery.start_soon(self._process_incoming_messages)
    try:
      yield self
    finally:
      await self.stop()

Member Author

👍


TWaitForEvent = TypeVar("TWaitForEvent", bound=BaseEvent)

async def wait_for(self, event_type: Type[TWaitForEvent]) -> TWaitForEvent:
Contributor

I think this overrides and exactly duplicates lahja.base.BaseEndpoint.wait_for

"""
(send_channel, receive_channel) = cast(
StreamChannelPair, trio.open_memory_channel(100)
)
Contributor

Huh, it's annoying that we have to add all these casts everywhere. It might be worth adding our own type stubs, or using someone else's: https://github.com/python-trio/trio-typing

Member Author

We're already using trio-typing. I'm not sure it comes with anything for this but I'll look.


# Trigger the subscription change since the event will be removed from
# the set once the local reference to the channel goes out of scope.
self._subscriptions_changed.set()
Contributor

This feels dangerous. If the weakref hasn't been cleaned up, and when it's cleaned up is entirely up to implementation details, then _monitor_subscription_changes will check subscriptions and find that the subscription still exists, even though it doesn't. Remote endpoints will never get the message that the subscription is gone until another subscription update happens.

Contributor

It looks like weakref includes finalizers, which could be used to notify _monitor_subscription_changes

Member Author

👍
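The `weakref.finalize` idea above can be sketched like this: attach a finalizer to each subscription channel so the endpoint is notified as soon as the channel is garbage-collected, instead of waiting for the WeakSet to shrink on its own schedule. Class and method names are invented for illustration.

```python
import weakref


class SubscriptionTracker:
    """Sketch: get notified promptly when a subscription channel dies."""

    def __init__(self) -> None:
        self.change_count = 0

    def _on_subscription_dropped(self) -> None:
        # In the endpoint this would set self._subscriptions_changed so
        # _monitor_subscription_changes wakes up and tells the remotes.
        self.change_count += 1

    def track(self, channel: object) -> None:
        # The callback must not hold a strong reference to `channel`,
        # otherwise the channel would never be collected at all.
        weakref.finalize(channel, self._on_subscription_dropped)
```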

# Create an asynchronous generator that we use to pipe the result
send_channel, receive_channel = cast(
RequestResponseChannelPair, trio.open_memory_channel(0)
)
Contributor

I'm surprised that trio doesn't have a Future. A channel can do everything a future can do and more, sure, but this is so much more code than a future would have been!

Member Author

Yeah, I was surprised as well. Supposedly it should be pretty easy to write one.

# as running.
await self._message_processing_loop_running.wait()
await self._connection_loop_running.wait()
await self._process_broadcasts_running.wait()
Contributor

I think all these could be removed with Nursery.start

Contributor

Actually, looking at it again, if they're waiting on channels there's no need to wait for them to start at all! The channels will queue up any interactions with them which occur before they start.

async for (item, config) in channel:
nursery.start_soon(self.broadcast, item, config)

async def _process_connections(
Contributor

Why did you decide to pull this out into another coro+channel? If the nursery is added somewhere like self.nursery then connect_to_endpoint could do the connect and then call nursery.start_soon.

Member Author

This might be a leftover from a previous design problem that no longer exists. Will verify and remove it if it isn't needed.

Member Author

Ah, because it needs access to the nursery to run the endpoint in the background and I much prefer trying to keep access to the nursery isolated so this indirection is the cost.

# objects which feed received events into the send side of the channel.
# Those messages are then retrieved from the receive channel by the
# `_process_received_messages` daemon.
(self._message_send_channel, message_receive_channel) = cast(
Contributor

The fact that trio gives us two channels makes naming hard, but _message_send_channel does not sound like the channel we receive messages with, it took me a bit to figure out what it was for. Maybe _incoming_message_channel_send and _incoming_message_channel_receive?

Member Author

I'll see if I can find better names. Agreed on this being a confusing one.

Member Author

I renamed this to _inbound_send_channel (and there is a new _outbound_send_channel) to signal the direction of message flow.

async def test_trio_endpoint_stream_without_limit(endpoint_pair):
alice, bob = endpoint_pair

async with trio.open_nursery() as nursery:
Contributor

This is probably a good fixture candidate

# subscriptions end up operating on the *new* event and will be
# picked up in the next iteration of the loop.
await self._subscriptions_changed.wait()
self._subscriptions_changed = trio.Event()
Contributor

Now that we're in trio world this might be easier to implement with a channel. Using a channel would also allow for the batching of events that you wanted, when an event comes in this can first empty the queue and then send messages to the remotes.

Member Author

I think you're right but it feels roughly equivalent and I'm inclined to leave this as-is.

await bob.wait_until_endpoint_subscribed_to(alice.name, EventTest)

await bob.broadcast(event)
await done.wait()
Contributor

I think this test could call alice.wait_for directly instead of waiting on another coro to do it.

Member Author

No, it requires that the alice.wait_for be run first since the subscription needs to be in place for alice to receive it.

Contributor
@lithp lithp left a comment

This is a lot of code and I only spent a few hours with it, so this wasn't a full review, but I know you want to get it merged quickly, and given the retreat is so near, the next chance I'll have to look at it more closely is probably a while away.

Left a bunch of comments but nothing here stands out besides the weakref thing. Once the weakrefs are figured out I'm okay with merging this and trying to clean it up later as it evolves.

Separately, I think there's a refactoring of all the run(), _run(), start() methods which makes them a lot simpler but I haven't been able to find it yet.

@pipermerriam pipermerriam force-pushed the piper/trio-base-endpoint-implementation branch 3 times, most recently from 6f9796d to bca1f52 Compare June 24, 2019 18:47
@pipermerriam pipermerriam force-pushed the piper/trio-base-endpoint-implementation branch from bca1f52 to 37b33c0 Compare June 24, 2019 19:39
@pipermerriam pipermerriam merged commit f0b7ead into ethereum:master Jun 24, 2019