Skip to content

Commit

Permalink
Implement trio based Endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
pipermerriam committed Jun 12, 2019
1 parent 5b44d2c commit 6183136
Show file tree
Hide file tree
Showing 21 changed files with 1,493 additions and 32 deletions.
30 changes: 29 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-asyncio
py36-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-core-trio
py37-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-asyncio
py37-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-core-trio
py36-examples:
<<: *common
docker:
Expand All @@ -93,12 +105,24 @@ jobs:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-asyncio
py36-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.6
environment:
TOXENV: py36-snappy-core-trio
py37-snappy-core-asyncio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-asyncio
py37-snappy-core-trio:
<<: *common
docker:
- image: circleci/python:3.7
environment:
TOXENV: py37-snappy-core-trio
workflows:
version: 2
test:
Expand All @@ -108,8 +132,12 @@ workflows:
- doctest
- lint
- py36-core-asyncio
- py37-core-asyncio
- py36-core-trio
- py36-examples
- py37-examples
- py37-core-asyncio
- py37-core-trio
- py36-snappy-core-asyncio
- py36-snappy-core-trio
- py37-snappy-core-asyncio
- py37-snappy-core-trio
17 changes: 4 additions & 13 deletions lahja/asyncio/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
ConnectionConfig,
Message,
Msg,
RequestIDGenerator,
Subscription,
should_endpoint_receive_item,
)
Expand Down Expand Up @@ -154,7 +153,7 @@ def __init__(
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[RemoteEndpointAPI]:
await self._start()

Expand All @@ -164,7 +163,7 @@ async def run(self) -> AsyncIterator[RemoteEndpointAPI]:
await self.stop()

async def _start(self) -> None:
self._task = asyncio.ensure_future(self._run())
self._task = asyncio.ensure_future(self._process_incoming_messages())
await self.wait_started()

async def stop(self) -> None:
Expand Down Expand Up @@ -199,20 +198,12 @@ class AsyncioEndpoint(BaseEndpoint):
_sync_handler: DefaultDict[Type[BaseEvent], List[SubscriptionSyncHandler]]

_loop: Optional[asyncio.AbstractEventLoop] = None
_get_request_id: Iterator[RequestID]

_subscriptions_changed: asyncio.Event

def __init__(self, name: str) -> None:
super().__init__(name)

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

# Signal when a new remote connection is established
self._remote_connections_changed = asyncio.Condition() # type: ignore

Expand Down Expand Up @@ -293,7 +284,7 @@ def run(self, *args, **kwargs): # type: ignore
#
# Running API
#
@asynccontextmanager # type: ignore
@asynccontextmanager
async def run(self) -> AsyncIterator[EndpointAPI]:
if not self._loop:
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -430,7 +421,7 @@ async def _run_async_subscription_handler(
# Server API
#
@classmethod
@asynccontextmanager # type: ignore
@asynccontextmanager
async def serve(cls, config: ConnectionConfig) -> AsyncIterator["AsyncioEndpoint"]:
endpoint = cls(config.name)
async with endpoint.run():
Expand Down
21 changes: 18 additions & 3 deletions lahja/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Expand All @@ -31,12 +32,13 @@
Hello,
Message,
Msg,
RequestIDGenerator,
Subscription,
SubscriptionsAck,
SubscriptionsUpdated,
)
from .exceptions import ConnectionAttemptRejected, RemoteDisconnected
from .typing import ConditionAPI, EventAPI, LockAPI
from .typing import ConditionAPI, EventAPI, LockAPI, RequestID

TResponse = TypeVar("TResponse", bound=BaseEvent)
TWaitForEvent = TypeVar("TWaitForEvent", bound=BaseEvent)
Expand Down Expand Up @@ -221,16 +223,20 @@ async def wait_ready(self) -> None:
async def wait_stopped(self) -> None:
await self._stopped.wait()

@property
def is_running(self) -> bool:
return not self.is_stopped and self.running.is_set()
return not self.is_stopped and self._running.is_set()

@property
def is_ready(self) -> bool:
return self.is_running and self._ready.is_set()

@property
def is_stopped(self) -> bool:
return self._stopped.is_set()

async def _run(self) -> None:
async def _process_incoming_messages(self) -> None:
self.logger.debug("Starting RemoteEndpoint")
self._running.set()

# Send the hello message
Expand Down Expand Up @@ -587,11 +593,20 @@ class BaseEndpoint(EndpointAPI):

_connections: Set[RemoteEndpointAPI]

_get_request_id: Iterator[RequestID]

logger = logging.getLogger("lahja.endpoint.Endpoint")

def __init__(self, name: str) -> None:
self.name = name

try:
self._get_request_id = RequestIDGenerator(name.encode("ascii") + b":")
except UnicodeDecodeError:
raise Exception(
f"TODO: Invalid endpoint name: '{name}'. Must be ASCII encodable string"
)

# storage containers for inbound and outbound connections to other
# endpoints
self._connections = set()
Expand Down
21 changes: 21 additions & 0 deletions lahja/tools/benchmark/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from lahja.asyncio import AsyncioEndpoint
from lahja.base import BaseEndpoint
from lahja.trio import TrioEndpoint


class BaseBackend(ABC):
Expand Down Expand Up @@ -40,3 +41,23 @@ async def sleep(seconds: float) -> None:
import asyncio

await asyncio.sleep(seconds)


class TrioBackend(BaseBackend):
name = "trio"
Endpoint = TrioEndpoint

@staticmethod
def run(coro: Any, *args: Any) -> None:
# UNCOMMENT FOR DEBUGGING
# logger = multiprocessing.log_to_stderr()
# logger.setLevel(logging.INFO)
import trio

trio.run(coro, *args)

@staticmethod
async def sleep(seconds: float) -> None:
import trio

await trio.sleep(seconds)
1 change: 1 addition & 0 deletions lahja/trio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .endpoint import TrioEndpoint # noqa: F401
Loading

0 comments on commit 6183136

Please sign in to comment.