Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-3759 Additonal Trident Kafka Spout Metrics #3385

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_TIMESTAMP;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
Expand All @@ -38,10 +40,13 @@
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.RecordTranslator;
Expand All @@ -60,6 +65,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

// Metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all there variables be private? If some is used in unit test, use the modifiers with the smallest scope as possible and add @VisibleForTesting annotation to indicate that.

public static final String UNDERSCORE = "_";
public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
protected transient Gauge<Double> kafkaClientMaxLag;
public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove "Rate" from the metric name and variable name.

A meter has metrics like xx.m1_rate, xx.m15_rate, .count. Rate in the metric name is redundant and somewhat confusing.

protected transient Meter eventEmitRate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the builtin emitted metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This builtin might be sufficient. Our metric names seem to be reporting values like __emit-count-s1 or __emit-count-s2 (there are almost 200 of these on our Yamas metric search, I'm guessing one per stream). Perhaps something is mis-configured in our topologies preventing it from extracting useful stream names.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the recent change on metrics in storm (converting to v2 metrics), stream name will be appended to the metric name.

The tuple counting metric names contain "${stream_name}" or "${upstream_component}:${stream_name}". The former is used for all spout metrics and for outgoing bolt metrics (__emit-count and __transfer-count). The latter is used for bolt metrics that deal with incoming tuples.

DimensionalReporter can be used to separate dimensions (stream_name, componentId, etc) from metrics.


// Kafka
private final Consumer<K, V> consumer;
private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
Expand Down Expand Up @@ -87,7 +100,7 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,

@VisibleForTesting
KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
this.topologyContext = topologyContext;
Expand All @@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
LOG.debug("Created {}", this.toString());

registerMetric();
}

/**
* Acquires metric instances through registration with the TopologyContext.
*/
private void registerMetric() {
LOG.info("Registering Spout Metrics");

String configGroupId = "";
if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
}

eventEmitRate = topologyContext.registerMeter(
configGroupId + EVENT_EMIT_METRIC_NAME);
kafkaClientMaxLag = topologyContext.registerGauge(
configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
new Gauge<Double>() {
@Override
public Double getValue() {
if (consumer == null) {
return 0.0;
}
// Extract spout lag from consumer's internal metrics
for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
Metric metric = metricKeyVal.getValue();
if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, there are actually two types of "records-lag-max" metrics,

one is partition level
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L133-L134

and the other one is consumer level
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L98-L99

They use the same name as "records-lag-max", while the tags are different. In this case, it is hard to tell what the first "records-lag-max" metric from consumer.metrics() is.

I noticed that older kafka version has different way of doing things .
https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L108

We will to take care of compatibility issues here.

return metric.value();
}
}
return 0.0;
}
});
}

/**
* Emit a batch that has already been emitted.
*/
public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();

Expand All @@ -115,12 +163,12 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
&& isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
+ " because the current first poll offset strategy ignores committed offsets.");
+ " because the current first poll offset strategy ignores committed offsets.");
return;
}

LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, currBatch, collector);
tx, currBatchPartition, currBatch, collector);

try {
// pause other topic-partitions to only poll from current topic-partition
Expand All @@ -129,9 +177,9 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
long seekOffset = currBatchMeta.getFirstOffset();
if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
LOG.debug("Skipping re-emit of batch with negative starting offset."
+ " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+ " It is not expected that Trident will replay such an empty batch,"
+ " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
+ " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+ " It is not expected that Trident will replay such an empty batch,"
+ " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
return;
}
LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
Expand All @@ -145,9 +193,11 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
break;
}
if (record.offset() > currBatchMeta.getLastOffset()) {
throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch."
throw new RuntimeException(String.format(
"Error when re-emitting batch. Overshot the end of the batch."
+ " The batch end offset was [{%d}], but received [{%d}]."
+ " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
+ " Ensure log compaction is disabled in Kafka, since it is"
+ " incompatible with non-opaque transactional spouts.",
currBatchMeta.getLastOffset(), record.offset()));
}
emitTuple(collector, record);
Expand All @@ -157,17 +207,17 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], "
+ "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
+ "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
}

/**
* Emit a new batch.
*/
public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);
tx, currBatchPartition, lastBatch, collector);

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();

Expand Down Expand Up @@ -208,20 +258,21 @@ public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentC
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);

return currentBatch.toMap();
}

private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
|| firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
|| firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
}

private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
final Set<TopicPartition> assignments = consumer.assignment();
if (!assignments.contains(currBatchTp)) {
throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
throw new IllegalStateException(
"The spout is asked to emit tuples on a partition it is not assigned."
+ " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+ " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
}
Expand All @@ -230,6 +281,9 @@ private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) {
final List<Object> tuple = translator.apply(record);
collector.emit(tuple);

// Track the number of records emitted
eventEmitRate.mark(tuple.size());
LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
}

Expand All @@ -247,8 +301,8 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
*/
private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
if (isFirstPollSinceExecutorStarted(tp)) {
boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));
Expand All @@ -269,7 +323,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
consumer.seekToEnd(Collections.singleton(tp));
} else if (firstPollOffsetStrategy == UNCOMMITTED_TIMESTAMP) {
LOG.debug("First poll for topic partition [{}] with no last batch metadata, "
+ "seeking to partition based on startTimeStamp", tp);
+ "seeking to partition based on startTimeStamp", tp);
seekOffsetByStartTimeStamp(tp);
}
tpToFirstSeekOffset.put(tp, consumer.position(tp));
Expand All @@ -284,7 +338,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
long initialFetchOffset = tpToFirstSeekOffset.get(tp);
consumer.seek(tp, initialFetchOffset);
LOG.debug("First poll for topic partition [{}], no last batch metadata present."
+ " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
+ " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
}

final long fetchOffset = consumer.position(tp);
Expand Down Expand Up @@ -327,15 +381,15 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map
.collect(Collectors.toList());
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}

/**
* Get the partitions that should be handled by this task.
*/
public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
List<TopicPartition> tps = allPartitionInfoSorted.stream()
.map(kttp -> kttp.getTopicPartition())
.collect(Collectors.toList());
Expand Down Expand Up @@ -377,8 +431,8 @@ public void close() {
@Override
public final String toString() {
return super.toString()
+ "{kafkaSpoutConfig=" + kafkaSpoutConfig
+ '}';
+ "{kafkaSpoutConfig=" + kafkaSpoutConfig
+ '}';
}

/**
Expand All @@ -389,13 +443,13 @@ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceLi
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]",
consumer, partitions);
consumer, partitions);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]",
consumer, partitions);
consumer, partitions);
}
}
}
Loading