diff --git a/CHANGES b/CHANGES
index 8750128b05..b955681b89 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,4 @@
+    * Add dynamic_startup_nodes parameter to async RedisCluster (#2472)
     * Move doctests (doc code examples) to main branch
     * Update `ResponseT` type hint
     * Allow to control the minimum SSL version
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 408fa19363..dbc32047aa 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -136,6 +136,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
         | Enable read from replicas in READONLY mode. You can read possibly stale data.
           When set to true, read commands will be assigned between the primary and
           its replications in a Round-Robin manner.
+    :param dynamic_startup_nodes:
+        | Set the RedisCluster's startup nodes to all the discovered nodes.
+          If true (default value), the cluster's discovered nodes will be used to
+          determine the cluster nodes-slots mapping in the next topology refresh.
+          It will remove the initial passed startup nodes if their endpoints aren't
+          listed in the CLUSTER SLOTS output.
+          If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
+          specific IP addresses, it is best to set it to false.
     :param reinitialize_steps:
         | Specifies the number of MOVED errors that need to occur before reinitializing
           the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -236,6 +244,7 @@ def __init__(
         startup_nodes: Optional[List["ClusterNode"]] = None,
         require_full_coverage: bool = True,
         read_from_replicas: bool = False,
+        dynamic_startup_nodes: bool = True,
         reinitialize_steps: int = 5,
         cluster_error_retry_attempts: int = 3,
         connection_error_retry_attempts: int = 3,
@@ -379,6 +388,7 @@ def __init__(
             startup_nodes,
             require_full_coverage,
             kwargs,
+            dynamic_startup_nodes=dynamic_startup_nodes,
             address_remap=address_remap,
             event_dispatcher=self._event_dispatcher,
         )
@@ -1136,6 +1146,7 @@ class NodesManager:
         "require_full_coverage",
         "slots_cache",
         "startup_nodes",
+        "_dynamic_startup_nodes",
         "address_remap",
     )
 
@@ -1144,12 +1155,14 @@ def __init__(
         startup_nodes: List["ClusterNode"],
         require_full_coverage: bool,
         connection_kwargs: Dict[str, Any],
+        dynamic_startup_nodes: bool = True,
         address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
         event_dispatcher: Optional[EventDispatcher] = None,
     ) -> None:
         self.startup_nodes = {node.name: node for node in startup_nodes}
         self.require_full_coverage = require_full_coverage
         self.connection_kwargs = connection_kwargs
+        self._dynamic_startup_nodes = dynamic_startup_nodes
         self.address_remap = address_remap
 
         self.default_node: "ClusterNode" = None
@@ -1392,8 +1405,10 @@ async def initialize(self) -> None:
         # Set the tmp variables to the real variables
         self.slots_cache = tmp_slots
         self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
-        # Populate the startup nodes with all discovered nodes
-        self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
+
+        if self._dynamic_startup_nodes:
+            # Populate the startup nodes with all discovered nodes
+            self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
 
         # Set the default node
         self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index c95babf687..48ddd5e4f3 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -2620,6 +2620,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
                     assert rc.get_node(host=default_host, port=7001) is not None
                     assert rc.get_node(host=default_host, port=7002) is not None
 
+    @pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
+    async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
+        rc = await get_mocked_redis_client(
+            host="my@DNS.com",
+            port=7000,
+            cluster_slots=default_cluster_slots,
+            dynamic_startup_nodes=dynamic_startup_nodes,
+        )
+        # Nodes are taken from default_cluster_slots
+        discovered_nodes = [
+            "127.0.0.1:7000",
+            "127.0.0.1:7001",
+            "127.0.0.1:7002",
+            "127.0.0.1:7003",
+        ]
+        startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
+        if dynamic_startup_nodes is True:
+            assert startup_nodes.sort() == discovered_nodes.sort()
+        else:
+            assert startup_nodes == ["my@DNS.com:7000"]
+
 
 class TestClusterPipeline:
     """Tests for the ClusterPipeline class."""