Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jan 29, 2025
1 parent f397662 commit 965da38
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;

import java.time.Duration;

Expand Down Expand Up @@ -2058,7 +2059,7 @@ <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final StreamJoined<K, V, VO> streamJoined);

/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join.
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join.
* The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records into the internal {@link KTable} state.
Expand Down Expand Up @@ -2163,7 +2164,7 @@ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner);

/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join.
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join.
* In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a
* {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional
* {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully.
Expand All @@ -2181,12 +2182,13 @@ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
* Given that the {@link KTable} state is versioned, the lookup can use "event time", allowing out-of-order
* {@code KStream} records, to join to the right (older) version of a {@link KTable} record with the same key.
* Also, {@link KTable} out-of-order updates are handled correctly by the versioned state store.
* Note, that using a join grace-period introduces the notion of "late records", i.e., records with a timestamp
* smaller than the defined grace-period allows; these "late records" will be dropped, and not join computation
* Note, that using a join grace-period introduces the notion of late records, i.e., records with a timestamp
* smaller than the defined grace-period allows; these late records will be dropped, and not join computation
* is triggered.
* Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides
* a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state.
* Using a versioned state store for the {@link KTable} also implies that the defined
* {@link VersionedBytesStoreSupplier#historyRetentionMs() history retention} provides
* a cut-off point, and late records will be dropped, not updating the {@link KTable} state.
*
* <p>If a join grace-period is specified, the {@code KStream} will be materialized in a local state store.
* For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog",
Expand All @@ -2211,7 +2213,7 @@ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
final Joined<K, V, VTable> joined);

/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
* Join records of this stream with {@link KTable}'s records using non-windowed left equi-join with default
* serializers and deserializers.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an
* output record (cf. below).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor<KStream, VStream, KTable, VTable, VOut> extends
private final Optional<Duration> gracePeriod;
private TimeOrderedKeyValueBuffer<KStream, VStream, VStream> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext<K1, VOut> internalProcessorContext;
private InternalProcessorContext<KStream, VOut> internalProcessorContext;
private final boolean useBuffer;
private final String storeName;

Expand Down Expand Up @@ -122,16 +122,9 @@ protected void updateObservedStreamTime(final long timestamp) {
observedStreamTime = Math.max(observedStreamTime, timestamp);
}

<<<<<<< HEAD
private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final V2 value2 = getValue2(record, mappedKey);
=======
@SuppressWarnings("unchecked")
private void doJoin(final Record<KStream, VStream> record) {
final KTable mappedKey = keyMapper.apply(record.key(), record.value());
final VTable value2 = getValue2(record, mappedKey);
>>>>>>> 500ffca9e3 (generics)
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}
Expand Down

0 comments on commit 965da38

Please sign in to comment.