Skip to content

Commit

Permalink
KAFKA-1893: Allow regex subscriptions in the new consumer
Browse files Browse the repository at this point in the history
Author: Ashish Singh <[email protected]>

Reviewers: Jason Gustafson, Guozhang Wang, Edward Ribeiro, Ismael Juma

Closes apache#128 from SinghAsDev/KAFKA-1893
  • Loading branch information
Ashish Singh authored and guozhangwang committed Sep 10, 2015
1 parent b8b1bca commit fd12396
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 24 deletions.
70 changes: 64 additions & 6 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
*/
package org.apache.kafka.clients;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
* A class encapsulating some of the logic around metadata.
* <p>
Expand All @@ -41,6 +43,8 @@ public final class Metadata {
private Cluster cluster;
private boolean needUpdate;
private final Set<String> topics;
private final List<Listener> listeners;
private boolean needMetadataForAllTopics;

/**
* Create a metadata instance with reasonable defaults
Expand All @@ -64,6 +68,8 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashSet<String>();
this.listeners = new ArrayList<>();
this.needMetadataForAllTopics = false;
}

/**
Expand Down Expand Up @@ -153,11 +159,17 @@ public synchronized void update(Cluster cluster, long now) {
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
this.cluster = cluster;

for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);

// Do this after notifying listeners as subscribed topics' list can be changed by listeners
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
Expand Down Expand Up @@ -186,4 +198,50 @@ public synchronized long lastSuccessfulUpdate() {
public long refreshBackoff() {
return refreshBackoffMs;
}

/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadaForAllTopics boolean indicating need for metadata of all topics in cluster.
*/
public void needMetadataForAllTopics(boolean needMetadaForAllTopics) {
this.needMetadataForAllTopics = needMetadaForAllTopics;
}

/**
* Get whether metadata for all topics is needed or not
*/
public boolean needMetadataForAllTopics() {
return this.needMetadataForAllTopics;
}

/**
* Add a Metadata listener that gets notified of metadata updates
*/
public void addListener(Listener listener) {
this.listeners.add(listener);
}

/**
* Stop notifying the listener of metadata updates
*/
public void removeListener(Listener listener) {
this.listeners.remove(listener);
}

/**
* MetadataUpdate Listener
*/
public interface Listener {
void onMetadataUpdate(Cluster cluster);
}

private Cluster getClusterForCurrentTopics(Cluster cluster) {
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
if (cluster != null) {
for (String topic : this.topics) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
}
return new Cluster(cluster.nodes(), partitionInfos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -598,7 +599,7 @@ private void maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();

if (canSendRequest(nodeConnectionId)) {
Set<String> topics = metadata.topics();
Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -55,6 +56,16 @@ public interface Consumer<K, V> extends Closeable {
*/
public void assign(List<TopicPartition> partitions);

/**
* @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
*/
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

/**
* @see KafkaConsumer#unsubscribe()
*/
public void unsubscribe();

/**
* @see KafkaConsumer#poll(long)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -57,6 +58,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

/**
* A Kafka client that consumes records from a Kafka cluster.
Expand Down Expand Up @@ -393,7 +395,7 @@
*
*/
@InterfaceStability.Unstable
public class KafkaConsumer<K, V> implements Consumer<K, V> {
public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long NO_CURRENT_THREAD = -1L;
Expand Down Expand Up @@ -673,6 +675,50 @@ public void subscribe(List<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}

/**
* Subscribes to topics matching specified pattern and uses the consumer's group
* management functionality. The pattern matching will be done periodically against topics
* existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
* belong to a particular group and will trigger a rebalance operation if one of the
* following events trigger -
* <ul>
* <li>Number of partitions change for any of the subscribed list of topics
* <li>Topic is created or deleted
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
*
* @param pattern Pattern to subscribe to
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
acquire();
try {
log.debug("Subscribed to pattern: {}", pattern);
this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
this.metadata.needMetadataForAllTopics(true);
this.metadata.addListener(this);
} finally {
release();
}
}

/**
* Unsubscribe from topics currently subscribed to
*/
public void unsubscribe() {
acquire();
try {
this.subscriptions.unsubscribe();
this.metadata.needMetadataForAllTopics(false);
this.metadata.removeListener(this);
} finally {
release();
}
}

/**
* Assign a list of partition to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
Expand Down Expand Up @@ -1156,4 +1202,17 @@ private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}

@Override
public void onMetadataUpdate(Cluster cluster) {
final List<String> topicsToSubscribe = new ArrayList<>();

for (String topic : cluster.topics())
if (this.subscriptions.getSubscribedPattern().matcher(topic).matches())
topicsToSubscribe.add(topic);

subscriptions.changeSubscription(topicsToSubscribe);
metadata.setTopics(topicsToSubscribe);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
Expand Down Expand Up @@ -63,6 +64,20 @@ public synchronized void subscribe(List<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}

@Override
public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
List<String> topicsToSubscribe = new ArrayList<>();
for (String topic: partitions.keySet()) {
if (pattern.matcher(topic).matches() &&
!subscriptions.subscription().contains(topic))
topicsToSubscribe.add(topic);
}
ensureNotClosed();
this.subscriptions.changeSubscription(topicsToSubscribe);
}

@Override
public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
Expand All @@ -75,6 +90,12 @@ public synchronized void assign(List<TopicPartition> partitions) {
this.subscriptions.assign(partitions);
}

@Override
public void unsubscribe() {
ensureNotClosed();
subscriptions.unsubscribe();
}

@Override
public synchronized ConsumerRecords<K, V> poll(long timeout) {
ensureNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,8 @@ public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
long startTime = time.milliseconds();

while (time.milliseconds() - startTime < timeout) {
final Node node = client.leastLoadedNode();
if (node != null) {
MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
final RequestFuture<ClientResponse> requestFuture =
client.send(node, ApiKeys.METADATA, metadataRequest);

RequestFuture<ClientResponse> requestFuture = sendMetadataRequest();
if (requestFuture != null) {
client.poll(requestFuture);

if (requestFuture.succeeded()) {
Expand All @@ -203,6 +199,17 @@ public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
return topicsPartitionInfos;
}

/**
* Send Metadata Request to least loaded node in Kafka cluster asynchronously
* @return A future that indicates result of sent metadata request
*/
public RequestFuture<ClientResponse> sendMetadataRequest() {
final Node node = client.leastLoadedNode();
return node == null ? null :
client.send(
node, ApiKeys.METADATA, new MetadataRequest(Collections.<String>emptyList()));
}

/**
* Reset offsets for the given partition using the offset reset strategy.
*
Expand Down
Loading

0 comments on commit fd12396

Please sign in to comment.