Skip to content

Add connection_holder_class to Pool for custom connection handling #1251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import asyncio
import typing
from collections.abc import Awaitable, Callable
import functools
import inspect
Expand Down Expand Up @@ -341,7 +342,7 @@ class Pool:
'_queue', '_loop', '_minsize', '_maxsize',
'_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
'_holders', '_initialized', '_initializing', '_closing',
'_closed', '_connection_class', '_record_class', '_generation',
'_closed', '_connection_class', '_connection_holder_class', '_record_class', '_generation',
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
)

Expand All @@ -356,6 +357,7 @@ def __init__(self, *connect_args,
reset=None,
loop,
connection_class,
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
record_class,
**connect_kwargs):

Expand Down Expand Up @@ -408,6 +410,7 @@ def __init__(self, *connect_args,
self._queue = None

self._connection_class = connection_class
self._connection_holder_class = connection_holder_class
self._record_class = record_class

self._closing = False
Expand Down Expand Up @@ -443,37 +446,48 @@ async def _async__init__(self):
self._initialized = True

async def _initialize(self):
self._initialize_connections_queue()
if self._minsize:
await self._initialize_connections()

def _initialize_connections_queue(self) -> None:
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
for _ in range(self._maxsize):
ch = PoolConnectionHolder(
self,
ch = self._connection_holder_class(
pool=self,
setup=self._setup,
max_queries=self._max_queries,
max_inactive_time=self._max_inactive_connection_lifetime,
setup=self._setup)

)
self._holders.append(ch)
self._queue.put_nowait(ch)

if self._minsize:
# Since we use a LIFO queue, the first items in the queue will be
# the last ones in `self._holders`. We want to pre-connect the
# first few connections in the queue, therefore we want to walk
# `self._holders` in reverse.

# Connect the first connection holder in the queue so that
# any connection issues are visible early.
first_ch = self._holders[-1] # type: PoolConnectionHolder
await first_ch.connect()

if self._minsize > 1:
connect_tasks = []
for i, ch in enumerate(reversed(self._holders[:-1])):
# `minsize - 1` because we already have first_ch
if i >= self._minsize - 1:
break
connect_tasks.append(ch.connect())

await asyncio.gather(*connect_tasks)

async def _initialize_connections(self) -> None:

if not self._minsize:
raise exceptions.InterfaceError(
'pool is already initialized with min_size > 0')

# Since we use a LIFO queue, the first items in the queue will be
# the last ones in `self._holders`. We want to pre-connect the
# first few connections in the queue, therefore we want to walk
# `self._holders` in reverse.

# Connect the first connection holder in the queue so that
# any connection issues are visible early.
first_ch = self._holders[-1] # type: PoolConnectionHolder
await first_ch.connect()

if self._minsize > 1:
connect_tasks = []
for i, ch in enumerate(reversed(self._holders[:-1])):
# `minsize - 1` because we already have first_ch
if i >= self._minsize - 1:
break
connect_tasks.append(ch.connect())

await asyncio.gather(*connect_tasks)

def is_closing(self):
"""Return ``True`` if the pool is closing or is closed.
Expand Down Expand Up @@ -1083,6 +1097,7 @@ def create_pool(dsn=None, *,
reset=None,
loop=None,
connection_class=connection.Connection,
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
record_class=protocol.Record,
**connect_kwargs):
r"""Create a connection pool.
Expand Down Expand Up @@ -1142,6 +1157,11 @@ def create_pool(dsn=None, *,
The class to use for connections. Must be a subclass of
:class:`~asyncpg.connection.Connection`.

:param PoolConnectionHolder connection_holder_class:
The class to use for connection holders. This class is used
to manage the connection lifecycle in the pool. Must be a subclass of
:class:`~asyncpg.pool.PoolConnectionHolder`

:param type record_class:
If specified, the class to use for records returned by queries on
the connections in this pool. Must be a subclass of
Expand Down Expand Up @@ -1230,10 +1250,14 @@ def create_pool(dsn=None, *,

.. versionchanged:: 0.30.0
Added the *connect* and *reset* parameters.

.. versionchanged:: 0.31.0
Added the *pool_connection_holder_class* parameter.
"""
return Pool(
dsn,
connection_class=connection_class,
connection_holder_class=connection_holder_class,
record_class=record_class,
min_size=min_size,
max_size=max_size,
Expand Down