22
22
23
23
from kafkatest .services .console_consumer import ConsoleConsumer
24
24
from kafkatest .services .kafka import KafkaService , quorum
25
- from kafkatest .services .zookeeper import ZookeeperService
26
25
from kafkatest .utils .remote_account import line_count , file_exists
27
26
28
27
@@ -32,20 +31,14 @@ def __init__(self, test_context):
32
31
super (ConsoleConsumerTest , self ).__init__ (test_context )
33
32
34
33
self .topic = "topic"
35
- self .zk = ZookeeperService (test_context , num_nodes = 1 ) if quorum .for_test (test_context ) == quorum .zk else None
36
- self .kafka = KafkaService (self .test_context , num_nodes = 1 , zk = self .zk , zk_chroot = "/kafka" ,
34
+ self .kafka = KafkaService (self .test_context , num_nodes = 1 , zk = None ,
37
35
topics = {self .topic : {"partitions" : 1 , "replication-factor" : 1 }})
38
36
self .consumer = ConsoleConsumer (self .test_context , num_nodes = 1 , kafka = self .kafka , topic = self .topic )
39
37
40
- def setUp (self ):
41
- if self .zk :
42
- self .zk .start ()
43
-
44
38
@cluster (num_nodes = 3 )
45
39
@matrix (security_protocol = ['PLAINTEXT' , 'SSL' ], metadata_quorum = quorum .all_kraft )
46
40
@cluster (num_nodes = 4 )
47
- @matrix (security_protocol = ['SASL_SSL' ], sasl_mechanism = ['PLAIN' ], metadata_quorum = quorum .all_kraft )
48
- @matrix (security_protocol = ['SASL_SSL' ], sasl_mechanism = ['SCRAM-SHA-256' , 'SCRAM-SHA-512' ]) # SCRAM not yet supported with KRaft
41
+ @matrix (security_protocol = ['SASL_SSL' ], sasl_mechanism = ['PLAIN' , 'SCRAM-SHA-256' , 'SCRAM-SHA-512' ], metadata_quorum = quorum .all_kraft )
49
42
@matrix (security_protocol = ['SASL_PLAINTEXT' , 'SASL_SSL' ], metadata_quorum = quorum .all_kraft )
50
43
def test_lifecycle (self , security_protocol , sasl_mechanism = 'GSSAPI' , metadata_quorum = quorum .zk ):
51
44
"""Check that console consumer starts/stops properly, and that we are capturing log output."""
0 commit comments