Skip to content

Commit

Permalink
Merge pull request #126 from pipermerriam/piper/trio-base-endpoint-im…
Browse files Browse the repository at this point in the history
…plementation

Trio base endpoint implementation
  • Loading branch information
pipermerriam authored Jun 24, 2019
2 parents 7df1d3d + 37b33c0 commit f0b7ead
Show file tree
Hide file tree
Showing 24 changed files with 1,516 additions and 33 deletions.
30 changes: 29 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-asyncio
py36-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-trio
py37-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-asyncio
py37-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-trio
py36-examples:
<<: *common
docker:
Expand All @@ -93,12 +105,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-asyncio
py36-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-trio
py37-snappy-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-asyncio
py37-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-trio
workflows:
version: 2
test:
Expand All @@ -108,8 +132,12 @@ workflows:
- doctest
- lint
- py36-core-asyncio
- py37-core-asyncio
- py36-core-trio
- py36-examples
- py37-examples
- py37-core-asyncio
- py37-core-trio
- py36-snappy-core-asyncio
- py36-snappy-core-trio
- py37-snappy-core-asyncio
- py37-snappy-core-trio
15 changes: 3 additions & 12 deletions lahja/asyncio/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
ConnectionConfig,
Message,
Msg,
RequestIDGenerator,
Subscription,
should_endpoint_receive_item,
)
Expand Down Expand Up @@ -154,7 +153,7 @@ def __init__(
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[RemoteEndpointAPI]:
await self._start()

Expand Down Expand Up @@ -199,20 +198,12 @@ class AsyncioEndpoint(BaseEndpoint):
_sync_handler: DefaultDict[Type[BaseEvent], List[SubscriptionSyncHandler]]

_loop: Optional[asyncio.AbstractEventLoop] = None
_get_request_id: Iterator[RequestID]

_subscriptions_changed: asyncio.Event

def __init__(self, name: str) -> None:
super().__init__(name)

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

# Signal when a new remote connection is established
self._remote_connections_changed = asyncio.Condition() # type: ignore

Expand Down Expand Up @@ -293,7 +284,7 @@ def run(self, *args, **kwargs): # type: ignore
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[EndpointAPI]:
if not self._loop:
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -430,7 +421,7 @@ async def _run_async_subscription_handler(
# Server API
#
@classmethod
@asynccontextmanager # type: ignore
@asynccontextmanager
async def serve(cls, config: ConnectionConfig) -> AsyncIterator["AsyncioEndpoint"]:
endpoint = cls(config.name)
async with endpoint.run():
Expand Down
25 changes: 20 additions & 5 deletions lahja/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Expand All @@ -31,12 +32,13 @@
Hello,
Message,
Msg,
RequestIDGenerator,
Subscription,
SubscriptionsAck,
SubscriptionsUpdated,
)
from .exceptions import ConnectionAttemptRejected, RemoteDisconnected
from .typing import ConditionAPI, EventAPI, LockAPI
from .typing import ConditionAPI, EventAPI, LockAPI, RequestID

TResponse = TypeVar("TResponse", bound=BaseEvent)
TWaitForEvent = TypeVar("TWaitForEvent", bound=BaseEvent)
Expand Down Expand Up @@ -239,11 +241,15 @@ def is_stopped(self) -> bool:
async def _process_incoming_messages(self) -> None:
self._running.set()

# Send the hello message
await self.send_message(Hello(self._local_name))
try:
# Send the hello message
await self.send_message(Hello(self._local_name))
# Wait for the other endpoint to identify itself.
hello = await self.conn.read_message()
except RemoteDisconnected:
self._stopped.set()
return

# Wait for the other endpoint to identify itself.
hello = await self.conn.read_message()
if isinstance(hello, Hello):
self._name = hello.name
self.logger.debug(
Expand Down Expand Up @@ -587,11 +593,20 @@ class BaseEndpoint(EndpointAPI):

_connections: Set[RemoteEndpointAPI]

_get_request_id: Iterator[RequestID]

logger = logging.getLogger("lahja.endpoint.Endpoint")

def __init__(self, name: str) -> None:
self.name = name

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

# storage containers for inbound and outbound connections to other
# endpoints
self._connections = set()
Expand Down
10 changes: 10 additions & 0 deletions lahja/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ class BindError(LahjaError):
"""


class LifecycleError(LahjaError):
"""
Raised when attempting to violate the lifecycle of an endpoint such as
starting an already started endpoint or starting an endpoint that has
already stopped.
"""

pass


class ConnectionAttemptRejected(LahjaError):
"""
Raised when an attempt was made to connect to an endpoint that is already connected.
Expand Down
21 changes: 21 additions & 0 deletions lahja/tools/benchmark/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from lahja.asyncio import AsyncioEndpoint
from lahja.base import BaseEndpoint
from lahja.trio import TrioEndpoint


class BaseBackend(ABC):
Expand Down Expand Up @@ -40,3 +41,23 @@ async def sleep(seconds: float) -> None:
import asyncio

await asyncio.sleep(seconds)


class TrioBackend(BaseBackend):
name = "trio"
Endpoint = TrioEndpoint

@staticmethod
def run(coro: Any, *args: Any) -> None:
# UNCOMMENT FOR DEBUGGING
# logger = multiprocessing.log_to_stderr()
# logger.setLevel(logging.INFO)
import trio

trio.run(coro, *args)

@staticmethod
async def sleep(seconds: float) -> None:
import trio

await trio.sleep(seconds)
1 change: 1 addition & 0 deletions lahja/trio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .endpoint import TrioEndpoint # noqa: F401
Loading

0 comments on commit f0b7ead

Please sign in to comment.