[server][dvc][vpj][controller] Add getPositionByTimestamp as a replacement for offsetForTime #1540

Open · wants to merge 1 commit into base: main
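Summary: this PR adds a PubSubPosition-returning getPositionByTimestamp to the consumer adapter API as a replacement for the Long-returning offsetForTime, and changes offsetForTime's "no match" result from the -1L sentinel to null. A minimal caller-side sketch of the intended migration; consumer, partition, and timestampMs are illustrative placeholders, not names from this PR:

// Old pattern: a raw Kafka offset, with a sentinel on a miss (-1L before this PR, null after).
Long offset = consumer.offsetForTime(partition, timestampMs, Duration.ofSeconds(5));

// New pattern: an opaque PubSubPosition, or null when the partition has no message
// with a timestamp at or after timestampMs.
PubSubPosition position = consumer.getPositionByTimestamp(partition, timestampMs, Duration.ofSeconds(5));
if (position == null) {
  // No match; the caller decides the fallback (e.g. seek to the end of the partition).
}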
ApacheKafkaConsumerAdapter.java
@@ -344,7 +344,7 @@ public boolean hasAnySubscription() {
 
   @Override
   public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition) {
-    pubSubTopicPartition = Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
+    Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
     String topic = pubSubTopicPartition.getPubSubTopic().getName();
     int partition = pubSubTopicPartition.getPartitionNumber();
     TopicPartition tp = new TopicPartition(topic, partition);
@@ -431,8 +431,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp
         new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
     Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
         this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), timeout);
-    if (topicPartitionOffsetMap.isEmpty()) {
-      return -1L;
+    if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+      return null;
     }
     OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
     if (offsetAndTimestamp == null) {
@@ -460,8 +460,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp
         pubSubTopicPartition.getPartitionNumber());
     Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
         this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
-    if (topicPartitionOffsetMap.isEmpty()) {
-      return -1L;
+    if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+      return null;
     }
     OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
     if (offsetAndTimestamp == null) {
@@ -479,6 +479,27 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp
     }
   }
 
+  @Override
+  public PubSubPosition getPositionByTimestamp(
+      PubSubTopicPartition pubSubTopicPartition,
+      long timestamp,
+      Duration timeout) {
+    Long offset = offsetForTime(pubSubTopicPartition, timestamp, timeout);
+    if (offset == null) {
+      return null;
+    }
+    return new ApacheKafkaOffsetPosition(offset);
+  }
+
+  @Override
+  public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
+    Long offset = offsetForTime(pubSubTopicPartition, timestamp);
+    if (offset == null) {
+      return null;
+    }
+    return new ApacheKafkaOffsetPosition(offset);
+  }
+
   @Override
   public Long beginningOffset(PubSubTopicPartition pubSubTopicPartition, Duration timeout) {
     TopicPartition kafkaTp =
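For the Kafka adapter, the new overloads simply delegate to offsetForTime and box a non-null result into ApacheKafkaOffsetPosition. A sketch of what a caller can rely on, assuming the getOffset() accessor that the tests below exercise; adapter and partition wiring is omitted:

PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestampMs);
if (position instanceof ApacheKafkaOffsetPosition) {
  // For this adapter a position is just a boxed offset, so the value round-trips to
  // exactly what offsetForTime(pubSubTopicPartition, timestampMs) would have returned.
  long offset = ((ApacheKafkaOffsetPosition) position).getOffset();
}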
PubSubConsumerAdapter.java
@@ -178,6 +178,27 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
    */
   Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout);
 
+  /**
+   * Retrieves the position of the first message with a timestamp greater than or equal to the target
+   * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
+   * will be returned for the partition.
+   *
+   * @param pubSubTopicPartition The PubSub topic-partition for which to fetch the position.
+   * @param timestamp The target timestamp to search for, in milliseconds since the Unix epoch.
+   * @param timeout The maximum duration to wait for the operation to complete.
+   * @return The position of the first message with a timestamp greater than or equal to the target
+   *         timestamp, or {@code null} if no such message is found for the partition.
+   * @throws PubSubOpTimeoutException If the operation times out while fetching the position.
+   * @throws PubSubClientException If there is an error while attempting to fetch the position.
+   */
+  @UnderDevelopment("Under development and may change in the future.")
+  default PubSubPosition getPositionByTimestamp(
+      PubSubTopicPartition pubSubTopicPartition,
+      long timestamp,
+      Duration timeout) {
+    throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
+  }
+
   /**
    * Retrieves the offset of the first message with a timestamp greater than or equal to the target
    * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
@@ -192,6 +213,23 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
    */
   Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp);
 
+  /**
+   * Retrieves the position of the first message with a timestamp greater than or equal to the target
+   * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
+   * will be returned for the partition.
+   *
+   * @param pubSubTopicPartition The PubSub topic-partition for which to fetch the position.
+   * @param timestamp The target timestamp to search for, in milliseconds since the Unix epoch.
+   * @return The position of the first message with a timestamp greater than or equal to the target
+   *         timestamp, or {@code null} if no such message is found for the partition.
+   * @throws PubSubOpTimeoutException If the operation times out while fetching the position.
+   * @throws PubSubClientException If there is an error while attempting to fetch the position.
+   */
+  @UnderDevelopment("Under development and may change in the future.")
+  default PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
+    throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
+  }
+
   /**
    * Retrieves the beginning offset for the specified PubSub topic-partition.
    *
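Because the interface defaults throw UnsupportedOperationException, any non-Kafka adapter has to override both overloads to support timestamp lookups. A hypothetical sketch for an adapter whose positions wrap a numeric offset; MyOffsetPosition is illustrative and not part of this PR:

@Override
public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
  Long offset = offsetForTime(pubSubTopicPartition, timestamp);
  // Preserve the interface contract: null means no message at or after the timestamp.
  return offset == null ? null : new MyOffsetPosition(offset);
}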
ApacheKafkaConsumerAdapterTest.java
@@ -39,6 +39,7 @@
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubClientException;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubClientRetriableException;
+import com.linkedin.venice.pubsub.api.exceptions.PubSubOpTimeoutException;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicAuthorizationException;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubUnsubscribedTopicPartitionException;
 import com.linkedin.venice.serialization.KafkaKeySerializer;
@@ -60,6 +61,7 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -502,4 +504,142 @@ public void testIsValidTopicPartition() {
     assertTrue(kafkaConsumerAdapter.isValidTopicPartition(pubSubTopicPartition));
     verify(internalKafkaConsumer).partitionsFor(pubSubTopicPartition.getTopicName());
   }
+
+  @Test
+  public void testOffsetForTimeWithTimeoutSuccess() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Long expectedOffset = 500L;
+    OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.singletonMap(topicPartition, offsetAndTimestamp);
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertEquals(actualOffset, expectedOffset);
+  }
+
+  @Test
+  public void testOffsetForTimeWithTimeoutReturnsNull() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.emptyMap();
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertNull(actualOffset);
+  }
+
+  @Test(expectedExceptions = PubSubOpTimeoutException.class)
+  public void testOffsetForTimeWithTimeoutThrowsTimeoutException() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenThrow(new TimeoutException("Test timeout"));
+
+    kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+  }
+
+  @Test
+  public void testOffsetForTimeWithoutTimeoutSuccess() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Long expectedOffset = 500L;
+    OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.singletonMap(topicPartition, offsetAndTimestamp);
+
+    when(internalKafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)))
+        .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp);
+    assertEquals(actualOffset, expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampWithTimeout() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    long expectedOffset = 500L;
+
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
+        Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));
+
+    doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500));
+
+    PubSubPosition position =
+        kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertNotNull(position);
+    assertTrue(position instanceof ApacheKafkaOffsetPosition);
+    assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampWithoutTimeout() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    long expectedOffset = 500L;
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
+        Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));
+
+    doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
+    assertNotNull(position);
+    assertTrue(position instanceof ApacheKafkaOffsetPosition);
+    assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampReturnsNull() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    doReturn(Collections.emptyMap()).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
+    assertNull(position);
+  }
+
+  @Test
+  public void testGetPositionByTimestampThrowsException() {
+    PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    doThrow(new RuntimeException("Simulate exception")).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    Exception e = expectThrows(
+        PubSubClientException.class,
+        () -> kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp));
+    assertTrue(e.getMessage().contains("Failed to fetch offset for time"), "Actual message: " + e.getMessage());
+  }
 }