From 965da38f438938be15eecb3b6775f31a04972068 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 29 Jan 2025 12:20:53 -0800 Subject: [PATCH] review comments --- .../apache/kafka/streams/kstream/KStream.java | 18 ++++++++++-------- .../internals/KStreamKTableJoinProcessor.java | 9 +-------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index aa2cd51e0f7d2..3023ff2513c62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import java.time.Duration; @@ -2058,7 +2059,7 @@ KStream outerJoin(final KStream otherStream, final StreamJoined streamJoined); /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join. + * Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join. * The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}. * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. * This is done by performing a lookup for matching records into the internal {@link KTable} state. @@ -2163,7 +2164,7 @@ KStream join(final KTable table, final ValueJoinerWithKey joiner); /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join. + * Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join. * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional * {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully. @@ -2181,12 +2182,13 @@ KStream join(final KTable table, * Given that the {@link KTable} state is versioned, the lookup can use "event time", allowing out-of-order * {@code KStream} records, to join to the right (older) version of a {@link KTable} record with the same key. * Also, {@link KTable} out-of-order updates are handled correctly by the versioned state store. - * Note, that using a join grace-period introduces the notion of "late records", i.e., records with a timestamp - * smaller than the defined grace-period allows; these "late records" will be dropped, and not join computation + * Note, that using a join grace-period introduces the notion of late records, i.e., records with a timestamp + * smaller than the defined grace-period allows; these late records will be dropped, and not join computation * is triggered. - * Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides - * a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state. - + * Using a versioned state store for the {@link KTable} also implies that the defined + * {@link VersionedBytesStoreSupplier#historyRetentionMs() history retention} provides + * a cut-off point, and late records will be dropped, not updating the {@link KTable} state. + * *

If a join grace-period is specified, the {@code KStream} will be materialized in a local state store. * For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka. * The changelog topic will be named "${applicationId}-<storename>-changelog", @@ -2211,7 +2213,7 @@ KStream join(final KTable table, final Joined joined); /** - * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default + * Join records of this stream with {@link KTable}'s records using non-windowed left equi-join with default * serializers and deserializers. * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an * output record (cf. below). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index 9b443f0fa5b37..86e1ece13aa1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor extends private final Optional gracePeriod; private TimeOrderedKeyValueBuffer buffer; protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; - private InternalProcessorContext internalProcessorContext; + private InternalProcessorContext internalProcessorContext; private final boolean useBuffer; private final String storeName; @@ -122,16 +122,9 @@ protected void updateObservedStreamTime(final long timestamp) { observedStreamTime = Math.max(observedStreamTime, timestamp); } -<<<<<<< HEAD - private void doJoin(final Record record) { - final K2 mappedKey = keyMapper.apply(record.key(), record.value()); - final V2 value2 = getValue2(record, mappedKey); -======= - @SuppressWarnings("unchecked") private void doJoin(final Record record) { final KTable mappedKey = keyMapper.apply(record.key(), record.value()); final VTable value2 = getValue2(record, mappedKey); ->>>>>>> 500ffca9e3 (generics) if (leftJoin || value2 != null) { internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2))); }