Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trio base endpoint implementation #126

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-asyncio
py36-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-trio
py37-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-asyncio
py37-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-trio
py36-examples:
<<: *common
docker:
Expand All @@ -93,12 +105,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-asyncio
py36-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-trio
py37-snappy-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-asyncio
py37-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-trio
workflows:
version: 2
test:
Expand All @@ -108,8 +132,12 @@ workflows:
- doctest
- lint
- py36-core-asyncio
- py37-core-asyncio
- py36-core-trio
- py36-examples
- py37-examples
- py37-core-asyncio
- py37-core-trio
- py36-snappy-core-asyncio
- py36-snappy-core-trio
- py37-snappy-core-asyncio
- py37-snappy-core-trio
15 changes: 3 additions & 12 deletions lahja/asyncio/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
ConnectionConfig,
Message,
Msg,
RequestIDGenerator,
Subscription,
should_endpoint_receive_item,
)
Expand Down Expand Up @@ -154,7 +153,7 @@ def __init__(
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[RemoteEndpointAPI]:
await self._start()

Expand Down Expand Up @@ -199,20 +198,12 @@ class AsyncioEndpoint(BaseEndpoint):
_sync_handler: DefaultDict[Type[BaseEvent], List[SubscriptionSyncHandler]]

_loop: Optional[asyncio.AbstractEventLoop] = None
_get_request_id: Iterator[RequestID]

_subscriptions_changed: asyncio.Event

def __init__(self, name: str) -> None:
super().__init__(name)

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

# Signal when a new remote connection is established
self._remote_connections_changed = asyncio.Condition() # type: ignore

Expand Down Expand Up @@ -293,7 +284,7 @@ def run(self, *args, **kwargs): # type: ignore
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[EndpointAPI]:
if not self._loop:
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -430,7 +421,7 @@ async def _run_async_subscription_handler(
# Server API
#
@classmethod
@asynccontextmanager # type: ignore
@asynccontextmanager
async def serve(cls, config: ConnectionConfig) -> AsyncIterator["AsyncioEndpoint"]:
endpoint = cls(config.name)
async with endpoint.run():
Expand Down
25 changes: 20 additions & 5 deletions lahja/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Expand All @@ -31,12 +32,13 @@
Hello,
Message,
Msg,
RequestIDGenerator,
Subscription,
SubscriptionsAck,
SubscriptionsUpdated,
)
from .exceptions import ConnectionAttemptRejected, RemoteDisconnected
from .typing import ConditionAPI, EventAPI, LockAPI
from .typing import ConditionAPI, EventAPI, LockAPI, RequestID

TResponse = TypeVar("TResponse", bound=BaseEvent)
TWaitForEvent = TypeVar("TWaitForEvent", bound=BaseEvent)
Expand Down Expand Up @@ -239,11 +241,15 @@ def is_stopped(self) -> bool:
async def _process_incoming_messages(self) -> None:
self._running.set()

# Send the hello message
await self.send_message(Hello(self._local_name))
try:
# Send the hello message
await self.send_message(Hello(self._local_name))
# Wait for the other endpoint to identify itself.
hello = await self.conn.read_message()
except RemoteDisconnected:
self._stopped.set()
return

# Wait for the other endpoint to identify itself.
hello = await self.conn.read_message()
if isinstance(hello, Hello):
self._name = hello.name
self.logger.debug(
Expand Down Expand Up @@ -587,11 +593,20 @@ class BaseEndpoint(EndpointAPI):

_connections: Set[RemoteEndpointAPI]

_get_request_id: Iterator[RequestID]

logger = logging.getLogger("lahja.endpoint.Endpoint")

def __init__(self, name: str) -> None:
self.name = name

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: extract

# storage containers for inbound and outbound connections to other
# endpoints
self._connections = set()
Expand Down
10 changes: 10 additions & 0 deletions lahja/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ class BindError(LahjaError):
"""


class LifecycleError(LahjaError):
"""
Raised when attempting to violate the lifecycle of an endpoint such as
starting an already started endpoint or starting an endpoint that has
already stopped.
"""

pass


class ConnectionAttemptRejected(LahjaError):
"""
Raised when an attempt was made to connect to an endpoint that is already connected.
Expand Down
21 changes: 21 additions & 0 deletions lahja/tools/benchmark/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from lahja.asyncio import AsyncioEndpoint
from lahja.base import BaseEndpoint
from lahja.trio import TrioEndpoint


class BaseBackend(ABC):
Expand Down Expand Up @@ -40,3 +41,23 @@ async def sleep(seconds: float) -> None:
import asyncio

await asyncio.sleep(seconds)


class TrioBackend(BaseBackend):
name = "trio"
Endpoint = TrioEndpoint

@staticmethod
def run(coro: Any, *args: Any) -> None:
# UNCOMMENT FOR DEBUGGING
# logger = multiprocessing.log_to_stderr()
# logger.setLevel(logging.INFO)
import trio

trio.run(coro, *args)

@staticmethod
async def sleep(seconds: float) -> None:
import trio

await trio.sleep(seconds)
1 change: 1 addition & 0 deletions lahja/trio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .endpoint import TrioEndpoint # noqa: F401
Loading