@@ -64,7 +64,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
64
64
import org.apache.kafka.clients.consumer.KafkaConsumer
65
65
import org.apache.kafka.clients.consumer.OffsetAndMetadata
66
66
import org.apache.kafka.clients.consumer.RangeAssignor
67
- import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
68
67
import org.apache.kafka.common.metrics.Sensor
69
68
import org.apache.kafka.common.serialization.Deserializer
70
69
import org.apache.kafka.common.TopicPartition
@@ -195,7 +194,7 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
195
194
public fun <K , V > Flow <KafkaConsumer <K , V >>.subscribeTo (
196
195
name : String ,
197
196
dispatcher : CoroutineDispatcher = IO ,
198
- listener : ConsumerRebalanceListener = NoOpConsumerRebalanceListener () ,
197
+ listener : ConsumerRebalanceListener = NoOpConsumerRebalanceListener ,
199
198
timeout : kotlin.time.Duration = 500.milliseconds,
200
199
): Flow <ConsumerRecord <K , V >> = flatMapConcat { consumer ->
201
200
consumer.subscribeTo(name, dispatcher, listener, timeout)
@@ -212,7 +211,7 @@ public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
212
211
public fun <K , V > KafkaConsumer <K , V >.subscribeTo (
213
212
name : String ,
214
213
dispatcher : CoroutineDispatcher = IO ,
215
- listener : ConsumerRebalanceListener = NoOpConsumerRebalanceListener () ,
214
+ listener : ConsumerRebalanceListener = NoOpConsumerRebalanceListener ,
216
215
timeout : kotlin.time.Duration = 500.milliseconds,
217
216
): Flow <ConsumerRecord <K , V >> = flow {
218
217
subscribe(listOf (name), listener)
@@ -365,3 +364,8 @@ public data class ConsumerSettings<K, V>(
365
364
put(EXCLUDE_INTERNAL_TOPICS_CONFIG , excludeInternalTopics)
366
365
}
367
366
}
367
+
368
+ private object NoOpConsumerRebalanceListener : ConsumerRebalanceListener {
369
+ override fun onPartitionsRevoked (partitions : MutableCollection <TopicPartition >? ) {}
370
+ override fun onPartitionsAssigned (partitions : MutableCollection <TopicPartition >? ) {}
371
+ }
0 commit comments