Skip to content

Commit 902e7b2

Browse files
authored
[3.11] Restore FlowControlDataQueue class (#9963)
fixes aio-libs/aiodocker#918
1 parent 97be030 commit 902e7b2

File tree

4 files changed

+125
-0
lines changed

4 files changed

+125
-0
lines changed

CHANGES/9963.bugfix.rst

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Restored the ``FlowControlDataQueue`` class -- by :user:`bdraco`.
2+
3+
This class is no longer used internally, and will be permanently removed in the next major version.

aiohttp/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
EMPTY_PAYLOAD as EMPTY_PAYLOAD,
9494
DataQueue as DataQueue,
9595
EofStream as EofStream,
96+
FlowControlDataQueue as FlowControlDataQueue,
9697
StreamReader as StreamReader,
9798
)
9899
from .tracing import (
@@ -148,6 +149,7 @@
148149
"ConnectionTimeoutError",
149150
"ContentTypeError",
150151
"Fingerprint",
152+
"FlowControlDataQueue",
151153
"InvalidURL",
152154
"InvalidUrlClientError",
153155
"InvalidUrlRedirectClientError",

aiohttp/streams.py

+43
Original file line numberDiff line numberDiff line change
@@ -677,3 +677,46 @@ async def read(self) -> _T:
677677

678678
def __aiter__(self) -> AsyncStreamIterator[_T]:
679679
return AsyncStreamIterator(self.read)
680+
681+
682+
class FlowControlDataQueue(DataQueue[_T]):
683+
"""FlowControlDataQueue resumes and pauses an underlying stream.
684+
685+
It is a destination for parsed data.
686+
687+
This class is deprecated and will be removed in version 4.0.
688+
"""
689+
690+
def __init__(
691+
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
692+
) -> None:
693+
super().__init__(loop=loop)
694+
self._size = 0
695+
self._protocol = protocol
696+
self._limit = limit * 2
697+
698+
def feed_data(self, data: _T, size: int = 0) -> None:
699+
super().feed_data(data, size)
700+
self._size += size
701+
702+
if self._size > self._limit and not self._protocol._reading_paused:
703+
self._protocol.pause_reading()
704+
705+
async def read(self) -> _T:
706+
if not self._buffer and not self._eof:
707+
assert not self._waiter
708+
self._waiter = self._loop.create_future()
709+
try:
710+
await self._waiter
711+
except (asyncio.CancelledError, asyncio.TimeoutError):
712+
self._waiter = None
713+
raise
714+
if self._buffer:
715+
data, size = self._buffer.popleft()
716+
self._size -= size
717+
if self._size < self._limit and self._protocol._reading_paused:
718+
self._protocol.resume_reading()
719+
return data
720+
if self._exception is not None:
721+
raise self._exception
722+
raise EofStream

tests/test_flowcontrol_streams.py

+77
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from unittest import mock
23

34
import pytest
@@ -15,6 +16,13 @@ def stream(loop, protocol):
1516
return streams.StreamReader(protocol, limit=1, loop=loop)
1617

1718

19+
@pytest.fixture
20+
def buffer(loop, protocol: mock.Mock) -> streams.FlowControlDataQueue:
21+
out = streams.FlowControlDataQueue(protocol, limit=1, loop=loop)
22+
out._allow_pause = True
23+
return out
24+
25+
1826
class TestFlowControlStreamReader:
1927
async def test_read(self, stream) -> None:
2028
stream.feed_data(b"da", 2)
@@ -103,3 +111,72 @@ async def test_read_nowait(self, stream) -> None:
103111
res = stream.read_nowait(5)
104112
assert res == b""
105113
assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined]
114+
115+
116+
async def test_flow_control_data_queue_waiter_cancelled(
117+
buffer: streams.FlowControlDataQueue,
118+
) -> None:
119+
"""Test that the waiter is cancelled it is cleared."""
120+
task = asyncio.create_task(buffer.read())
121+
await asyncio.sleep(0)
122+
assert buffer._waiter is not None
123+
buffer._waiter.cancel()
124+
125+
with pytest.raises(asyncio.CancelledError):
126+
await task
127+
assert buffer._waiter is None
128+
129+
130+
async def test_flow_control_data_queue_has_buffer(
131+
buffer: streams.FlowControlDataQueue,
132+
) -> None:
133+
"""Test reading from the buffer."""
134+
data = object()
135+
buffer.feed_data(data, 100)
136+
assert buffer._size == 100
137+
read_data = await buffer.read()
138+
assert read_data is data
139+
assert buffer._size == 0
140+
141+
142+
async def test_flow_control_data_queue_read_with_exception(
143+
buffer: streams.FlowControlDataQueue,
144+
) -> None:
145+
"""Test reading when the buffer is empty and an exception is set."""
146+
buffer.set_exception(ValueError("unique_string"))
147+
with pytest.raises(ValueError, match="unique_string"):
148+
await buffer.read()
149+
150+
151+
def test_flow_control_data_queue_feed_pause(
152+
buffer: streams.FlowControlDataQueue,
153+
) -> None:
154+
"""Test feeding data and pausing the reader."""
155+
buffer._protocol._reading_paused = False
156+
buffer.feed_data(object(), 100)
157+
assert buffer._protocol.pause_reading.called
158+
159+
buffer._protocol._reading_paused = True
160+
buffer._protocol.pause_reading.reset_mock()
161+
buffer.feed_data(object(), 100)
162+
assert not buffer._protocol.pause_reading.called
163+
164+
165+
async def test_flow_control_data_queue_resume_on_read(
166+
buffer: streams.FlowControlDataQueue,
167+
) -> None:
168+
"""Test that the reader is resumed when reading."""
169+
buffer.feed_data(object(), 100)
170+
171+
buffer._protocol._reading_paused = True
172+
await buffer.read()
173+
assert buffer._protocol.resume_reading.called
174+
175+
176+
async def test_flow_control_data_queue_read_eof(
177+
buffer: streams.FlowControlDataQueue,
178+
) -> None:
179+
"""Test that reading after eof raises EofStream."""
180+
buffer.feed_eof()
181+
with pytest.raises(streams.EofStream):
182+
await buffer.read()

0 commit comments

Comments
 (0)