Skip to content

Commit cce34d7

Browse files
committed
Add dynamic_startup_nodes parameter to async RedisCluster
1 parent c73a43d commit cce34d7

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

CHANGES

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Add dynamic_startup_nodes parameter to async RedisCluster (#2472)
12
* Move doctests (doc code examples) to main branch
23
* Update `ResponseT` type hint
34
* Allow to control the minimum SSL version

redis/asyncio/cluster.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
133133
| Enable read from replicas in READONLY mode. You can read possibly stale data.
134134
When set to true, read commands will be assigned between the primary and
135135
its replications in a Round-Robin manner.
136+
:param dynamic_startup_nodes:
137+
| Set the RedisCluster's startup nodes to all the discovered nodes.
138+
If true (default value), the cluster's discovered nodes will be used to
139+
determine the cluster nodes-slots mapping in the next topology refresh.
140+
It will remove the initial passed startup nodes if their endpoints aren't
141+
listed in the CLUSTER SLOTS output.
142+
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
143+
specific IP addresses, it is best to set it to false.
136144
:param reinitialize_steps:
137145
| Specifies the number of MOVED errors that need to occur before reinitializing
138146
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -233,6 +241,7 @@ def __init__(
233241
startup_nodes: Optional[List["ClusterNode"]] = None,
234242
require_full_coverage: bool = True,
235243
read_from_replicas: bool = False,
244+
dynamic_startup_nodes: bool = True,
236245
reinitialize_steps: int = 5,
237246
cluster_error_retry_attempts: int = 3,
238247
connection_error_retry_attempts: int = 3,
@@ -370,6 +379,7 @@ def __init__(
370379
startup_nodes,
371380
require_full_coverage,
372381
kwargs,
382+
dynamic_startup_nodes=dynamic_startup_nodes,
373383
address_remap=address_remap,
374384
)
375385
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
@@ -1093,6 +1103,7 @@ class NodesManager:
10931103
"require_full_coverage",
10941104
"slots_cache",
10951105
"startup_nodes",
1106+
"_dynamic_startup_nodes",
10961107
"address_remap",
10971108
)
10981109

@@ -1101,11 +1112,13 @@ def __init__(
11011112
startup_nodes: List["ClusterNode"],
11021113
require_full_coverage: bool,
11031114
connection_kwargs: Dict[str, Any],
1115+
dynamic_startup_nodes: bool = True,
11041116
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
11051117
) -> None:
11061118
self.startup_nodes = {node.name: node for node in startup_nodes}
11071119
self.require_full_coverage = require_full_coverage
11081120
self.connection_kwargs = connection_kwargs
1121+
self._dynamic_startup_nodes = dynamic_startup_nodes
11091122
self.address_remap = address_remap
11101123

11111124
self.default_node: "ClusterNode" = None
@@ -1338,8 +1351,10 @@ async def initialize(self) -> None:
13381351
# Set the tmp variables to the real variables
13391352
self.slots_cache = tmp_slots
13401353
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
1341-
# Populate the startup nodes with all discovered nodes
1342-
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
1354+
1355+
if self._dynamic_startup_nodes:
1356+
# Populate the startup nodes with all discovered nodes
1357+
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
13431358

13441359
# Set the default node
13451360
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]

tests/test_asyncio/test_cluster.py

+21
Original file line numberDiff line numberDiff line change
@@ -2620,6 +2620,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
26202620
assert rc.get_node(host=default_host, port=7001) is not None
26212621
assert rc.get_node(host=default_host, port=7002) is not None
26222622

2623+
@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
2624+
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
2625+
rc = await get_mocked_redis_client(
2626+
2627+
port=7000,
2628+
cluster_slots=default_cluster_slots,
2629+
dynamic_startup_nodes=dynamic_startup_nodes,
2630+
)
2631+
# Nodes are taken from default_cluster_slots
2632+
discovered_nodes = [
2633+
"127.0.0.1:7000",
2634+
"127.0.0.1:7001",
2635+
"127.0.0.1:7002",
2636+
"127.0.0.1:7003",
2637+
]
2638+
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
2639+
if dynamic_startup_nodes is True:
2640+
assert startup_nodes.sort() == discovered_nodes.sort()
2641+
else:
2642+
assert startup_nodes == ["[email protected]:7000"]
2643+
26232644

26242645
class TestClusterPipeline:
26252646
"""Tests for the ClusterPipeline class."""

0 commit comments

Comments
 (0)