4
4
*/
5
5
package io .strimzi .test .container ;
6
6
7
+ import org .apache .kafka .clients .admin .AdminClient ;
8
+ import org .apache .kafka .clients .admin .AdminClientConfig ;
9
+ import org .apache .kafka .clients .admin .NewTopic ;
10
+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
11
+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
12
+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
13
+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
14
+ import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
15
+ import org .apache .kafka .clients .producer .KafkaProducer ;
16
+ import org .apache .kafka .clients .producer .ProducerConfig ;
17
+ import org .apache .kafka .clients .producer .ProducerRecord ;
18
+ import org .apache .kafka .common .serialization .StringDeserializer ;
19
+ import org .apache .kafka .common .serialization .StringSerializer ;
7
20
import org .junit .jupiter .params .ParameterizedTest ;
8
21
import org .junit .jupiter .params .provider .MethodSource ;
9
22
import org .slf4j .Logger ;
13
26
import org .testcontainers .containers .Container ;
14
27
import org .testcontainers .containers .Network ;
15
28
import org .testcontainers .containers .ToxiproxyContainer ;
29
+ import org .testcontainers .shaded .com .google .common .collect .ImmutableMap ;
16
30
import org .testcontainers .utility .DockerImageName ;
17
31
import org .testcontainers .utility .MountableFile ;
18
32
21
35
import java .net .Socket ;
22
36
import java .net .SocketAddress ;
23
37
import java .net .SocketTimeoutException ;
38
+ import java .time .Duration ;
39
+ import java .util .Collection ;
40
+ import java .util .Collections ;
24
41
import java .util .HashMap ;
42
+ import java .util .Locale ;
25
43
import java .util .Map ;
44
+ import java .util .UUID ;
45
+ import java .util .concurrent .ExecutionException ;
46
+ import java .util .concurrent .TimeUnit ;
47
+ import java .util .concurrent .TimeoutException ;
26
48
27
49
import static org .hamcrest .CoreMatchers .containsString ;
28
50
import static org .hamcrest .CoreMatchers .equalTo ;
@@ -42,15 +64,16 @@ public class StrimziKafkaContainerIT extends AbstractIT {
42
64
@ MethodSource ("retrieveKafkaVersionsFile" )
43
65
void testStartContainerWithEmptyConfiguration (final String imageName ) {
44
66
assumeDocker ();
45
- systemUnderTest = new StrimziKafkaContainer (imageName )
46
- .withBrokerId (1 )
47
- .waitForRunning ();
48
- systemUnderTest .start ();
49
67
50
- assertThat (systemUnderTest .getBootstrapServers (), is ("PLAINTEXT://"
51
- + systemUnderTest .getContainerIpAddress () + ":" + systemUnderTest .getMappedPort (9092 )));
68
+ try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer (imageName )
69
+ .withBrokerId (1 )
70
+ .waitForRunning ()) {
52
71
53
- systemUnderTest .stop ();
72
+ systemUnderTest .start ();
73
+
74
+ assertThat (systemUnderTest .getBootstrapServers (), is ("PLAINTEXT://"
75
+ + systemUnderTest .getContainerIpAddress () + ":" + systemUnderTest .getMappedPort (9092 )));
76
+ }
54
77
}
55
78
56
79
@ ParameterizedTest (name = "testStartContainerWithSomeConfiguration-{0}" )
@@ -311,12 +334,76 @@ void testStartBrokerWithProxyContainer(final String imageName) {
311
334
systemUnderTest .stop ();
312
335
}
313
336
314
- @ ParameterizedTest (name = "testStartBrokerWithProxyContainer -{0}" )
337
+ @ ParameterizedTest (name = "testGetProxyWithNoContainer -{0}" )
315
338
@ MethodSource ("retrieveKafkaVersionsFile" )
316
339
void testGetProxyWithNoContainer (final String imageName ) {
317
340
systemUnderTest = new StrimziKafkaContainer (imageName )
318
341
.waitForRunning ();
319
342
systemUnderTest .start ();
320
343
assertThrows (IllegalStateException .class , () -> systemUnderTest .getProxy ());
321
344
}
345
+
346
+ @ Test
347
+ void testKafkaContainerFunctionality () {
348
+ // using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection)
349
+ try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer ()
350
+ .waitForRunning ()) {
351
+
352
+ systemUnderTest .start ();
353
+
354
+ try (final AdminClient adminClient = AdminClient .create (ImmutableMap .of (
355
+ AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , systemUnderTest .getBootstrapServers ()));
356
+ KafkaProducer <String , String > producer = new KafkaProducer <>(
357
+ ImmutableMap .of (
358
+ ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , systemUnderTest .getBootstrapServers (),
359
+ ProducerConfig .CLIENT_ID_CONFIG , UUID .randomUUID ().toString ()
360
+ ),
361
+ new StringSerializer (),
362
+ new StringSerializer ()
363
+ );
364
+ KafkaConsumer <String , String > consumer = new KafkaConsumer <>(
365
+ ImmutableMap .of (
366
+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , systemUnderTest .getBootstrapServers (),
367
+ ConsumerConfig .GROUP_ID_CONFIG , "tc-" + UUID .randomUUID (),
368
+ ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , OffsetResetStrategy .EARLIEST .name ().toLowerCase (Locale .ROOT )
369
+ ),
370
+ new StringDeserializer (),
371
+ new StringDeserializer ())) {
372
+
373
+ final String topicName = "example-topic" ;
374
+ final String recordKey = "strimzi" ;
375
+ final String recordValue = "the-best-project-in-the-world" ;
376
+
377
+ final Collection <NewTopic > topics = Collections .singletonList (new NewTopic (topicName , 1 , (short ) 1 ));
378
+ adminClient .createTopics (topics ).all ().get (30 , TimeUnit .SECONDS );
379
+
380
+ consumer .subscribe (Collections .singletonList (topicName ));
381
+
382
+ producer .send (new ProducerRecord <>(topicName , recordKey , recordValue )).get ();
383
+
384
+ Utils .waitFor ("Consumer records are present" , Duration .ofSeconds (10 ).toMillis (), Duration .ofMinutes (2 ).toMillis (),
385
+ () -> {
386
+ ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (100 ));
387
+
388
+ if (records .isEmpty ()) {
389
+ return false ;
390
+ }
391
+
392
+ // verify count
393
+ assertThat (records .count (), is (1 ));
394
+
395
+ ConsumerRecord <String , String > consumerRecord = records .records (topicName ).iterator ().next ();
396
+
397
+ // verify content of the record
398
+ assertThat (consumerRecord .topic (), is (topicName ));
399
+ assertThat (consumerRecord .key (), is (recordKey ));
400
+ assertThat (consumerRecord .value (), is (recordValue ));
401
+
402
+ return true ;
403
+ });
404
+ } catch (ExecutionException | InterruptedException | TimeoutException e ) {
405
+ throw new RuntimeException (e );
406
+ }
407
+ }
408
+ }
322
409
}
0 commit comments