
Commit 1fe0532

Allow multiple handlers in separate thread (#299)
1 parent a816f86 commit 1fe0532

File tree: 3 files changed, +66 -64 lines changed


scrapy_playwright/_utils.py (+45, -55)
@@ -2,7 +2,7 @@
 import logging
 import platform
 import threading
-from typing import Awaitable, Iterator, Optional, Tuple, Union
+from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union
 
 import scrapy
 from playwright.async_api import Error, Page, Request, Response
@@ -103,68 +103,58 @@ async def _get_header_value(
     return None
 
 
-if platform.system() == "Windows":
-
-    class _ThreadedLoopAdapter:
-        """Utility class to start an asyncio event loop in a new thread and redirect coroutines.
-        This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
-        use ProactorEventLoop which is supported by Playwright on Windows.
-        """
-
-        _loop: asyncio.AbstractEventLoop
-        _thread: threading.Thread
-        _coro_queue: asyncio.Queue = asyncio.Queue()
-        _stop_event: asyncio.Event = asyncio.Event()
-
-        @classmethod
-        async def _handle_coro(cls, coro, future) -> None:
-            try:
-                future.set_result(await coro)
-            except Exception as exc:
-                future.set_exception(exc)
-
-        @classmethod
-        async def _process_queue(cls) -> None:
-            while not cls._stop_event.is_set():
-                coro, future = await cls._coro_queue.get()
-                asyncio.create_task(cls._handle_coro(coro, future))
-                cls._coro_queue.task_done()
-
-        @classmethod
-        def _deferred_from_coro(cls, coro) -> Deferred:
-            future: asyncio.Future = asyncio.Future()
-            asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop)
-            return scrapy.utils.defer.deferred_from_coro(future)
-
-        @classmethod
-        def start(cls) -> None:
-            policy = asyncio.WindowsProactorEventLoopPolicy()  # type: ignore[attr-defined]
+class _ThreadedLoopAdapter:
+    """Utility class to start an asyncio event loop in a new thread and redirect coroutines.
+    This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
+    use ProactorEventLoop which is supported by Playwright on Windows.
+    """
+
+    _loop: asyncio.AbstractEventLoop
+    _thread: threading.Thread
+    _coro_queue: asyncio.Queue = asyncio.Queue()
+    _stop_events: Dict[int, asyncio.Event] = {}
+
+    @classmethod
+    async def _handle_coro(cls, coro, future) -> None:
+        try:
+            future.set_result(await coro)
+        except Exception as exc:
+            future.set_exception(exc)
+
+    @classmethod
+    async def _process_queue(cls) -> None:
+        while any(not ev.is_set() for ev in cls._stop_events.values()):
+            coro, future = await cls._coro_queue.get()
+            asyncio.create_task(cls._handle_coro(coro, future))
+            cls._coro_queue.task_done()
+
+    @classmethod
+    def _deferred_from_coro(cls, coro) -> Deferred:
+        future: asyncio.Future = asyncio.Future()
+        asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop)
+        return scrapy.utils.defer.deferred_from_coro(future)
+
+    @classmethod
+    def start(cls, caller_id: int) -> None:
+        cls._stop_events[caller_id] = asyncio.Event()
+        if not getattr(cls, "_loop", None):
+            policy = asyncio.DefaultEventLoopPolicy()
+            if platform.system() == "Windows":
+                policy = asyncio.WindowsProactorEventLoopPolicy()  # type: ignore[attr-defined]
             cls._loop = policy.new_event_loop()
             asyncio.set_event_loop(cls._loop)
 
+        if not getattr(cls, "_thread", None):
             cls._thread = threading.Thread(target=cls._loop.run_forever, daemon=True)
             cls._thread.start()
             logger.info("Started loop on separate thread: %s", cls._loop)
-
             asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop)
 
-        @classmethod
-        def stop(cls) -> None:
-            cls._stop_event.set()
+    @classmethod
+    def stop(cls, caller_id: int) -> None:
+        """Wait until all handlers are closed to stop the event loop and join the thread."""
+        cls._stop_events[caller_id].set()
+        if all(ev.is_set() for ev in cls._stop_events.values()):
            asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop)
            cls._loop.call_soon_threadsafe(cls._loop.stop)
            cls._thread.join()
-
-    _deferred_from_coro = _ThreadedLoopAdapter._deferred_from_coro
-else:
-
-    class _ThreadedLoopAdapter:  # type: ignore[no-redef]
-        @classmethod
-        def start(cls) -> None:
-            pass
-
-        @classmethod
-        def stop(cls) -> None:
-            pass
-
-    _deferred_from_coro = scrapy.utils.defer.deferred_from_coro
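
Note: the following is a minimal standalone sketch (not part of this commit) of how the reworked adapter behaves with more than one caller. Each caller registers its own stop event via start(), work is scheduled onto the shared background loop (here with asyncio.run_coroutine_threadsafe, mirroring the test helper below), and the loop and thread are only torn down once every registered caller has called stop(). The caller ids are arbitrary integers standing in for the id(self) values the download handler passes.

import asyncio

from scrapy_playwright._utils import _ThreadedLoopAdapter


async def sample_task(name: str) -> str:
    # Trivial coroutine to run on the adapter's loop.
    await asyncio.sleep(0.1)
    return f"done: {name}"


def main() -> None:
    first, second = 1, 2  # arbitrary caller ids (the handler uses id(self))
    _ThreadedLoopAdapter.start(first)   # creates the loop and the background thread
    _ThreadedLoopAdapter.start(second)  # reuses the already-running loop and thread

    # Schedule work onto the adapter's loop from the calling thread.
    future = asyncio.run_coroutine_threadsafe(sample_task("first"), _ThreadedLoopAdapter._loop)
    print(future.result())

    _ThreadedLoopAdapter.stop(first)   # loop keeps running: "second" is still registered
    _ThreadedLoopAdapter.stop(second)  # last caller out: loop is stopped and the thread joined


if __name__ == "__main__":
    main()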

scrapy_playwright/handler.py (+18, -7)
@@ -1,5 +1,6 @@
 import asyncio
 import logging
+import platform
 from contextlib import suppress
 from dataclasses import dataclass, field as dataclass_field
 from ipaddress import ip_address
@@ -26,6 +27,7 @@
 from scrapy.http.headers import Headers
 from scrapy.responsetypes import responsetypes
 from scrapy.settings import Settings
+from scrapy.utils.defer import deferred_from_coro
 from scrapy.utils.misc import load_object
 from scrapy.utils.reactor import verify_installed_reactor
 from twisted.internet.defer import Deferred, inlineCallbacks
@@ -34,7 +36,6 @@
 from scrapy_playwright.page import PageMethod
 from scrapy_playwright._utils import (
     _ThreadedLoopAdapter,
-    _deferred_from_coro,
     _encode_body,
     _get_float_setting,
     _get_header_value,
@@ -91,6 +92,7 @@ class Config:
     startup_context_kwargs: dict
     navigation_timeout: Optional[float]
     restart_disconnected_browser: bool
+    use_threaded_loop: bool
 
     @classmethod
     def from_settings(cls, settings: Settings) -> "Config":
@@ -114,6 +116,8 @@ def from_settings(cls, settings: Settings) -> "Config":
             restart_disconnected_browser=settings.getbool(
                 "PLAYWRIGHT_RESTART_DISCONNECTED_BROWSER", default=True
             ),
+            use_threaded_loop=platform.system() == "Windows"
+            or settings.getbool("_PLAYWRIGHT_THREADED_LOOP", False),
         )
         cfg.cdp_kwargs.pop("endpoint_url", None)
         cfg.connect_kwargs.pop("ws_endpoint", None)
@@ -130,13 +134,14 @@ class ScrapyPlaywrightDownloadHandler(HTTPDownloadHandler):
 
     def __init__(self, crawler: Crawler) -> None:
         super().__init__(settings=crawler.settings, crawler=crawler)
-        _ThreadedLoopAdapter.start()
         verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor")
         crawler.signals.connect(self._engine_started, signals.engine_started)
         self.stats = crawler.stats
-
         self.config = Config.from_settings(crawler.settings)
 
+        if self.config.use_threaded_loop:
+            _ThreadedLoopAdapter.start(id(self))
+
         self.browser_launch_lock = asyncio.Lock()
         self.context_launch_lock = asyncio.Lock()
         self.context_wrappers: Dict[str, BrowserContextWrapper] = {}
@@ -162,9 +167,14 @@ def __init__(self, crawler: Crawler) -> None:
     def from_crawler(cls: Type[PlaywrightHandler], crawler: Crawler) -> PlaywrightHandler:
         return cls(crawler)
 
+    def _deferred_from_coro(self, coro: Awaitable) -> Deferred:
+        if self.config.use_threaded_loop:
+            return _ThreadedLoopAdapter._deferred_from_coro(coro)
+        return deferred_from_coro(coro)
+
     def _engine_started(self) -> Deferred:
         """Launch the browser. Use the engine_started signal as it supports returning deferreds."""
-        return _deferred_from_coro(self._launch())
+        return self._deferred_from_coro(self._launch())
 
     async def _launch(self) -> None:
         """Launch Playwright manager and configured startup context(s)."""
@@ -333,8 +343,9 @@ def _set_max_concurrent_context_count(self):
     def close(self) -> Deferred:
         logger.info("Closing download handler")
         yield super().close()
-        yield _deferred_from_coro(self._close())
-        _ThreadedLoopAdapter.stop()
+        yield self._deferred_from_coro(self._close())
+        if self.config.use_threaded_loop:
+            _ThreadedLoopAdapter.stop(id(self))
 
     async def _close(self) -> None:
         await asyncio.gather(*[ctx.context.close() for ctx in self.context_wrappers.values()])
@@ -349,7 +360,7 @@ async def _close(self) -> None:
 
     def download_request(self, request: Request, spider: Spider) -> Deferred:
         if request.meta.get("playwright"):
-            return _deferred_from_coro(self._download_request(request, spider))
+            return self._deferred_from_coro(self._download_request(request, spider))
         return super().download_request(request, spider)
 
     async def _download_request(self, request: Request, spider: Spider) -> Response:
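
Note: with these handler changes, the threaded loop is used unconditionally on Windows, and elsewhere only when the private _PLAYWRIGHT_THREADED_LOOP setting is enabled; otherwise coroutines go through Scrapy's regular deferred_from_coro. Below is a hedged settings sketch (not part of the commit) for opting in to the threaded loop on a non-Windows host; the TWISTED_REACTOR and DOWNLOAD_HANDLERS entries are the usual scrapy-playwright wiring, not something added here.

# settings.py (sketch): standard scrapy-playwright setup plus the new internal flag
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
DOWNLOAD_HANDLERS = {
    "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
    "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
}
# Private setting read by Config.from_settings; forces use_threaded_loop=True
# even when platform.system() != "Windows" (mainly useful for testing).
_PLAYWRIGHT_THREADED_LOOP = True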

tests/__init__.py (+3, -2)
@@ -24,10 +24,11 @@ def allow_windows(test_method):
 
     @wraps(test_method)
     async def wrapped(self, *args, **kwargs):
-        _ThreadedLoopAdapter.start()
+        caller_id = 1234
+        _ThreadedLoopAdapter.start(caller_id)
         coro = test_method(self, *args, **kwargs)
         asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result()
-        _ThreadedLoopAdapter.stop()
+        _ThreadedLoopAdapter.stop(caller_id)
 
     return wrapped
 
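
Note: a hypothetical usage sketch (not part of the commit) of the updated allow_windows helper: the decorator registers the fixed caller_id 1234, runs the wrapped coroutine on the adapter's loop, and releases the loop when the test returns. The test class and method names are made up, and an async-capable unittest runner is assumed.

import unittest

from tests import allow_windows  # the helper shown in the diff above


class ThreadedLoopExampleTest(unittest.IsolatedAsyncioTestCase):
    @allow_windows
    async def test_runs_on_adapter_loop(self):
        # When the threaded loop is active, this body executes on the
        # adapter's loop rather than on the test runner's own loop.
        self.assertTrue(True)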
