diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 5c7ea9ca..a38859e0 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -7,6 +7,7 @@ from __future__ import annotations import asyncio +import typing from collections.abc import Awaitable, Callable import functools import inspect @@ -341,7 +342,7 @@ class Pool: '_queue', '_loop', '_minsize', '_maxsize', '_init', '_connect', '_reset', '_connect_args', '_connect_kwargs', '_holders', '_initialized', '_initializing', '_closing', - '_closed', '_connection_class', '_record_class', '_generation', + '_closed', '_connection_class', '_connection_holder_class', '_record_class', '_generation', '_setup', '_max_queries', '_max_inactive_connection_lifetime' ) @@ -356,6 +357,7 @@ def __init__(self, *connect_args, reset=None, loop, connection_class, + connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder, record_class, **connect_kwargs): @@ -408,6 +410,7 @@ def __init__(self, *connect_args, self._queue = None self._connection_class = connection_class + self._connection_holder_class = connection_holder_class self._record_class = record_class self._closing = False @@ -443,37 +446,48 @@ async def _async__init__(self): self._initialized = True async def _initialize(self): + self._initialize_connections_queue() + if self._minsize: + await self._initialize_connections() + + def _initialize_connections_queue(self) -> None: self._queue = asyncio.LifoQueue(maxsize=self._maxsize) for _ in range(self._maxsize): - ch = PoolConnectionHolder( - self, + ch = self._connection_holder_class( + pool=self, + setup=self._setup, max_queries=self._max_queries, max_inactive_time=self._max_inactive_connection_lifetime, - setup=self._setup) - + ) self._holders.append(ch) self._queue.put_nowait(ch) - if self._minsize: - # Since we use a LIFO queue, the first items in the queue will be - # the last ones in `self._holders`. We want to pre-connect the - # first few connections in the queue, therefore we want to walk - # `self._holders` in reverse. - - # Connect the first connection holder in the queue so that - # any connection issues are visible early. - first_ch = self._holders[-1] # type: PoolConnectionHolder - await first_ch.connect() - - if self._minsize > 1: - connect_tasks = [] - for i, ch in enumerate(reversed(self._holders[:-1])): - # `minsize - 1` because we already have first_ch - if i >= self._minsize - 1: - break - connect_tasks.append(ch.connect()) - - await asyncio.gather(*connect_tasks) + + async def _initialize_connections(self) -> None: + + if not self._minsize: + raise exceptions.InterfaceError( + 'pool is already initialized with min_size > 0') + + # Since we use a LIFO queue, the first items in the queue will be + # the last ones in `self._holders`. We want to pre-connect the + # first few connections in the queue, therefore we want to walk + # `self._holders` in reverse. + + # Connect the first connection holder in the queue so that + # any connection issues are visible early. + first_ch = self._holders[-1] # type: PoolConnectionHolder + await first_ch.connect() + + if self._minsize > 1: + connect_tasks = [] + for i, ch in enumerate(reversed(self._holders[:-1])): + # `minsize - 1` because we already have first_ch + if i >= self._minsize - 1: + break + connect_tasks.append(ch.connect()) + + await asyncio.gather(*connect_tasks) def is_closing(self): """Return ``True`` if the pool is closing or is closed. @@ -1083,6 +1097,7 @@ def create_pool(dsn=None, *, reset=None, loop=None, connection_class=connection.Connection, + connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder, record_class=protocol.Record, **connect_kwargs): r"""Create a connection pool. @@ -1142,6 +1157,11 @@ def create_pool(dsn=None, *, The class to use for connections. Must be a subclass of :class:`~asyncpg.connection.Connection`. + :param PoolConnectionHolder connection_holder_class: + The class to use for connection holders. This class is used + to manage the connection lifecycle in the pool. Must be a subclass of + :class:`~asyncpg.pool.PoolConnectionHolder` + :param type record_class: If specified, the class to use for records returned by queries on the connections in this pool. Must be a subclass of @@ -1230,10 +1250,14 @@ def create_pool(dsn=None, *, .. versionchanged:: 0.30.0 Added the *connect* and *reset* parameters. + + .. versionchanged:: 0.31.0 + Added the *pool_connection_holder_class* parameter. """ return Pool( dsn, connection_class=connection_class, + connection_holder_class=connection_holder_class, record_class=record_class, min_size=min_size, max_size=max_size,