Skip to content

Commit 06d5e2e

Browse files
committed
Iterators without aclose() (rare) are notoriously problematic...
- So now aclose() is explicitly required. - Since we effectively had an in-project `aclosing()` implementation, also removed async_generator dep. - The new, better-annotated `aclosing()` triggered Pyright to alert on a few places where it was being applied to an iterable instead of an iterator. Fixed those, too.
1 parent 4a2e9f4 commit 06d5e2e

18 files changed

+102
-109
lines changed

docs/user.rst

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ pipeline out of individual sections. A ``PipelineSection`` is any object that is
3838
function. This currently includes the following types:
3939

4040
AsyncIterables
41-
Async iterables are valid only as the very first ``PipelineSection``. Subsequent
42-
sections will use this async iterable as input source. Placing an ``AsyncIterable`` into the middle of
43-
a sequence of pipeline sections, will cause a ``ValueError``.
41+
Async iterables are valid only as the very first ``PipelineSection``, and must support the ``aclose()``
42+
method (nearly all do). Subsequent sections will use this async iterable as input source. Placing an
43+
``AsyncIterable`` into the middle of a sequence of pipeline sections, will cause a ``ValueError``.
4444
Sections
4545
Any :class:`Section <slurry.sections.abc.Section>` abc subclass is a valid ``PipelineSection``, at any
4646
position in the pipeline.

poetry.lock

+1-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ classifiers = [
2323

2424

2525
[tool.poetry.dependencies]
26-
async-generator = "^1.10"
2726
python = "^3.8"
2827
trio = "^0.23.0"
2928

slurry/_pipeline.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
from contextlib import asynccontextmanager
66

77
import trio
8-
from async_generator import aclosing
98

109
from .sections.weld import weld
1110
from ._tap import Tap
1211
from ._types import PipelineSection
12+
from ._utils import aclosing
1313

1414
class Pipeline:
1515
"""The main Slurry ``Pipeline`` class.
@@ -54,7 +54,7 @@ async def _pump(self):
5454
output = weld(nursery, *self.sections)
5555

5656
# Output to taps
57-
async with aclosing(output) as aiter:
57+
async with aclosing(output.__aiter__()) as aiter:
5858
async for item in aiter:
5959
self._taps = set(filter(lambda tap: not tap.closed, self._taps))
6060
if not self._taps:

slurry/_types.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
1-
from typing import Any, AsyncIterable, Awaitable, Protocol, Tuple, Union, runtime_checkable
1+
from typing import Any, Awaitable, Protocol, Tuple, TypeVar, Union, runtime_checkable
22

3-
from .sections.abc import Section
3+
from .sections import abc
44

5-
PipelineSection = Union[AsyncIterable[Any], Section, Tuple["PipelineSection", ...]]
5+
PipelineSection = Union["AsyncIterableWithAcloseableIterator[Any]", "abc.Section", Tuple["PipelineSection", ...]]
6+
7+
_T_co = TypeVar("_T_co", covariant=True)
68

79
@runtime_checkable
810
class SupportsAclose(Protocol):
9-
def aclose(self) -> Awaitable[object]:
10-
...
11+
def aclose(self) -> Awaitable[object]: ...
12+
13+
@runtime_checkable
14+
class AcloseableAsyncIterator(SupportsAclose, Protocol[_T_co]):
15+
def __anext__(self) -> Awaitable[_T_co]: ...
16+
def __aiter__(self) -> "AcloseableAsyncIterator[_T_co]": ...
17+
18+
@runtime_checkable
19+
class AsyncIterableWithAcloseableIterator(Protocol[_T_co]):
20+
def __aiter__(self) -> AcloseableAsyncIterator[_T_co]: ...

slurry/_utils.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
from typing import AsyncGenerator, AsyncIterator, TypeVar
1+
from typing import AsyncGenerator, TypeVar
22

3-
from ._types import SupportsAclose
3+
from ._types import AcloseableAsyncIterator
44

55
from contextlib import asynccontextmanager
66

77
_T = TypeVar("_T")
88

99
@asynccontextmanager
10-
async def safe_aclosing(obj: AsyncIterator[_T]) -> AsyncGenerator[AsyncIterator[_T], None]:
10+
async def aclosing(obj: AcloseableAsyncIterator[_T]) -> AsyncGenerator[AcloseableAsyncIterator[_T], None]:
1111
try:
1212
yield obj
1313
finally:
14-
if isinstance(obj, SupportsAclose):
15-
await obj.aclose()
14+
await obj.aclose()

slurry/environments/_multiprocessing.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
"""Implements a section that runs in an independent python proces."""
22

33
from multiprocessing import Process, SimpleQueue
4-
from typing import AsyncIterable, Any, Awaitable, Callable, Optional, cast
4+
from typing import Any, Awaitable, Callable, Optional, cast
55

66
import trio
77

88
from ..sections.abc import SyncSection
9+
from .._types import AsyncIterableWithAcloseableIterator
910

1011
class ProcessSection(SyncSection):
1112
"""ProcessSection defines a section interface with a synchronous
@@ -19,7 +20,9 @@ class ProcessSection(SyncSection):
1920
<https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled>`_.
2021
"""
2122

22-
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
23+
async def pump(
24+
self, input: Optional[AsyncIterableWithAcloseableIterator[Any]], output: Callable[[Any], Awaitable[None]]
25+
):
2326
"""
2427
The ``ProcessSection`` pump method works similar to the threaded version, however
2528
since communication between processes is not as simple as it is between threads,

slurry/environments/_threading.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""The threading module implements a synchronous section that runs in a background thread."""
2-
from typing import Any, AsyncIterable, Awaitable, Callable, Optional
2+
from typing import Any, Awaitable, Callable, Optional
33

44
import trio
55

66
from ..sections.abc import SyncSection
7+
from .._types import AsyncIterableWithAcloseableIterator
78

89

910
class ThreadSection(SyncSection):
@@ -12,7 +13,7 @@ class ThreadSection(SyncSection):
1213
"""
1314

1415
async def pump(self,
15-
input: Optional[AsyncIterable[Any]],
16+
input: Optional[AsyncIterableWithAcloseableIterator[Any]],
1617
output: Callable[[Any], Awaitable[None]]):
1718
"""Runs the refine method in a background thread with synchronous input and output
1819
wrappers, which transparently bridges the input and outputs between the parent

slurry/environments/_trio.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
"""The Trio environment implements ``TrioSection``, which is a Trio-native
22
:class:`AsyncSection <slurry.sections.abc.AsyncSection>`."""
3-
from typing import Any, AsyncIterable, Awaitable, Callable, Optional
3+
from typing import Any, Awaitable, Callable, Optional
44

55
from ..sections.abc import AsyncSection
6+
from .._types import AsyncIterableWithAcloseableIterator
67

78
class TrioSection(AsyncSection):
89
"""Since Trio is the native Slurry event loop, this environment is simple to implement.
910
The pump method does not need to do anything special to bridge the input and output. It
1011
simply delegates directly to the refine method, as the api is identical."""
11-
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
12+
async def pump(
13+
self, input: Optional[AsyncIterableWithAcloseableIterator[Any]], output: Callable[[Any], Awaitable[None]]
14+
):
1215
"""Calls refine."""
1316
await self.refine(input, output)

slurry/sections/_buffers.py

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
"""Pipeline sections with age- and volume-based buffers."""
22
from collections import deque
33
import math
4-
from typing import Any, AsyncIterable, Callable, Optional, Sequence
4+
from typing import Any, Callable, Optional, Sequence
55

66
import trio
7-
from async_generator import aclosing
87

98
from ..environments import TrioSection
9+
from .._types import AsyncIterableWithAcloseableIterator
10+
from .._utils import aclosing
1011

1112
class Window(TrioSection):
1213
"""Window buffer with size and age limits.
@@ -26,13 +27,13 @@ class Window(TrioSection):
2627
:param max_size: The maximum buffer size.
2728
:type max_size: int
2829
:param source: Input when used as first section.
29-
:type source: Optional[AsyncIterable[Any]]
30+
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
3031
:param max_age: Maximum item age in seconds. (default: unlimited)
3132
:type max_age: float
3233
:param min_size: Minimum amount of items in the buffer to trigger an output.
3334
:type min_size: int
3435
"""
35-
def __init__(self, max_size: int, source: Optional[AsyncIterable[Any]] = None, *,
36+
def __init__(self, max_size: int, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None, *,
3637
max_age: float = math.inf,
3738
min_size: int = 1):
3839
super().__init__()
@@ -51,7 +52,7 @@ async def refine(self, input, output):
5152

5253
buf = deque()
5354

54-
async with aclosing(source) as aiter:
55+
async with aclosing(source.__aiter__()) as aiter:
5556
async for item in aiter:
5657
now = trio.current_time()
5758
buf.append((item, now))
@@ -80,7 +81,7 @@ class Group(TrioSection):
8081
:param interval: Time in seconds from when an item arrives until the buffer is sent.
8182
:type interval: float
8283
:param source: Input when used as first section.
83-
:type source: Optional[AsyncIterable[Any]]
84+
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
8485
:param max_size: Maximum number of items in buffer, which when reached, will cause the buffer
8586
to be sent.
8687
:type max_size: int
@@ -89,7 +90,7 @@ class Group(TrioSection):
8990
:param reducer: Optional reducer function used to transform the buffer to a single value.
9091
:type reducer: Optional[Callable[[Sequence[Any]], Any]]
9192
"""
92-
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *,
93+
def __init__(self, interval: float, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None, *,
9394
max_size: Optional[int] = None,
9495
mapper: Optional[Callable[[Any], Any]] = None,
9596
reducer: Optional[Callable[[Sequence[Any]], Any]] = None):
@@ -111,7 +112,7 @@ async def refine(self, input, output):
111112

112113
send_channel, receive_channel = trio.open_memory_channel(0)
113114
async def pull_task():
114-
async with send_channel, aclosing(source) as aiter:
115+
async with send_channel, aclosing(source.__aiter__()) as aiter:
115116
async for item in aiter:
116117
await send_channel.send(item)
117118
nursery.start_soon(pull_task)
@@ -152,9 +153,9 @@ class Delay(TrioSection):
152153
:param interval: Number of seconds that each item is delayed.
153154
:type interval: float
154155
:param source: Input when used as first section.
155-
:type source: Optional[AsyncIterable[Any]]
156+
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
156157
"""
157-
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None):
158+
def __init__(self, interval: float, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None):
158159
super().__init__()
159160
self.source = source
160161
self.interval = interval
@@ -169,7 +170,7 @@ async def refine(self, input, output):
169170
buffer_input_channel, buffer_output_channel = trio.open_memory_channel(math.inf)
170171

171172
async def pull_task():
172-
async with buffer_input_channel, aclosing(source) as aiter:
173+
async with buffer_input_channel, aclosing(source.__aiter__()) as aiter:
173174
async for item in aiter:
174175
await buffer_input_channel.send((item, trio.current_time() + self.interval))
175176

slurry/sections/_combiners.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
import itertools
44

55
import trio
6-
from async_generator import aclosing
76

87
from ..environments import TrioSection
98
from .weld import weld
109
from .._types import PipelineSection
10+
from .._utils import aclosing
1111

1212
class Chain(TrioSection):
1313
"""Chains input from one or more sources. Any valid ``PipelineSection`` is an allowed source.
@@ -40,7 +40,7 @@ async def refine(self, input, output):
4040
sources = self.sources
4141
async with trio.open_nursery() as nursery:
4242
for source in sources:
43-
async with aclosing(weld(nursery, source)) as agen:
43+
async with aclosing(weld(nursery, source).__aiter__()) as agen:
4444
async for item in agen:
4545
await output(item)
4646

@@ -66,7 +66,7 @@ async def refine(self, input, output):
6666
async with trio.open_nursery() as nursery:
6767

6868
async def pull_task(source):
69-
async with aclosing(weld(nursery, source)) as aiter:
69+
async with aclosing(weld(nursery, source).__aiter__()) as aiter:
7070
async for item in aiter:
7171
await output(item)
7272

@@ -155,7 +155,8 @@ class ZipLatest(TrioSection):
155155
default value to output, until an input has arrived on a source. Defaults to ``None``.
156156
:type default: Any
157157
:param monitor: Additional asynchronous sequences to monitor.
158-
:type monitor: Optional[Union[AsyncIterable[Any], Sequence[AsyncIterable[Any]]]]
158+
:type monitor: Optional[Union[AsyncIterableWithAcloseableIterator[Any],
159+
Sequence[AsyncIterableWithAcloseableIterator[Any]]]]
159160
:param place_input: Position of the pipeline input source in the output tuple. Options:
160161
``'first'`` (default)|``'last'``
161162
:type place_input: string
@@ -203,7 +204,7 @@ async def refine(self, input, output):
203204
async with trio.open_nursery() as nursery:
204205

205206
async def pull_task(index, source, monitor=False):
206-
async with aclosing(weld(nursery, source)) as aiter:
207+
async with aclosing(weld(nursery, source).__aiter__()) as aiter:
207208
async for item in aiter:
208209
results[index] = item
209210
ready[index] = True

0 commit comments

Comments
 (0)